mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
KleppmannTree javadocs
and some more
This commit is contained in:
@@ -107,6 +107,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Get the attributes of a file or directory.
|
||||
*
|
||||
* @param uuid the UUID of the file or directory
|
||||
* @return the attributes of the file or directory
|
||||
*/
|
||||
@@ -133,6 +134,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Try to resolve a path to a file or directory.
|
||||
*
|
||||
* @param name the path to resolve
|
||||
* @return the key of the file or directory, or an empty optional if it does not exist
|
||||
*/
|
||||
@@ -161,6 +163,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Create a new file with the given name and mode.
|
||||
*
|
||||
* @param name the name of the file
|
||||
* @param mode the mode of the file
|
||||
* @return the key of the created file
|
||||
@@ -191,6 +194,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Get the parent directory of a file or directory.
|
||||
*
|
||||
* @param ino the key of the file or directory
|
||||
* @return the parent directory
|
||||
*/
|
||||
@@ -207,6 +211,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Create a new directory with the given name and mode.
|
||||
*
|
||||
* @param name the name of the directory
|
||||
* @param mode the mode of the directory
|
||||
*/
|
||||
@@ -227,6 +232,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Unlink a file or directory.
|
||||
*
|
||||
* @param name the name of the file or directory
|
||||
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
|
||||
*/
|
||||
@@ -245,8 +251,9 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Rename a file or directory.
|
||||
*
|
||||
* @param from the old name
|
||||
* @param to the new name
|
||||
* @return true if the rename was successful, false otherwise
|
||||
*/
|
||||
public Boolean rename(String from, String to) {
|
||||
@@ -265,6 +272,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Change the mode of a file or directory.
|
||||
*
|
||||
* @param uuid the ID of the file or directory
|
||||
* @param mode the new mode
|
||||
* @return true if the mode was changed successfully, false otherwise
|
||||
@@ -291,6 +299,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Read the contents of a directory.
|
||||
*
|
||||
* @param name the path of the directory
|
||||
* @return an iterable of the names of the files in the directory
|
||||
*/
|
||||
@@ -307,9 +316,10 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Read the contents of a file.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @param offset the offset to start reading from
|
||||
* @param length the number of bytes to read
|
||||
* @return the contents of the file as a ByteString
|
||||
*/
|
||||
public ByteString read(JObjectKey fileUuid, long offset, int length) {
|
||||
@@ -375,6 +385,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Get the size of a file.
|
||||
*
|
||||
* @param uuid the ID of the file
|
||||
* @return the size of the file
|
||||
*/
|
||||
@@ -391,6 +402,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Get the size of a chunk.
|
||||
*
|
||||
* @param uuid the ID of the chunk
|
||||
* @return the size of the chunk
|
||||
*/
|
||||
@@ -404,9 +416,10 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Write data to a file.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @param offset the offset to write to
|
||||
* @param data the data to write
|
||||
* @return the number of bytes written
|
||||
*/
|
||||
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
|
||||
@@ -513,8 +526,9 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Truncate a file to the given length.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @param length the new length of the file
|
||||
* @return true if the truncate was successful, false otherwise
|
||||
*/
|
||||
public Boolean truncate(JObjectKey fileUuid, long length) {
|
||||
@@ -608,8 +622,9 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Fill the given range with zeroes.
|
||||
*
|
||||
* @param fillStart the start of the range
|
||||
* @param length the end of the range
|
||||
* @param newChunks the map to store the new chunks in
|
||||
*/
|
||||
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
|
||||
@@ -649,6 +664,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Read the contents of a symlink.
|
||||
*
|
||||
* @param uuid the ID of the symlink
|
||||
* @return the contents of the symlink as a string
|
||||
*/
|
||||
@@ -660,6 +676,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Read the contents of a symlink as a ByteString.
|
||||
*
|
||||
* @param uuid the ID of the symlink
|
||||
* @return the contents of the symlink as a ByteString
|
||||
*/
|
||||
@@ -672,6 +689,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Create a symlink.
|
||||
*
|
||||
* @param oldpath the target of the symlink
|
||||
* @param newpath the path of the symlink
|
||||
* @return the key of the created symlink
|
||||
@@ -700,9 +718,10 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Set the access and modification times of a file.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @param atimeMs the access time in milliseconds
|
||||
* @param mtimeMs the modification time in milliseconds
|
||||
* @return true if the times were set successfully, false otherwise
|
||||
*/
|
||||
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
|
||||
@@ -728,6 +747,7 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Get the size of a file.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @return the size of the file
|
||||
*/
|
||||
@@ -752,9 +772,10 @@ public class DhfsFileService {
|
||||
|
||||
/**
|
||||
* Write data to a file.
|
||||
*
|
||||
* @param fileUuid the ID of the file
|
||||
* @param offset the offset to write to
|
||||
* @param data the data to write
|
||||
* @return the number of bytes written
|
||||
*/
|
||||
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
|
||||
|
||||
@@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static jnr.posix.FileStat.*;
|
||||
|
||||
/**
|
||||
* FUSE file system implementation.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class DhfsFuse extends FuseStubFS {
|
||||
private static final int blksize = 1048576;
|
||||
@@ -56,6 +59,11 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
/**
|
||||
* Allocate a handle for the given key.
|
||||
* @param key the key to allocate a handle for
|
||||
* @return the allocated handle, not 0
|
||||
*/
|
||||
private long allocateHandle(JObjectKey key) {
|
||||
while (true) {
|
||||
var newFh = _fh.getAndIncrement();
|
||||
@@ -66,8 +74,14 @@ public class DhfsFuse extends FuseStubFS {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the key from the handle.
|
||||
* @param handle the handle to get the key from
|
||||
* @return the key, or null if not found
|
||||
*/
|
||||
private JObjectKey getFromHandle(long handle) {
|
||||
assert handle != 0;
|
||||
if(handle == 0)
|
||||
throw new IllegalStateException("Handle is 0");
|
||||
return _openHandles.get(handle);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,9 @@ import jnr.ffi.Pointer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.MappedByteBuffer;
|
||||
|
||||
/**
|
||||
* JnrPtrByteOutput is a ByteOutput implementation that writes to a `jnr.ffi.Pointer`.
|
||||
*/
|
||||
public class JnrPtrByteOutput extends ByteOutput {
|
||||
private final Pointer _backing;
|
||||
private final long _size;
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.usatiuk.kleppmanntree;
|
||||
|
||||
/**
|
||||
* Exception thrown when an attempt is made to create a new tree node as a child with a name that already exists.
|
||||
*/
|
||||
public class AlreadyExistsException extends RuntimeException {
|
||||
public AlreadyExistsException(String message) {
|
||||
super(message);
|
||||
|
||||
@@ -1,9 +1,26 @@
|
||||
package com.usatiuk.kleppmanntree;
|
||||
|
||||
/**
|
||||
* Clock interface
|
||||
*/
|
||||
public interface Clock<TimestampT extends Comparable<TimestampT>> {
|
||||
/**
|
||||
* Increment and get the current timestamp.
|
||||
* @return the incremented timestamp
|
||||
*/
|
||||
TimestampT getTimestamp();
|
||||
|
||||
/**
|
||||
* Get the current timestamp without incrementing it.
|
||||
* @return the current timestamp
|
||||
*/
|
||||
TimestampT peekTimestamp();
|
||||
|
||||
/**
|
||||
* Update the timestamp with an externally received timestamp.
|
||||
* Will set the currently stored timestamp to <code>max(receivedTimestamp, currentTimestamp) + 1</code>
|
||||
* @param receivedTimestamp the received timestamp
|
||||
* @return the previous timestamp
|
||||
*/
|
||||
TimestampT updateTimestamp(TimestampT receivedTimestamp);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,13 @@ package com.usatiuk.kleppmanntree;
|
||||
import java.io.Serializable;
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
* CombinedTimestamp is a record that represents a timestamp and a node ID, ordered first by timestamp and then by node ID.
|
||||
* @param timestamp the timestamp
|
||||
* @param nodeId the node ID. If null, then only the timestamp is used for ordering.
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the node ID
|
||||
*/
|
||||
public record CombinedTimestamp<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>>
|
||||
(TimestampT timestamp,
|
||||
PeerIdT nodeId) implements Comparable<CombinedTimestamp<TimestampT, PeerIdT>>, Serializable {
|
||||
|
||||
@@ -8,6 +8,14 @@ import java.util.function.Function;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* An implementation of a tree as described in <a href="https://martin.kleppmann.com/papers/move-op.pdf">A highly-available move operation for replicated trees</a>
|
||||
*
|
||||
* @param <TimestampT> Type of the timestamp
|
||||
* @param <PeerIdT> Type of the peer ID
|
||||
* @param <MetaT> Type of the node metadata
|
||||
* @param <NodeIdT> Type of the node ID
|
||||
*/
|
||||
public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
|
||||
private static final Logger LOGGER = Logger.getLogger(KleppmannTree.class.getName());
|
||||
|
||||
@@ -16,6 +24,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
private final Clock<TimestampT> _clock;
|
||||
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
|
||||
|
||||
/**
|
||||
* Constructor with all the dependencies
|
||||
*
|
||||
* @param storage Storage interface
|
||||
* @param peers Peer interface
|
||||
* @param clock Clock interface
|
||||
* @param opRecorder Operation recorder interface
|
||||
*/
|
||||
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
|
||||
PeerInterface<PeerIdT> peers,
|
||||
Clock<TimestampT> clock,
|
||||
@@ -26,6 +42,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
_opRecorder = opRecorder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverse the tree from the given node ID using the given list of names
|
||||
*
|
||||
* @param fromId The starting node ID
|
||||
* @param names The list of names to traverse
|
||||
* @return The resulting node ID or null if not found
|
||||
*/
|
||||
private NodeIdT traverseImpl(NodeIdT fromId, List<String> names) {
|
||||
if (names.isEmpty()) return fromId;
|
||||
|
||||
@@ -39,14 +62,21 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return traverseImpl(childId, names.subList(1, names.size()));
|
||||
}
|
||||
|
||||
public NodeIdT traverse(NodeIdT fromId, List<String> names) {
|
||||
return traverseImpl(fromId, names.subList(1, names.size()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverse the tree from its root node using the given list of names
|
||||
*
|
||||
* @param names The list of names to traverse
|
||||
* @return The resulting node ID or null if not found
|
||||
*/
|
||||
public NodeIdT traverse(List<String> names) {
|
||||
return traverseImpl(_storage.getRootId(), names);
|
||||
}
|
||||
|
||||
/**
|
||||
* Undo the effect of a log effect
|
||||
*
|
||||
* @param effect The log effect to undo
|
||||
*/
|
||||
private void undoEffect(LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT> effect) {
|
||||
if (effect.oldInfo() != null) {
|
||||
var node = _storage.getById(effect.childId());
|
||||
@@ -89,6 +119,11 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Undo the effects of a log record
|
||||
*
|
||||
* @param op The log record to undo
|
||||
*/
|
||||
private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
|
||||
LOGGER.finer(() -> "Will undo op: " + op);
|
||||
if (op.effects() != null)
|
||||
@@ -96,16 +131,32 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
undoEffect(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Redo the operation in a log record
|
||||
*
|
||||
* @param entry The log record to redo
|
||||
*/
|
||||
private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> entry) {
|
||||
var newEffects = doOp(entry.getValue().op(), false);
|
||||
_storage.getLog().replace(entry.getKey(), newEffects);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the operation and put it in the log
|
||||
*
|
||||
* @param op The operation to perform
|
||||
* @param failCreatingIfExists Whether to fail if there is a name conflict,
|
||||
* otherwise replace the existing node
|
||||
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
|
||||
*/
|
||||
private void doAndPut(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
var res = doOp(op, failCreatingIfExists);
|
||||
_storage.getLog().put(res.op().timestamp(), res);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to trim the log to the causality threshold
|
||||
*/
|
||||
private void tryTrimLog() {
|
||||
var log = _storage.getLog();
|
||||
var timeLog = _storage.getPeerTimestampLog();
|
||||
@@ -161,22 +212,52 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Move a node to a new parent with new metadata
|
||||
*
|
||||
* @param newParent The new parent node ID
|
||||
* @param newMeta The new metadata
|
||||
* @param child The child node ID
|
||||
* @throws AlreadyExistsException If a node with the same name already exists under the new parent
|
||||
*/
|
||||
public <LocalMetaT extends MetaT> void move(NodeIdT newParent, LocalMetaT newMeta, NodeIdT child) {
|
||||
move(newParent, newMeta, child, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Move a node to a new parent with new metadata
|
||||
*
|
||||
* @param newParent The new parent node ID
|
||||
* @param newMeta The new metadata
|
||||
* @param child The child node ID
|
||||
* @param failCreatingIfExists Whether to fail if there is a name conflict,
|
||||
* otherwise replace the existing node
|
||||
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
|
||||
*/
|
||||
public void move(NodeIdT newParent, MetaT newMeta, NodeIdT child, boolean failCreatingIfExists) {
|
||||
var createdMove = createMove(newParent, newMeta, child);
|
||||
applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
|
||||
_opRecorder.recordOp(createdMove);
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an external operation from a remote peer
|
||||
*
|
||||
* @param from The peer ID
|
||||
* @param op The operation to apply
|
||||
*/
|
||||
public void applyExternalOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
|
||||
_clock.updateTimestamp(op.timestamp().timestamp());
|
||||
applyOp(from, op, false);
|
||||
}
|
||||
|
||||
// Returns true if the timestamp is newer than what's seen, false otherwise
|
||||
/**
|
||||
* Update the causality threshold timestamp for a peer
|
||||
*
|
||||
* @param from The peer ID
|
||||
* @param newTimestamp The timestamp received from it
|
||||
* @return True if the timestamp was updated, false otherwise
|
||||
*/
|
||||
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
|
||||
TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
|
||||
if (oldRef != null && oldRef.compareTo(newTimestamp) >= 0) { // FIXME?
|
||||
@@ -187,6 +268,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the causality threshold timestamp for a peer
|
||||
*
|
||||
* @param from The peer ID
|
||||
* @param timestamp The timestamp received from it
|
||||
*/
|
||||
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
|
||||
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
|
||||
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
|
||||
@@ -197,6 +284,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
tryTrimLog();
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an operation from a peer
|
||||
*
|
||||
* @param from The peer ID
|
||||
* @param op The operation to apply
|
||||
* @param failCreatingIfExists Whether to fail if there is a name conflict,
|
||||
* otherwise replace the existing node
|
||||
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
|
||||
*/
|
||||
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
if (!updateTimestampImpl(op.timestamp().nodeId(), op.timestamp().timestamp())) return;
|
||||
|
||||
@@ -229,14 +325,36 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new timestamp, incrementing the one in storage
|
||||
*
|
||||
* @return A new timestamp
|
||||
*/
|
||||
private CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
|
||||
return new CombinedTimestamp<>(_clock.getTimestamp(), _peers.getSelfId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new move operation
|
||||
*
|
||||
* @param newParent The new parent node ID
|
||||
* @param newMeta The new metadata
|
||||
* @param node The child node ID
|
||||
* @return A new move operation
|
||||
*/
|
||||
private <LocalMetaT extends MetaT> OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> createMove(NodeIdT newParent, LocalMetaT newMeta, NodeIdT node) {
|
||||
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the operation and return the log record
|
||||
*
|
||||
* @param op The operation to perform
|
||||
* @param failCreatingIfExists Whether to fail if there is a name conflict,
|
||||
* otherwise replace the existing node
|
||||
* @return The log record
|
||||
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
|
||||
*/
|
||||
private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
LOGGER.finer(() -> "Doing op: " + op);
|
||||
LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computed;
|
||||
@@ -253,10 +371,24 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return computed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a new node from storage
|
||||
*
|
||||
* @param key The node ID
|
||||
* @param parent The parent node ID
|
||||
* @param meta The metadata
|
||||
* @return A new tree node
|
||||
*/
|
||||
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
|
||||
return _storage.createNewNode(key, parent, meta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the effects of a log record
|
||||
*
|
||||
* @param sourceOp The source operation
|
||||
* @param effects The list of log effects
|
||||
*/
|
||||
private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
|
||||
for (var effect : effects) {
|
||||
LOGGER.finer(() -> "Applying effect: " + effect + " from op " + sourceOp);
|
||||
@@ -297,6 +429,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the effects of a move operation
|
||||
*
|
||||
* @param op The operation to process
|
||||
* @param failCreatingIfExists Whether to fail if there is a name conflict,
|
||||
* otherwise replace the existing node
|
||||
* @return The log record with the computed effects
|
||||
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
|
||||
*/
|
||||
private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computeEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
var node = _storage.getById(op.childId());
|
||||
|
||||
@@ -380,6 +521,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a node is an ancestor of another node
|
||||
*
|
||||
* @param child The child node ID
|
||||
* @param parent The parent node ID
|
||||
* @return True if the child is an ancestor of the parent, false otherwise
|
||||
*/
|
||||
private boolean isAncestor(NodeIdT child, NodeIdT parent) {
|
||||
var node = _storage.getById(parent);
|
||||
NodeIdT curParent;
|
||||
@@ -390,6 +538,11 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Walk the tree and apply the given consumer to each node
|
||||
*
|
||||
* @param consumer The consumer to apply to each node
|
||||
*/
|
||||
public void walkTree(Consumer<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> consumer) {
|
||||
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
|
||||
queue.push(_storage.getRootId());
|
||||
@@ -403,6 +556,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the parent of a node that matches the given predicate
|
||||
*
|
||||
* @param kidPredicate The predicate to match the child node
|
||||
* @return A pair containing the name of the child and the ID of the parent, or null if not found
|
||||
*/
|
||||
public Pair<String, NodeIdT> findParent(Function<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>, Boolean> kidPredicate) {
|
||||
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
|
||||
queue.push(_storage.getRootId());
|
||||
@@ -423,6 +582,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record the bootstrap operations for a given peer
|
||||
* Will visit all nodes of the tree and add their effective operations to both the queue to be sent to the peer,
|
||||
* and to the global operation log.
|
||||
*
|
||||
* @param host The peer ID
|
||||
*/
|
||||
public void recordBoostrapFor(PeerIdT host) {
|
||||
TreeMap<CombinedTimestamp<TimestampT, PeerIdT>, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT>> result = new TreeMap<>();
|
||||
|
||||
|
||||
@@ -2,6 +2,18 @@ package com.usatiuk.kleppmanntree;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* LogEffect is a record that represents the effect of a log entry on a tree node.
|
||||
* @param oldInfo the old information about the node, before it was moved. Null if the node did not exist before
|
||||
* @param effectiveOp the operation that had caused this effect to be applied
|
||||
* @param newParentId the ID of the new parent node
|
||||
* @param newMeta the new metadata of the node
|
||||
* @param childId the ID of the child node
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>(
|
||||
LogEffectOld<TimestampT, PeerIdT, MetaT, NodeIdT> oldInfo,
|
||||
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> effectiveOp,
|
||||
|
||||
@@ -2,6 +2,16 @@ package com.usatiuk.kleppmanntree;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Represents the old information about a node before it was moved.
|
||||
* @param oldEffectiveMove the old effective move that had caused this effect to be applied
|
||||
* @param oldParent the ID of the old parent node
|
||||
* @param oldMeta the old metadata of the node
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public record LogEffectOld<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
|
||||
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> oldEffectiveMove,
|
||||
NodeIdT oldParent,
|
||||
|
||||
@@ -4,29 +4,82 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* LogInterface is an interface that allows accessing the log of operations
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public interface LogInterface<
|
||||
TimestampT extends Comparable<TimestampT>,
|
||||
PeerIdT extends Comparable<PeerIdT>,
|
||||
MetaT extends NodeMeta,
|
||||
NodeIdT> {
|
||||
/**
|
||||
* Peek the oldest log entry.
|
||||
* @return the oldest log entry
|
||||
*/
|
||||
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> peekOldest();
|
||||
|
||||
/**
|
||||
* Take the oldest log entry.
|
||||
* @return the oldest log entry
|
||||
*/
|
||||
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> takeOldest();
|
||||
|
||||
/**
|
||||
* Peek the newest log entry.
|
||||
* @return the newest log entry
|
||||
*/
|
||||
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> peekNewest();
|
||||
|
||||
/**
|
||||
* Return all log entries that are newer than the given timestamp.
|
||||
* @param since the timestamp to compare with
|
||||
* @param inclusive if true, include the log entry with the given timestamp
|
||||
* @return a list of log entries that are newer than the given timestamp
|
||||
*/
|
||||
List<Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>>>
|
||||
newestSlice(CombinedTimestamp<TimestampT, PeerIdT> since, boolean inclusive);
|
||||
|
||||
/**
|
||||
* Return all the log entries
|
||||
* @return a list of all log entries
|
||||
*/
|
||||
List<Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>>> getAll();
|
||||
|
||||
/**
|
||||
* Checks if the log is empty.
|
||||
* @return true if the log is empty, false otherwise
|
||||
*/
|
||||
boolean isEmpty();
|
||||
|
||||
/**
|
||||
* Checks if the log contains the given timestamp.
|
||||
* @param timestamp the timestamp to check
|
||||
* @return true if the log contains the given timestamp, false otherwise
|
||||
*/
|
||||
boolean containsKey(CombinedTimestamp<TimestampT, PeerIdT> timestamp);
|
||||
|
||||
/**
|
||||
* Get the size of the log.
|
||||
* @return the size of the log (number of entries)
|
||||
*/
|
||||
long size();
|
||||
|
||||
/**
|
||||
* Add a log entry to the log.
|
||||
* @param timestamp the timestamp of the log entry
|
||||
* @param record the log entry
|
||||
* @throws IllegalStateException if the log entry already exists
|
||||
*/
|
||||
void put(CombinedTimestamp<TimestampT, PeerIdT> timestamp, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> record);
|
||||
|
||||
/**
|
||||
* Replace a log entry in the log.
|
||||
* @param timestamp the timestamp of the log entry
|
||||
* @param record the log entry
|
||||
*/
|
||||
void replace(CombinedTimestamp<TimestampT, PeerIdT> timestamp, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> record);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,15 @@ package com.usatiuk.kleppmanntree;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Represents a log record in the Kleppmann tree.
|
||||
* @param op the operation that is stored in this log record
|
||||
* @param effects the effects of the operation (resulting moves)
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public record LogRecord<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
|
||||
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op,
|
||||
List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) implements Serializable {
|
||||
|
||||
@@ -2,8 +2,24 @@ package com.usatiuk.kleppmanntree;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Represents metadata associated with a node in the Kleppmann tree.
|
||||
* This interface is used to define the metadata that can be associated with nodes in the tree.
|
||||
* Implementations of this interface should provide a name for the node and a method to create a copy of it with a new name.
|
||||
*/
|
||||
public interface NodeMeta extends Serializable {
|
||||
/**
|
||||
* Returns the name of the node.
|
||||
*
|
||||
* @return the name of the node
|
||||
*/
|
||||
String name();
|
||||
|
||||
/**
|
||||
* Creates a copy of the metadata with a new name.
|
||||
*
|
||||
* @param name the new name for the metadata
|
||||
* @return a new instance of NodeMeta with the specified name
|
||||
*/
|
||||
NodeMeta withName(String name);
|
||||
}
|
||||
|
||||
@@ -2,9 +2,27 @@ package com.usatiuk.kleppmanntree;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Operation that moves a child node to a new parent node.
|
||||
*
|
||||
* @param timestamp the timestamp of the operation
|
||||
* @param newParentId the ID of the new parent node
|
||||
* @param newMeta the new metadata of the node, can be null
|
||||
* @param childId the ID of the child node (the node that is being moved)
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
|
||||
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
|
||||
NodeIdT childId) implements Serializable {
|
||||
/**
|
||||
* Returns the new name of the node: name extracted from the new metadata if available,
|
||||
* otherwise the child ID converted to string.
|
||||
*
|
||||
* @return the new name of the node
|
||||
*/
|
||||
public String newName() {
|
||||
if (newMeta != null)
|
||||
return newMeta.name();
|
||||
|
||||
@@ -1,7 +1,26 @@
|
||||
package com.usatiuk.kleppmanntree;
|
||||
|
||||
/**
|
||||
* Interface to provide recording operations to be sent to peers asynchronously.
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public interface OpRecorder<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
|
||||
/**
|
||||
* Records an operation to be sent to peers asynchronously.
|
||||
* The operation will be sent to all known peers in the system.
|
||||
*
|
||||
* @param op the operation to be recorded
|
||||
*/
|
||||
void recordOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op);
|
||||
|
||||
/**
|
||||
* Records an operation to be sent to a specific peer asynchronously.
|
||||
*
|
||||
* @param peer the ID of the peer to send the operation to
|
||||
* @param op the operation to be recorded
|
||||
*/
|
||||
void recordOpForPeer(PeerIdT peer, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op);
|
||||
}
|
||||
|
||||
@@ -2,8 +2,22 @@ package com.usatiuk.kleppmanntree;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Interface providing access to a list of known peers.
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
*/
|
||||
public interface PeerInterface<PeerIdT extends Comparable<PeerIdT>> {
|
||||
/**
|
||||
* Returns the ID of the current peer.
|
||||
*
|
||||
* @return the ID of the current peer
|
||||
*/
|
||||
PeerIdT getSelfId();
|
||||
|
||||
/**
|
||||
* Returns a collection of all known peers.
|
||||
*
|
||||
* @return a collection of all known peers
|
||||
*/
|
||||
Collection<PeerIdT> getAllPeers();
|
||||
}
|
||||
|
||||
@@ -1,11 +1,26 @@
|
||||
package com.usatiuk.kleppmanntree;
|
||||
|
||||
/**
|
||||
* Interface providing a map of newest received timestamps for each peer. (causality thresholds)
|
||||
* If a peer has some timestamp recorded in this map,
|
||||
* it means that all messages coming from this peer will have a newer timestamp.
|
||||
* @param <TimestampT>
|
||||
* @param <PeerIdT>
|
||||
*/
|
||||
public interface PeerTimestampLogInterface<
|
||||
TimestampT extends Comparable<TimestampT>,
|
||||
PeerIdT extends Comparable<PeerIdT>> {
|
||||
|
||||
/**
|
||||
* Get the timestamp for a specific peer.
|
||||
* @param peerId the ID of the peer
|
||||
* @return the timestamp for the peer
|
||||
*/
|
||||
TimestampT getForPeer(PeerIdT peerId);
|
||||
|
||||
/**
|
||||
* Get the timestamp for the current peer.
|
||||
*/
|
||||
void putForPeer(PeerIdT peerId, TimestampT timestamp);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,28 +1,89 @@
|
||||
package com.usatiuk.kleppmanntree;
|
||||
|
||||
/**
|
||||
* Storage interface for the Kleppmann tree.
|
||||
*
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public interface StorageInterface<
|
||||
TimestampT extends Comparable<TimestampT>,
|
||||
PeerIdT extends Comparable<PeerIdT>,
|
||||
MetaT extends NodeMeta,
|
||||
NodeIdT> {
|
||||
/**
|
||||
* Get the root node ID.
|
||||
*
|
||||
* @return the root node IDx
|
||||
*/
|
||||
NodeIdT getRootId();
|
||||
|
||||
/**
|
||||
* Get the trash node ID.
|
||||
*
|
||||
* @return the trash node ID
|
||||
*/
|
||||
NodeIdT getTrashId();
|
||||
|
||||
/**
|
||||
* Get the lost and found node ID.
|
||||
*
|
||||
* @return the lost and found node ID
|
||||
*/
|
||||
NodeIdT getLostFoundId();
|
||||
|
||||
/**
|
||||
* Get the new node ID.
|
||||
*
|
||||
* @return the new node ID
|
||||
*/
|
||||
NodeIdT getNewNodeId();
|
||||
|
||||
/**
|
||||
* Get the node by its ID.
|
||||
*
|
||||
* @param id the ID of the node
|
||||
* @return the node with the specified ID, or null if not found
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getById(NodeIdT id);
|
||||
|
||||
// Creates a node, returned wrapper is RW-locked
|
||||
/**
|
||||
* Create a new node with the specified key, parent, and metadata.
|
||||
*
|
||||
* @param key the ID of the new node
|
||||
* @param parent the ID of the parent node
|
||||
* @param meta the metadata of the new node
|
||||
* @return the new node
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> createNewNode(NodeIdT key, NodeIdT parent, MetaT meta);
|
||||
|
||||
/**
|
||||
* Put a node into the storage.
|
||||
*
|
||||
* @param node the node to put into the storage
|
||||
*/
|
||||
void putNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);
|
||||
|
||||
/**
|
||||
* Remove a node from the storage.
|
||||
*
|
||||
* @param id the ID of the node to remove
|
||||
*/
|
||||
void removeNode(NodeIdT id);
|
||||
|
||||
/**
|
||||
* Get the log interface.
|
||||
*
|
||||
* @return the log interface
|
||||
*/
|
||||
LogInterface<TimestampT, PeerIdT, MetaT, NodeIdT> getLog();
|
||||
|
||||
/**
|
||||
* Get the peer timestamp log interface.
|
||||
*
|
||||
* @return the peer timestamp log interface
|
||||
*/
|
||||
PeerTimestampLogInterface<TimestampT, PeerIdT> getPeerTimestampLog();
|
||||
}
|
||||
|
||||
@@ -5,29 +5,92 @@ import org.pcollections.PMap;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Represents a node in the Kleppmann tree.
|
||||
*
|
||||
* @param <TimestampT> the type of the timestamp
|
||||
* @param <PeerIdT> the type of the peer ID
|
||||
* @param <MetaT> the type of the node metadata
|
||||
* @param <NodeIdT> the type of the node ID
|
||||
*/
|
||||
public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> extends Serializable {
|
||||
/**
|
||||
* Get the ID of the node.
|
||||
*
|
||||
* @return the ID of the node
|
||||
*/
|
||||
NodeIdT key();
|
||||
|
||||
/**
|
||||
* Get the ID of the parent node.
|
||||
*
|
||||
* @return the ID of the parent node
|
||||
*/
|
||||
NodeIdT parent();
|
||||
|
||||
/**
|
||||
* Get the last effective operation that moved this node.
|
||||
*
|
||||
* @return the last effective operation
|
||||
*/
|
||||
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp();
|
||||
|
||||
/**
|
||||
* Get the metadata stored in this node.
|
||||
*
|
||||
* @return the metadata of the node
|
||||
*/
|
||||
@Nullable
|
||||
MetaT meta();
|
||||
|
||||
/**
|
||||
* Get the name of the node.
|
||||
* If the node has metadata, the name is extracted from it, otherwise the key is converted to string.
|
||||
*
|
||||
* @return the name of the node
|
||||
*/
|
||||
default String name() {
|
||||
var meta = meta();
|
||||
if (meta != null) return meta.name();
|
||||
return key().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the children of this node.
|
||||
*
|
||||
* @return a map of child IDs to their respective nodes
|
||||
*/
|
||||
PMap<String, NodeIdT> children();
|
||||
|
||||
/**
|
||||
* Make a copy of this node with a new parent.
|
||||
*
|
||||
* @param parent the ID of the new parent node
|
||||
* @return a new TreeNode instance with the updated parent
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withParent(NodeIdT parent);
|
||||
|
||||
/**
|
||||
* Make a copy of this node with a new last effective operation.
|
||||
*
|
||||
* @param lastEffectiveOp the new last effective operation
|
||||
* @return a new TreeNode instance with the updated last effective operation
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withLastEffectiveOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp);
|
||||
|
||||
/**
|
||||
* Make a copy of this node with new metadata.
|
||||
*
|
||||
* @param meta the new metadata
|
||||
* @return a new TreeNode instance with the updated metadata
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withMeta(MetaT meta);
|
||||
|
||||
/**
|
||||
* Make a copy of this node with new children.
|
||||
*
|
||||
* @param children the new children
|
||||
* @return a new TreeNode instance with the updated children
|
||||
*/
|
||||
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withChildren(PMap<String, NodeIdT> children);
|
||||
}
|
||||
|
||||
@@ -4,19 +4,63 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
|
||||
/**
|
||||
* An iterator over key-value pairs that can be closed and supports peek and skip operations, in both directions.
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
|
||||
/**
|
||||
* Returns the upcoming key in the forward direction without advancing the iterator.
|
||||
*
|
||||
* @return the current key
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
K peekNextKey();
|
||||
|
||||
/**
|
||||
* Skips the next element in the forward direction.
|
||||
*
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
void skip();
|
||||
|
||||
/**
|
||||
* Checks if there is a next element in the forward direction.
|
||||
*
|
||||
* @return true if there is a next element, false otherwise
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
K peekPrevKey();
|
||||
|
||||
/**
|
||||
* Returns the key-value pair in the reverse direction, and advances the iterator.
|
||||
*
|
||||
* @return the previous key-value pair
|
||||
* @throws IllegalStateException if there is no previous element
|
||||
*/
|
||||
Pair<K, V> prev();
|
||||
|
||||
/**
|
||||
* Checks if there is a previous element in the reverse direction.
|
||||
*
|
||||
* @return true if there is a previous element, false otherwise
|
||||
*/
|
||||
boolean hasPrev();
|
||||
|
||||
/**
|
||||
* Skips the previous element in the reverse direction.
|
||||
*
|
||||
* @throws IllegalStateException if there is no previous element
|
||||
*/
|
||||
void skipPrev();
|
||||
|
||||
/**
|
||||
* Returns a reversed iterator that iterates in the reverse direction.
|
||||
*
|
||||
* @return a new CloseableKvIterator that iterates in the reverse direction
|
||||
*/
|
||||
default CloseableKvIterator<K, V> reversed() {
|
||||
return new ReversedKvIterator<K, V>(this);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,13 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Interface indicating that data is present.
|
||||
* @param <V> the type of the value
|
||||
*/
|
||||
public interface Data<V> extends MaybeTombstone<V> {
|
||||
/**
|
||||
* Get the value.
|
||||
* @return the value
|
||||
*/
|
||||
V value();
|
||||
}
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Simple implementation of the Data interface.
|
||||
* @param value the value
|
||||
* @param <V> the type of the value
|
||||
*/
|
||||
public record DataWrapper<V>(V value) implements Data<V> {
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Allows to specify initial positioning of the iterator relative to the requested key.
|
||||
*/
|
||||
public enum IteratorStart {
|
||||
LT,
|
||||
LE,
|
||||
|
||||
@@ -5,11 +5,25 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A key-value iterator that filters keys based on a predicate.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final CloseableKvIterator<K, V> _backing;
|
||||
private final Function<K, Boolean> _filter;
|
||||
private K _next;
|
||||
|
||||
/**
|
||||
* Constructs a KeyPredicateKvIterator with the specified backing iterator, start position, and filter.
|
||||
*
|
||||
* @param backing the backing iterator
|
||||
* @param start the starting position relative to the startKey
|
||||
* @param startKey the starting key
|
||||
* @param filter the filter function to apply to keys. Only keys for which this function returns true will be included in the iteration.
|
||||
*/
|
||||
public KeyPredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<K, Boolean> filter) {
|
||||
_goingForward = true;
|
||||
_backing = backing;
|
||||
|
||||
@@ -4,10 +4,23 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A mapping key-value iterator that transforms the values of a backing iterator using a specified function.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values in the backing iterator
|
||||
* @param <V_T> the type of the transformed values
|
||||
*/
|
||||
public class MappingKvIterator<K extends Comparable<K>, V, V_T> implements CloseableKvIterator<K, V_T> {
|
||||
private final CloseableKvIterator<K, V> _backing;
|
||||
private final Function<V, V_T> _transformer;
|
||||
|
||||
/**
|
||||
* Constructs a MappingKvIterator with the specified backing iterator and transformer function.
|
||||
*
|
||||
* @param backing the backing iterator
|
||||
* @param transformer the function to transform values
|
||||
*/
|
||||
public MappingKvIterator(CloseableKvIterator<K, V> backing, Function<V, V_T> transformer) {
|
||||
_backing = backing;
|
||||
_transformer = transformer;
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Optional-like interface, can either be {@link Data} or {@link Tombstone}.
|
||||
* @param <T> the type of the value
|
||||
*/
|
||||
public interface MaybeTombstone<T> {
|
||||
}
|
||||
|
||||
@@ -9,10 +9,25 @@ import java.util.NavigableMap;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* A merging key-value iterator that combines multiple iterators into a single iterator.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private final List<IteratorEntry<K, V>> _iterators;
|
||||
|
||||
/**
|
||||
* Constructs a MergingKvIterator with the specified start type, start key, and list of iterators.
|
||||
* The iterators have priority based on their order in the list: if two iterators have the same key,
|
||||
* the one that is in the beginning of the list will be used.
|
||||
*
|
||||
* @param startType the starting position relative to the startKey
|
||||
* @param startKey the starting key
|
||||
* @param iterators the list of iterators to merge
|
||||
*/
|
||||
public MergingKvIterator(IteratorStart startType, K startKey, List<CloseableKvIterator<K, V>> iterators) {
|
||||
_goingForward = true;
|
||||
|
||||
@@ -88,6 +103,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a MergingKvIterator with the specified start type, start key, and array of iterators.
|
||||
* The iterators have priority based on their order in the array: if two iterators have the same key,
|
||||
* the one that is in the beginning of the array will be used.
|
||||
*
|
||||
* @param startType the starting position relative to the startKey
|
||||
* @param startKey the starting key
|
||||
* @param iterators the array of iterators to merge
|
||||
*/
|
||||
@SafeVarargs
|
||||
public MergingKvIterator(IteratorStart startType, K startKey, CloseableKvIterator<K, V>... iterators) {
|
||||
this(startType, startKey, List.of(iterators));
|
||||
|
||||
@@ -4,11 +4,25 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* A key-value iterator for a {@link NavigableMap}.
|
||||
* It allows iterating over the keys and values in a sorted order.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public class NavigableMapKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final NavigableMap<K, V> _map;
|
||||
private Iterator<Map.Entry<K, V>> _iterator;
|
||||
private Map.Entry<K, V> _next;
|
||||
|
||||
/**
|
||||
* Constructs a NavigableMapKvIterator with the specified map, start type, and start key.
|
||||
*
|
||||
* @param map the map to iterate over
|
||||
* @param start the starting position relative to the startKey
|
||||
* @param key the starting key
|
||||
*/
|
||||
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
|
||||
_map = (NavigableMap<K, V>) map;
|
||||
SortedMap<K, V> _view;
|
||||
|
||||
@@ -2,9 +2,19 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
/**
|
||||
* A wrapper for a key-value iterator that iterates in reverse order.
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public class ReversedKvIterator<K extends Comparable<? super K>, V> implements CloseableKvIterator<K, V> {
|
||||
private final CloseableKvIterator<K, V> _backing;
|
||||
|
||||
/**
|
||||
* Constructs a ReversedKvIterator with the specified backing iterator.
|
||||
*
|
||||
* @param backing the backing iterator
|
||||
*/
|
||||
public ReversedKvIterator(CloseableKvIterator<K, V> backing) {
|
||||
_backing = backing;
|
||||
}
|
||||
|
||||
@@ -2,9 +2,21 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
/**
|
||||
* Base class for a reversible key-value iterator.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public abstract class ReversibleKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
|
||||
/**
|
||||
* The current direction of the iterator.
|
||||
*/
|
||||
protected boolean _goingForward;
|
||||
|
||||
/**
|
||||
* Reverses the current direction of the iterator.
|
||||
*/
|
||||
protected abstract void reverse();
|
||||
|
||||
private void ensureForward() {
|
||||
@@ -19,12 +31,33 @@ public abstract class ReversibleKvIterator<K extends Comparable<K>, V> implement
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fills the next element in the iterator, depending on the current direction.
|
||||
*
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
abstract protected K peekImpl();
|
||||
|
||||
/**
|
||||
* Skips the next element in the iterator, depending on the current direction.
|
||||
*
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
abstract protected void skipImpl();
|
||||
|
||||
/**
|
||||
* Checks if there is a next element in the iterator, depending on the current direction.
|
||||
*
|
||||
* @return true if there is a next element, false otherwise
|
||||
*/
|
||||
abstract protected boolean hasImpl();
|
||||
|
||||
/**
|
||||
* Returns the next element in the iterator, depending on the current direction.
|
||||
*
|
||||
* @return the next element
|
||||
* @throws IllegalStateException if there is no next element
|
||||
*/
|
||||
abstract protected Pair<K, V> nextImpl();
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Indicates that the value is a tombstone.
|
||||
* @param <V> the type of the value
|
||||
*/
|
||||
public interface Tombstone<V> extends MaybeTombstone<V> {
|
||||
}
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
/**
|
||||
* Simple implementation of the Tombstone interface.
|
||||
* @param <V> the type of the value
|
||||
*/
|
||||
public record TombstoneImpl<V>() implements Tombstone<V> {
|
||||
}
|
||||
|
||||
@@ -6,11 +6,25 @@ import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* A key-value iterator that skips tombstones.
|
||||
*
|
||||
* @param <K> the type of the keys
|
||||
* @param <V> the type of the values
|
||||
*/
|
||||
public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final MergingKvIterator<K, MaybeTombstone<V>> _backing;
|
||||
private Pair<K, V> _next = null;
|
||||
private boolean _checkedNext = false;
|
||||
|
||||
/**
|
||||
* Constructs a TombstoneSkippingIterator with the specified start position, start key, and list of iterators.
|
||||
* Like {@link MappingKvIterator}, iterators have a priority depending on their order in the list.
|
||||
*
|
||||
* @param start the starting position relative to the startKey
|
||||
* @param startKey the starting key
|
||||
* @param iterators the list of iterators to merge
|
||||
*/
|
||||
public TombstoneSkippingIterator(IteratorStart start, K startKey, List<CloseableKvIterator<K, MaybeTombstone<V>>> iterators) {
|
||||
_goingForward = true;
|
||||
_backing = new MergingKvIterator<>(start, startKey, iterators);
|
||||
|
||||
@@ -20,8 +20,8 @@ public class PeerSyncApiClientDynamic {
|
||||
private ApiPeerInfo getSelfInfo(String address, int port) {
|
||||
var client = QuarkusRestClientBuilder.newBuilder()
|
||||
.baseUri(URI.create("http://" + address + ":" + port))
|
||||
.connectTimeout(5, TimeUnit.SECONDS)
|
||||
.readTimeout(5, TimeUnit.SECONDS)
|
||||
.connectTimeout(1, TimeUnit.SECONDS)
|
||||
.readTimeout(1, TimeUnit.SECONDS)
|
||||
.build(PeerSyncApiClient.class);
|
||||
return client.getSelfInfo();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user