8 Commits

Author SHA1 Message Date
7784d975d7 Utils: javadocs 2025-05-13 22:55:25 +02:00
d0b45177dd Sync-base: more javadocs 2 2025-05-13 22:50:24 +02:00
838405fb46 Sync-base: more javadocs 2025-05-13 22:20:13 +02:00
dbad8a2b22 Objects: javadocs 2025-05-13 20:53:44 +02:00
66dabdef25 a couple more javadocs 2025-05-13 20:13:28 +02:00
87e127bdfb KleppmannTree javadocs
and some more
2025-05-13 15:55:33 +02:00
fd62543687 CI: reenable rest of CI 2025-05-12 16:15:51 +02:00
757a0bbc8a javadocs github pages (#7)
* javadocs github pages

* fix

* fix

* proper pages
2025-05-12 16:13:42 +02:00
140 changed files with 2093 additions and 149 deletions

View File

@@ -236,3 +236,37 @@ jobs:
with:
name: Run wrapper
path: ~/run-wrapper.tar.gz
publish-javadoc:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
permissions:
contents: read
pages: write
id-token: write
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/download-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-javadocs-downloaded
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: 'dhfs-javadocs-downloaded'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@@ -107,6 +107,7 @@ public class DhfsFileService {
/**
* Get the attributes of a file or directory.
*
* @param uuid the UUID of the file or directory
* @return the attributes of the file or directory
*/
@@ -133,6 +134,7 @@ public class DhfsFileService {
/**
* Try to resolve a path to a file or directory.
*
* @param name the path to resolve
* @return the key of the file or directory, or an empty optional if it does not exist
*/
@@ -161,6 +163,7 @@ public class DhfsFileService {
/**
* Create a new file with the given name and mode.
*
* @param name the name of the file
* @param mode the mode of the file
* @return the key of the created file
@@ -191,6 +194,7 @@ public class DhfsFileService {
/**
* Get the parent directory of a file or directory.
*
* @param ino the key of the file or directory
* @return the parent directory
*/
@@ -207,6 +211,7 @@ public class DhfsFileService {
/**
* Create a new directory with the given name and mode.
*
* @param name the name of the directory
* @param mode the mode of the directory
*/
@@ -227,6 +232,7 @@ public class DhfsFileService {
/**
* Unlink a file or directory.
*
* @param name the name of the file or directory
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
*/
@@ -245,8 +251,9 @@ public class DhfsFileService {
/**
* Rename a file or directory.
*
* @param from the old name
* @param to the new name
* @param to the new name
* @return true if the rename was successful, false otherwise
*/
public Boolean rename(String from, String to) {
@@ -265,6 +272,7 @@ public class DhfsFileService {
/**
* Change the mode of a file or directory.
*
* @param uuid the ID of the file or directory
* @param mode the new mode
* @return true if the mode was changed successfully, false otherwise
@@ -291,6 +299,7 @@ public class DhfsFileService {
/**
* Read the contents of a directory.
*
* @param name the path of the directory
* @return an iterable of the names of the files in the directory
*/
@@ -307,9 +316,10 @@ public class DhfsFileService {
/**
* Read the contents of a file.
*
* @param fileUuid the ID of the file
* @param offset the offset to start reading from
* @param length the number of bytes to read
* @param offset the offset to start reading from
* @param length the number of bytes to read
* @return the contents of the file as a ByteString
*/
public ByteString read(JObjectKey fileUuid, long offset, int length) {
@@ -375,6 +385,7 @@ public class DhfsFileService {
/**
* Get the size of a file.
*
* @param uuid the ID of the file
* @return the size of the file
*/
@@ -391,6 +402,7 @@ public class DhfsFileService {
/**
* Get the size of a chunk.
*
* @param uuid the ID of the chunk
* @return the size of the chunk
*/
@@ -404,9 +416,10 @@ public class DhfsFileService {
/**
* Write data to a file.
*
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
@@ -513,8 +526,9 @@ public class DhfsFileService {
/**
* Truncate a file to the given length.
*
* @param fileUuid the ID of the file
* @param length the new length of the file
* @param length the new length of the file
* @return true if the truncate was successful, false otherwise
*/
public Boolean truncate(JObjectKey fileUuid, long length) {
@@ -608,8 +622,9 @@ public class DhfsFileService {
/**
* Fill the given range with zeroes.
*
* @param fillStart the start of the range
* @param length the end of the range
* @param length the end of the range
* @param newChunks the map to store the new chunks in
*/
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
@@ -649,6 +664,7 @@ public class DhfsFileService {
/**
* Read the contents of a symlink.
*
* @param uuid the ID of the symlink
* @return the contents of the symlink as a string
*/
@@ -660,6 +676,7 @@ public class DhfsFileService {
/**
* Read the contents of a symlink as a ByteString.
*
* @param uuid the ID of the symlink
* @return the contents of the symlink as a ByteString
*/
@@ -672,6 +689,7 @@ public class DhfsFileService {
/**
* Create a symlink.
*
* @param oldpath the target of the symlink
* @param newpath the path of the symlink
* @return the key of the created symlink
@@ -700,9 +718,10 @@ public class DhfsFileService {
/**
* Set the access and modification times of a file.
*
* @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
@@ -728,6 +747,7 @@ public class DhfsFileService {
/**
* Get the size of a file.
*
* @param fileUuid the ID of the file
* @return the size of the file
*/
@@ -752,9 +772,10 @@ public class DhfsFileService {
/**
* Write data to a file.
*
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, byte[] data) {

View File

@@ -139,7 +139,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>0.5C</forkCount>
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>

View File

@@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicLong;
import static jnr.posix.FileStat.*;
/**
* FUSE file system implementation.
*/
@ApplicationScoped
public class DhfsFuse extends FuseStubFS {
private static final int blksize = 1048576;
@@ -56,6 +59,11 @@ public class DhfsFuse extends FuseStubFS {
@Inject
DhfsFileService fileService;
/**
* Allocate a handle for the given key.
* @param key the key to allocate a handle for
* @return the allocated handle, not 0
*/
private long allocateHandle(JObjectKey key) {
while (true) {
var newFh = _fh.getAndIncrement();
@@ -66,8 +74,14 @@ public class DhfsFuse extends FuseStubFS {
}
}
/**
* Get the key from the handle.
* @param handle the handle to get the key from
* @return the key, or null if not found
*/
private JObjectKey getFromHandle(long handle) {
assert handle != 0;
if(handle == 0)
throw new IllegalStateException("Handle is 0");
return _openHandles.get(handle);
}

View File

@@ -7,6 +7,9 @@ import jnr.ffi.Pointer;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
/**
* JnrPtrByteOutput is a ByteOutput implementation that writes to a `jnr.ffi.Pointer`.
*/
public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing;
private final long _size;

View File

@@ -1,5 +1,8 @@
package com.usatiuk.kleppmanntree;
/**
* Exception thrown when an attempt is made to create a new tree node as a child with a name that already exists.
*/
public class AlreadyExistsException extends RuntimeException {
public AlreadyExistsException(String message) {
super(message);

View File

@@ -1,32 +0,0 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public class AtomicClock implements Clock<Long>, Serializable {
private long _max = 0;
public AtomicClock(long counter) {
_max = counter;
}
@Override
public Long getTimestamp() {
return ++_max;
}
public void setTimestamp(Long timestamp) {
_max = timestamp;
}
@Override
public Long peekTimestamp() {
return _max;
}
@Override
public Long updateTimestamp(Long receivedTimestamp) {
var old = _max;
_max = Math.max(_max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -1,9 +1,26 @@
package com.usatiuk.kleppmanntree;
/**
* Clock interface
*/
public interface Clock<TimestampT extends Comparable<TimestampT>> {
/**
* Increment and get the current timestamp.
* @return the incremented timestamp
*/
TimestampT getTimestamp();
/**
* Get the current timestamp without incrementing it.
* @return the current timestamp
*/
TimestampT peekTimestamp();
/**
* Update the timestamp with an externally received timestamp.
* Will set the currently stored timestamp to <code>max(receivedTimestamp, currentTimestamp) + 1</code>
* @param receivedTimestamp the received timestamp
* @return the previous timestamp
*/
TimestampT updateTimestamp(TimestampT receivedTimestamp);
}

View File

@@ -3,6 +3,13 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
import java.util.Comparator;
/**
* CombinedTimestamp is a record that represents a timestamp and a node ID, ordered first by timestamp and then by node ID.
* @param timestamp the timestamp
* @param nodeId the node ID. If null, then only the timestamp is used for ordering.
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the node ID
*/
public record CombinedTimestamp<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>>
(TimestampT timestamp,
PeerIdT nodeId) implements Comparable<CombinedTimestamp<TimestampT, PeerIdT>>, Serializable {

View File

@@ -8,6 +8,14 @@ import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* An implementation of a tree as described in <a href="https://martin.kleppmann.com/papers/move-op.pdf">A highly-available move operation for replicated trees</a>
*
* @param <TimestampT> Type of the timestamp
* @param <PeerIdT> Type of the peer ID
* @param <MetaT> Type of the node metadata
* @param <NodeIdT> Type of the node ID
*/
public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
private static final Logger LOGGER = Logger.getLogger(KleppmannTree.class.getName());
@@ -16,6 +24,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
/**
* Constructor with all the dependencies
*
* @param storage Storage interface
* @param peers Peer interface
* @param clock Clock interface
* @param opRecorder Operation recorder interface
*/
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
PeerInterface<PeerIdT> peers,
Clock<TimestampT> clock,
@@ -26,6 +42,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
_opRecorder = opRecorder;
}
/**
* Traverse the tree from the given node ID using the given list of names
*
* @param fromId The starting node ID
* @param names The list of names to traverse
* @return The resulting node ID or null if not found
*/
private NodeIdT traverseImpl(NodeIdT fromId, List<String> names) {
if (names.isEmpty()) return fromId;
@@ -39,14 +62,21 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return traverseImpl(childId, names.subList(1, names.size()));
}
public NodeIdT traverse(NodeIdT fromId, List<String> names) {
return traverseImpl(fromId, names.subList(1, names.size()));
}
/**
* Traverse the tree from its root node using the given list of names
*
* @param names The list of names to traverse
* @return The resulting node ID or null if not found
*/
public NodeIdT traverse(List<String> names) {
return traverseImpl(_storage.getRootId(), names);
}
/**
* Undo the effect of a log effect
*
* @param effect The log effect to undo
*/
private void undoEffect(LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT> effect) {
if (effect.oldInfo() != null) {
var node = _storage.getById(effect.childId());
@@ -89,6 +119,11 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
/**
* Undo the effects of a log record
*
* @param op The log record to undo
*/
private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
LOGGER.finer(() -> "Will undo op: " + op);
if (op.effects() != null)
@@ -96,16 +131,32 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
undoEffect(e);
}
/**
* Redo the operation in a log record
*
* @param entry The log record to redo
*/
private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> entry) {
var newEffects = doOp(entry.getValue().op(), false);
_storage.getLog().replace(entry.getKey(), newEffects);
}
/**
* Perform the operation and put it in the log
*
* @param op The operation to perform
* @param failCreatingIfExists Whether to fail if there is a name conflict,
* otherwise replace the existing node
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
private void doAndPut(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
var res = doOp(op, failCreatingIfExists);
_storage.getLog().put(res.op().timestamp(), res);
}
/**
* Try to trim the log to the causality threshold
*/
private void tryTrimLog() {
var log = _storage.getLog();
var timeLog = _storage.getPeerTimestampLog();
@@ -161,22 +212,52 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
/**
* Move a node to a new parent with new metadata
*
* @param newParent The new parent node ID
* @param newMeta The new metadata
* @param child The child node ID
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
public <LocalMetaT extends MetaT> void move(NodeIdT newParent, LocalMetaT newMeta, NodeIdT child) {
move(newParent, newMeta, child, true);
}
/**
* Move a node to a new parent with new metadata
*
* @param newParent The new parent node ID
* @param newMeta The new metadata
* @param child The child node ID
* @param failCreatingIfExists Whether to fail if there is a name conflict,
* otherwise replace the existing node
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
public void move(NodeIdT newParent, MetaT newMeta, NodeIdT child, boolean failCreatingIfExists) {
var createdMove = createMove(newParent, newMeta, child);
applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
_opRecorder.recordOp(createdMove);
}
/**
* Apply an external operation from a remote peer
*
* @param from The peer ID
* @param op The operation to apply
*/
public void applyExternalOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
_clock.updateTimestamp(op.timestamp().timestamp());
applyOp(from, op, false);
}
// Returns true if the timestamp is newer than what's seen, false otherwise
/**
* Update the causality threshold timestamp for a peer
*
* @param from The peer ID
* @param newTimestamp The timestamp received from it
* @return True if the timestamp was updated, false otherwise
*/
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
if (oldRef != null && oldRef.compareTo(newTimestamp) >= 0) { // FIXME?
@@ -187,6 +268,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return true;
}
/**
* Update the causality threshold timestamp for a peer
*
* @param from The peer ID
* @param timestamp The timestamp received from it
*/
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
@@ -197,6 +284,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
tryTrimLog();
}
/**
* Apply an operation from a peer
*
* @param from The peer ID
* @param op The operation to apply
* @param failCreatingIfExists Whether to fail if there is a name conflict,
* otherwise replace the existing node
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
if (!updateTimestampImpl(op.timestamp().nodeId(), op.timestamp().timestamp())) return;
@@ -229,14 +325,36 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
/**
* Get a new timestamp, incrementing the one in storage
*
* @return A new timestamp
*/
private CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
return new CombinedTimestamp<>(_clock.getTimestamp(), _peers.getSelfId());
}
/**
* Create a new move operation
*
* @param newParent The new parent node ID
* @param newMeta The new metadata
* @param node The child node ID
* @return A new move operation
*/
private <LocalMetaT extends MetaT> OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> createMove(NodeIdT newParent, LocalMetaT newMeta, NodeIdT node) {
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
}
/**
* Perform the operation and return the log record
*
* @param op The operation to perform
* @param failCreatingIfExists Whether to fail if there is a name conflict,
* otherwise replace the existing node
* @return The log record
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
LOGGER.finer(() -> "Doing op: " + op);
LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computed;
@@ -253,10 +371,24 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return computed;
}
/**
* Get a new node from storage
*
* @param key The node ID
* @param parent The parent node ID
* @param meta The metadata
* @return A new tree node
*/
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
return _storage.createNewNode(key, parent, meta);
}
/**
* Apply the effects of a log record
*
* @param sourceOp The source operation
* @param effects The list of log effects
*/
private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
for (var effect : effects) {
LOGGER.finer(() -> "Applying effect: " + effect + " from op " + sourceOp);
@@ -297,6 +429,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
/**
* Compute the effects of a move operation
*
* @param op The operation to process
* @param failCreatingIfExists Whether to fail if there is a name conflict,
* otherwise replace the existing node
* @return The log record with the computed effects
* @throws AlreadyExistsException If the node already exists and failCreatingIfExists is true
*/
private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computeEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
var node = _storage.getById(op.childId());
@@ -380,6 +521,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
));
}
/**
* Check if a node is an ancestor of another node
*
* @param child The child node ID
* @param parent The parent node ID
* @return True if the child is an ancestor of the parent, false otherwise
*/
private boolean isAncestor(NodeIdT child, NodeIdT parent) {
var node = _storage.getById(parent);
NodeIdT curParent;
@@ -390,6 +538,11 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return false;
}
/**
* Walk the tree and apply the given consumer to each node
*
* @param consumer The consumer to apply to each node
*/
public void walkTree(Consumer<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> consumer) {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());
@@ -403,6 +556,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
/**
* Find the parent of a node that matches the given predicate
*
* @param kidPredicate The predicate to match the child node
* @return A pair containing the name of the child and the ID of the parent, or null if not found
*/
public Pair<String, NodeIdT> findParent(Function<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>, Boolean> kidPredicate) {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());
@@ -423,6 +582,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return null;
}
/**
* Record the bootstrap operations for a given peer
* Will visit all nodes of the tree and add their effective operations to both the queue to be sent to the peer,
* and to the global operation log.
*
* @param host The peer ID
*/
public void recordBoostrapFor(PeerIdT host) {
TreeMap<CombinedTimestamp<TimestampT, PeerIdT>, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT>> result = new TreeMap<>();

View File

@@ -2,6 +2,18 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
/**
* LogEffect is a record that represents the effect of a log entry on a tree node.
* @param oldInfo the old information about the node, before it was moved. Null if the node did not exist before
* @param effectiveOp the operation that had caused this effect to be applied
* @param newParentId the ID of the new parent node
* @param newMeta the new metadata of the node
* @param childId the ID of the child node
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>(
LogEffectOld<TimestampT, PeerIdT, MetaT, NodeIdT> oldInfo,
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> effectiveOp,

View File

@@ -2,6 +2,16 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
/**
* Represents the old information about a node before it was moved.
* @param oldEffectiveMove the old effective move that had caused this effect to be applied
* @param oldParent the ID of the old parent node
* @param oldMeta the old metadata of the node
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public record LogEffectOld<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> oldEffectiveMove,
NodeIdT oldParent,

View File

@@ -4,29 +4,82 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
/**
* LogInterface is an interface that allows accessing the log of operations
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public interface LogInterface<
TimestampT extends Comparable<TimestampT>,
PeerIdT extends Comparable<PeerIdT>,
MetaT extends NodeMeta,
NodeIdT> {
/**
* Peek the oldest log entry.
* @return the oldest log entry
*/
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> peekOldest();
/**
* Take the oldest log entry.
* @return the oldest log entry
*/
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> takeOldest();
/**
* Peek the newest log entry.
* @return the newest log entry
*/
Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> peekNewest();
/**
* Return all log entries that are newer than the given timestamp.
* @param since the timestamp to compare with
* @param inclusive if true, include the log entry with the given timestamp
* @return a list of log entries that are newer than the given timestamp
*/
List<Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>>>
newestSlice(CombinedTimestamp<TimestampT, PeerIdT> since, boolean inclusive);
/**
* Return all the log entries
* @return a list of all log entries
*/
List<Pair<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>>> getAll();
/**
* Checks if the log is empty.
* @return true if the log is empty, false otherwise
*/
boolean isEmpty();
/**
* Checks if the log contains the given timestamp.
* @param timestamp the timestamp to check
* @return true if the log contains the given timestamp, false otherwise
*/
boolean containsKey(CombinedTimestamp<TimestampT, PeerIdT> timestamp);
/**
* Get the size of the log.
* @return the size of the log (number of entries)
*/
long size();
/**
* Add a log entry to the log.
* @param timestamp the timestamp of the log entry
* @param record the log entry
* @throws IllegalStateException if the log entry already exists
*/
void put(CombinedTimestamp<TimestampT, PeerIdT> timestamp, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> record);
/**
* Replace a log entry in the log.
* @param timestamp the timestamp of the log entry
* @param record the log entry
*/
void replace(CombinedTimestamp<TimestampT, PeerIdT> timestamp, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> record);
}

View File

@@ -3,6 +3,15 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
import java.util.List;
/**
* Represents a log record in the Kleppmann tree.
* @param op the operation that is stored in this log record
* @param effects the effects of the operation (resulting moves)
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public record LogRecord<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op,
List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) implements Serializable {

View File

@@ -2,8 +2,24 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
/**
* Represents metadata associated with a node in the Kleppmann tree.
* This interface is used to define the metadata that can be associated with nodes in the tree.
* Implementations of this interface should provide a name for the node and a method to create a copy of it with a new name.
*/
public interface NodeMeta extends Serializable {
/**
* Returns the name of the node.
*
* @return the name of the node
*/
String name();
/**
* Creates a copy of the metadata with a new name.
*
* @param name the new name for the metadata
* @return a new instance of NodeMeta with the specified name
*/
NodeMeta withName(String name);
}

View File

@@ -2,9 +2,27 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
/**
* Operation that moves a child node to a new parent node.
*
* @param timestamp the timestamp of the operation
* @param newParentId the ID of the new parent node
* @param newMeta the new metadata of the node, can be null
* @param childId the ID of the child node (the node that is being moved)
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
NodeIdT childId) implements Serializable {
/**
* Returns the new name of the node: name extracted from the new metadata if available,
* otherwise the child ID converted to string.
*
* @return the new name of the node
*/
public String newName() {
if (newMeta != null)
return newMeta.name();

View File

@@ -1,7 +1,26 @@
package com.usatiuk.kleppmanntree;
/**
* Interface to provide recording operations to be sent to peers asynchronously.
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public interface OpRecorder<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
/**
* Records an operation to be sent to peers asynchronously.
* The operation will be sent to all known peers in the system.
*
* @param op the operation to be recorded
*/
void recordOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op);
/**
* Records an operation to be sent to a specific peer asynchronously.
*
* @param peer the ID of the peer to send the operation to
* @param op the operation to be recorded
*/
void recordOpForPeer(PeerIdT peer, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op);
}

View File

@@ -2,8 +2,22 @@ package com.usatiuk.kleppmanntree;
import java.util.Collection;
/**
* Interface providing access to a list of known peers.
* @param <PeerIdT> the type of the peer ID
*/
public interface PeerInterface<PeerIdT extends Comparable<PeerIdT>> {
/**
* Returns the ID of the current peer.
*
* @return the ID of the current peer
*/
PeerIdT getSelfId();
/**
* Returns a collection of all known peers.
*
* @return a collection of all known peers
*/
Collection<PeerIdT> getAllPeers();
}

View File

@@ -1,11 +1,26 @@
package com.usatiuk.kleppmanntree;
/**
* Interface providing a map of newest received timestamps for each peer. (causality thresholds)
* If a peer has some timestamp recorded in this map,
* it means that all messages coming from this peer will have a newer timestamp.
* @param <TimestampT>
* @param <PeerIdT>
*/
public interface PeerTimestampLogInterface<
TimestampT extends Comparable<TimestampT>,
PeerIdT extends Comparable<PeerIdT>> {
/**
* Get the timestamp for a specific peer.
* @param peerId the ID of the peer
* @return the timestamp for the peer
*/
TimestampT getForPeer(PeerIdT peerId);
/**
* Get the timestamp for the current peer.
*/
void putForPeer(PeerIdT peerId, TimestampT timestamp);
}

View File

@@ -1,28 +1,89 @@
package com.usatiuk.kleppmanntree;
/**
* Storage interface for the Kleppmann tree.
*
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public interface StorageInterface<
TimestampT extends Comparable<TimestampT>,
PeerIdT extends Comparable<PeerIdT>,
MetaT extends NodeMeta,
NodeIdT> {
/**
* Get the root node ID.
*
* @return the root node IDx
*/
NodeIdT getRootId();
/**
* Get the trash node ID.
*
* @return the trash node ID
*/
NodeIdT getTrashId();
/**
* Get the lost and found node ID.
*
* @return the lost and found node ID
*/
NodeIdT getLostFoundId();
/**
* Get the new node ID.
*
* @return the new node ID
*/
NodeIdT getNewNodeId();
/**
* Get the node by its ID.
*
* @param id the ID of the node
* @return the node with the specified ID, or null if not found
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getById(NodeIdT id);
// Creates a node, returned wrapper is RW-locked
/**
* Create a new node with the specified key, parent, and metadata.
*
* @param key the ID of the new node
* @param parent the ID of the parent node
* @param meta the metadata of the new node
* @return the new node
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> createNewNode(NodeIdT key, NodeIdT parent, MetaT meta);
/**
* Put a node into the storage.
*
* @param node the node to put into the storage
*/
void putNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);
/**
* Remove a node from the storage.
*
* @param id the ID of the node to remove
*/
void removeNode(NodeIdT id);
/**
* Get the log interface.
*
* @return the log interface
*/
LogInterface<TimestampT, PeerIdT, MetaT, NodeIdT> getLog();
/**
* Get the peer timestamp log interface.
*
* @return the peer timestamp log interface
*/
PeerTimestampLogInterface<TimestampT, PeerIdT> getPeerTimestampLog();
}

View File

@@ -5,29 +5,92 @@ import org.pcollections.PMap;
import java.io.Serializable;
/**
* Represents a node in the Kleppmann tree.
*
* @param <TimestampT> the type of the timestamp
* @param <PeerIdT> the type of the peer ID
* @param <MetaT> the type of the node metadata
* @param <NodeIdT> the type of the node ID
*/
public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> extends Serializable {
/**
* Get the ID of the node.
*
* @return the ID of the node
*/
NodeIdT key();
/**
* Get the ID of the parent node.
*
* @return the ID of the parent node
*/
NodeIdT parent();
/**
* Get the last effective operation that moved this node.
*
* @return the last effective operation
*/
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp();
/**
* Get the metadata stored in this node.
*
* @return the metadata of the node
*/
@Nullable
MetaT meta();
/**
* Get the name of the node.
* If the node has metadata, the name is extracted from it, otherwise the key is converted to string.
*
* @return the name of the node
*/
default String name() {
var meta = meta();
if (meta != null) return meta.name();
return key().toString();
}
/**
* Get the children of this node.
*
* @return a map of child IDs to their respective nodes
*/
PMap<String, NodeIdT> children();
/**
* Make a copy of this node with a new parent.
*
* @param parent the ID of the new parent node
* @return a new TreeNode instance with the updated parent
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withParent(NodeIdT parent);
/**
* Make a copy of this node with a new last effective operation.
*
* @param lastEffectiveOp the new last effective operation
* @return a new TreeNode instance with the updated last effective operation
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withLastEffectiveOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp);
/**
* Make a copy of this node with new metadata.
*
* @param meta the new metadata
* @return a new TreeNode instance with the updated metadata
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withMeta(MetaT meta);
/**
* Make a copy of this node with new children.
*
* @param children the new children
* @return a new TreeNode instance with the updated children
*/
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withChildren(PMap<String, NodeIdT> children);
}

View File

@@ -2,9 +2,20 @@ package com.usatiuk.objects;
import java.io.Serializable;
/**
* JData is a marker interface for all objects that can be stored in the object store.
*/
public interface JData extends Serializable {
/**
* Returns the key of the object.
* @return the key of the object
*/
JObjectKey key();
/**
* Returns the estimated size of the object in bytes.
* @return the estimated size of the object in bytes
*/
default int estimateSize() {
return 100;
}

View File

@@ -2,15 +2,34 @@ package com.usatiuk.objects;
import com.usatiuk.objects.iterators.Data;
/**
* JDataVersionedWrapper is a wrapper for JData that contains its version number
* (the id of the transaction that had changed it last)
*/
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
@Override
default JDataVersionedWrapper value() {
return this;
}
/**
* Returns the wrapped object.
*
* @return the wrapped object
*/
JData data();
/**
* Returns the version number of the object.
*
* @return the version number of the object
*/
long version();
/**
* Returns the estimated size of the object in bytes.
*
* @return the estimated size of the object in bytes
*/
int estimateSize();
}

View File

@@ -4,6 +4,9 @@ import jakarta.annotation.Nonnull;
import java.io.Serializable;
/**
* Simple wrapper for an already-existing JData object with a version.
*/
public record JDataVersionedWrapperImpl(@Nonnull JData data,
long version) implements Serializable, JDataVersionedWrapper {
@Override

View File

@@ -2,18 +2,35 @@ package com.usatiuk.objects;
import java.util.function.Supplier;
/**
* Lazy JDataVersionedWrapper implementation.
* The object is deserialized only when data() is called for the first time.
* Also allows to set a callback to be called when the data is loaded (e.g. to cache it).
*/
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private final long _version;
private final int _estimatedSize;
private JData _data;
private Supplier<JData> _producer;
/**
* Creates a new JDataVersionedWrapperLazy object.
*
* @param version the version number of the object
* @param estimatedSize the estimated size of the object in bytes
* @param producer a supplier that produces the wrapped object
*/
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
_version = version;
_estimatedSize = estimatedSize;
_producer = producer;
}
/**
* Set a callback to be called when the data is loaded.
*
* @param cacheCallback the callback to be called
*/
public void setCacheCallback(Runnable cacheCallback) {
if (_data != null) {
throw new IllegalStateException("Cache callback can be set only before data is loaded");

View File

@@ -7,11 +7,21 @@ import jakarta.inject.Singleton;
import java.nio.ByteBuffer;
/**
* Serializer for JDataVersionedWrapper objects.
* The objects are stored in a simple format: first is 8-byte long, then the serialized object.
*/
@Singleton
public class JDataVersionedWrapperSerializer {
@Inject
ObjectSerializer<JData> dataSerializer;
/**
* Serializes a JDataVersionedWrapper object to a ByteString.
*
* @param obj the object to serialize
* @return the serialized object as a ByteString
*/
public ByteString serialize(JDataVersionedWrapper obj) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(obj.version());
@@ -19,6 +29,13 @@ public class JDataVersionedWrapperSerializer {
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
}
/**
* Deserializes a JDataVersionedWrapper object from a ByteBuffer.
* Returns a lazy wrapper (JDataVersionedWrapperLazy).
*
* @param data the ByteBuffer containing the serialized object
* @return the deserialized object
*/
public JDataVersionedWrapper deserialize(ByteBuffer data) {
var version = data.getLong();
return new JDataVersionedWrapperLazy(version, data.remaining(),

View File

@@ -5,30 +5,68 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* JObjectKey is an interface for object keys to be used in the object store.
*/
public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey> permits JObjectKeyImpl, JObjectKeyMax, JObjectKeyMin {
JObjectKeyMin MIN = new JObjectKeyMin();
JObjectKeyMax MAX = new JObjectKeyMax();
/**
* Creates a new JObjectKey from a string value.
*
* @param value the string value of the key
* @return a new JObjectKey
*/
static JObjectKey of(String value) {
return new JObjectKeyImpl(value);
}
/**
* Creates a new JObjectKey with a random UUID.
*
* @return a new JObjectKey with a random UUID
*/
static JObjectKey random() {
return new JObjectKeyImpl(UUID.randomUUID().toString());
}
/**
* Returns a JObjectKey that compares less than all other keys.
* Calling value on this key will result in an exception.
*
* @return a JObjectKey that compares less than all other keys
*/
static JObjectKey first() {
return MIN;
}
/**
* Returns a JObjectKey that compares greater than all other keys.
* Calling value on this key will result in an exception.
*
* @return a JObjectKey that compares greater than all other keys
*/
static JObjectKey last() {
return MAX;
}
/**
* Creates a new JObjectKey from a byte array.
*
* @param bytes the byte array representing the key
* @return a new JObjectKey
*/
static JObjectKey fromBytes(byte[] bytes) {
return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1));
}
/**
* Creates a new JObjectKey from a ByteBuffer.
*
* @param buff the ByteBuffer representing the key
* @return a new JObjectKey
*/
static JObjectKey fromByteBuffer(ByteBuffer buff) {
byte[] bytes = new byte[buff.remaining()];
buff.get(bytes);
@@ -41,7 +79,17 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
@Override
String toString();
/**
* Returns the byte buffer representation of the key.
*
* @return the byte buffer representation of the key
*/
ByteBuffer toByteBuffer();
/**
* Returns the string value of the key.
*
* @return the string value of the key
*/
String value();
}

View File

@@ -7,6 +7,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* A "real" implementation of JObjectKey, containing an underlying string, and a cached lazily created byte buffer.
*/
public final class JObjectKeyImpl implements JObjectKey {
@Serial
private static final long serialVersionUID = 0L;

View File

@@ -2,6 +2,9 @@ package com.usatiuk.objects;
import java.nio.ByteBuffer;
/**
* JObjectKey implementation that compares greater than all other keys.
*/
public record JObjectKeyMax() implements JObjectKey {
@Override
public int compareTo(JObjectKey o) {

View File

@@ -2,6 +2,9 @@ package com.usatiuk.objects;
import java.nio.ByteBuffer;
/**
* JObjectKey implementation that compares less than all other keys.
*/
public record JObjectKeyMin() implements JObjectKey {
@Override
public int compareTo(JObjectKey o) {

View File

@@ -10,6 +10,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Simple Java object serializer.
*/
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {

View File

@@ -4,8 +4,25 @@ import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
/**
* Interface for serializing and deserializing objects.
*
* @param <T> the type of object to serialize/deserialize
*/
public interface ObjectSerializer<T> {
/**
* Serialize an object to a ByteString.
*
* @param obj the object to serialize
* @return the serialized object as a ByteString
*/
ByteString serialize(T obj);
/**
* Deserialize an object from a ByteBuffer.
*
* @param data the ByteBuffer containing the serialized object
* @return the deserialized object
*/
T deserialize(ByteBuffer data);
}

View File

@@ -4,19 +4,63 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
/**
* An iterator over key-value pairs that can be closed and supports peek and skip operations, in both directions.
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
/**
* Returns the upcoming key in the forward direction without advancing the iterator.
*
* @return the current key
* @throws IllegalStateException if there is no next element
*/
K peekNextKey();
/**
* Skips the next element in the forward direction.
*
* @throws IllegalStateException if there is no next element
*/
void skip();
/**
* Checks if there is a next element in the forward direction.
*
* @return true if there is a next element, false otherwise
* @throws IllegalStateException if there is no next element
*/
K peekPrevKey();
/**
* Returns the key-value pair in the reverse direction, and advances the iterator.
*
* @return the previous key-value pair
* @throws IllegalStateException if there is no previous element
*/
Pair<K, V> prev();
/**
* Checks if there is a previous element in the reverse direction.
*
* @return true if there is a previous element, false otherwise
*/
boolean hasPrev();
/**
* Skips the previous element in the reverse direction.
*
* @throws IllegalStateException if there is no previous element
*/
void skipPrev();
/**
* Returns a reversed iterator that iterates in the reverse direction.
*
* @return a new CloseableKvIterator that iterates in the reverse direction
*/
default CloseableKvIterator<K, V> reversed() {
return new ReversedKvIterator<K, V>(this);
}

View File

@@ -1,5 +1,13 @@
package com.usatiuk.objects.iterators;
/**
* Interface indicating that data is present.
* @param <V> the type of the value
*/
public interface Data<V> extends MaybeTombstone<V> {
/**
* Get the value.
* @return the value
*/
V value();
}

View File

@@ -1,4 +1,9 @@
package com.usatiuk.objects.iterators;
/**
* Simple implementation of the Data interface.
* @param value the value
* @param <V> the type of the value
*/
public record DataWrapper<V>(V value) implements Data<V> {
}

View File

@@ -1,5 +1,8 @@
package com.usatiuk.objects.iterators;
/**
* Allows to specify initial positioning of the iterator relative to the requested key.
*/
public enum IteratorStart {
LT,
LE,

View File

@@ -5,11 +5,25 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Function;
/**
* A key-value iterator that filters keys based on a predicate.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final Function<K, Boolean> _filter;
private K _next;
/**
* Constructs a KeyPredicateKvIterator with the specified backing iterator, start position, and filter.
*
* @param backing the backing iterator
* @param start the starting position relative to the startKey
* @param startKey the starting key
* @param filter the filter function to apply to keys. Only keys for which this function returns true will be included in the iteration.
*/
public KeyPredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<K, Boolean> filter) {
_goingForward = true;
_backing = backing;

View File

@@ -4,10 +4,23 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.function.Function;
/**
* A mapping key-value iterator that transforms the values of a backing iterator using a specified function.
*
* @param <K> the type of the keys
* @param <V> the type of the values in the backing iterator
* @param <V_T> the type of the transformed values
*/
public class MappingKvIterator<K extends Comparable<K>, V, V_T> implements CloseableKvIterator<K, V_T> {
private final CloseableKvIterator<K, V> _backing;
private final Function<V, V_T> _transformer;
/**
* Constructs a MappingKvIterator with the specified backing iterator and transformer function.
*
* @param backing the backing iterator
* @param transformer the function to transform values
*/
public MappingKvIterator(CloseableKvIterator<K, V> backing, Function<V, V_T> transformer) {
_backing = backing;
_transformer = transformer;

View File

@@ -1,4 +1,8 @@
package com.usatiuk.objects.iterators;
/**
* Optional-like interface, can either be {@link Data} or {@link Tombstone}.
* @param <T> the type of the value
*/
public interface MaybeTombstone<T> {
}

View File

@@ -9,10 +9,25 @@ import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
/**
* A merging key-value iterator that combines multiple iterators into a single iterator.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final List<IteratorEntry<K, V>> _iterators;
/**
* Constructs a MergingKvIterator with the specified start type, start key, and list of iterators.
* The iterators have priority based on their order in the list: if two iterators have the same key,
* the one that is in the beginning of the list will be used.
*
* @param startType the starting position relative to the startKey
* @param startKey the starting key
* @param iterators the list of iterators to merge
*/
public MergingKvIterator(IteratorStart startType, K startKey, List<CloseableKvIterator<K, V>> iterators) {
_goingForward = true;
@@ -88,6 +103,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// }
}
/**
* Constructs a MergingKvIterator with the specified start type, start key, and array of iterators.
* The iterators have priority based on their order in the array: if two iterators have the same key,
* the one that is in the beginning of the array will be used.
*
* @param startType the starting position relative to the startKey
* @param startKey the starting key
* @param iterators the array of iterators to merge
*/
@SafeVarargs
public MergingKvIterator(IteratorStart startType, K startKey, CloseableKvIterator<K, V>... iterators) {
this(startType, startKey, List.of(iterators));

View File

@@ -4,11 +4,25 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
/**
* A key-value iterator for a {@link NavigableMap}.
* It allows iterating over the keys and values in a sorted order.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public class NavigableMapKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, V> _map;
private Iterator<Map.Entry<K, V>> _iterator;
private Map.Entry<K, V> _next;
/**
* Constructs a NavigableMapKvIterator with the specified map, start type, and start key.
*
* @param map the map to iterate over
* @param start the starting position relative to the startKey
* @param key the starting key
*/
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
_map = (NavigableMap<K, V>) map;
SortedMap<K, V> _view;

View File

@@ -2,9 +2,19 @@ package com.usatiuk.objects.iterators;
import org.apache.commons.lang3.tuple.Pair;
/**
* A wrapper for a key-value iterator that iterates in reverse order.
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public class ReversedKvIterator<K extends Comparable<? super K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
/**
* Constructs a ReversedKvIterator with the specified backing iterator.
*
* @param backing the backing iterator
*/
public ReversedKvIterator(CloseableKvIterator<K, V> backing) {
_backing = backing;
}

View File

@@ -2,9 +2,21 @@ package com.usatiuk.objects.iterators;
import org.apache.commons.lang3.tuple.Pair;
/**
* Base class for a reversible key-value iterator.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public abstract class ReversibleKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
/**
* The current direction of the iterator.
*/
protected boolean _goingForward;
/**
* Reverses the current direction of the iterator.
*/
protected abstract void reverse();
private void ensureForward() {
@@ -19,12 +31,33 @@ public abstract class ReversibleKvIterator<K extends Comparable<K>, V> implement
}
}
/**
* Fills the next element in the iterator, depending on the current direction.
*
* @throws IllegalStateException if there is no next element
*/
abstract protected K peekImpl();
/**
* Skips the next element in the iterator, depending on the current direction.
*
* @throws IllegalStateException if there is no next element
*/
abstract protected void skipImpl();
/**
* Checks if there is a next element in the iterator, depending on the current direction.
*
* @return true if there is a next element, false otherwise
*/
abstract protected boolean hasImpl();
/**
* Returns the next element in the iterator, depending on the current direction.
*
* @return the next element
* @throws IllegalStateException if there is no next element
*/
abstract protected Pair<K, V> nextImpl();
@Override

View File

@@ -1,4 +1,8 @@
package com.usatiuk.objects.iterators;
/**
* Indicates that the value is a tombstone.
* @param <V> the type of the value
*/
public interface Tombstone<V> extends MaybeTombstone<V> {
}

View File

@@ -1,4 +1,8 @@
package com.usatiuk.objects.iterators;
/**
* Simple implementation of the Tombstone interface.
* @param <V> the type of the value
*/
public record TombstoneImpl<V>() implements Tombstone<V> {
}

View File

@@ -6,11 +6,25 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
/**
* A key-value iterator that skips tombstones.
*
* @param <K> the type of the keys
* @param <V> the type of the values
*/
public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final MergingKvIterator<K, MaybeTombstone<V>> _backing;
private Pair<K, V> _next = null;
private boolean _checkedNext = false;
/**
* Constructs a TombstoneSkippingIterator with the specified start position, start key, and list of iterators.
* Like {@link MappingKvIterator}, iterators have a priority depending on their order in the list.
*
* @param start the starting position relative to the startKey
* @param startKey the starting key
* @param iterators the list of iterators to merge
*/
public TombstoneSkippingIterator(IteratorStart start, K startKey, List<CloseableKvIterator<K, MaybeTombstone<V>>> iterators) {
_goingForward = true;
_backing = new MergingKvIterator<>(start, startKey, iterators);

View File

@@ -8,12 +8,36 @@ import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
/**
* Interface for a snapshot of a database.
* Represents a point-in-time view of a storage, with a unique ID.
*
* @param <K> the type of the key
* @param <V> the type of the value
*/
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseable {
/**
* Get a list of iterators representing the snapshot.
* The iterators have priority: the first one in the list is the highest.
* The data type of the iterator is a tombstone: a tombstone represents a deleted value that does not exist anymore.
* The list of iterators is intended to be consumed by {@link com.usatiuk.objects.iterators.TombstoneSkippingIterator}
*
* @return a list of iterators
*/
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
/**
* Read an object from the snapshot.
* @param name the name of the object
* @return an optional containing the object if it exists, or an empty optional if it does not
*/
@Nonnull
Optional<V> readObject(K name);
/**
* Get the ID of the snapshot.
* @return the ID of the snapshot
*/
long id();
@Override

View File

@@ -24,6 +24,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
* It stores the already deserialized objects in memory.
* Not (yet) thread safe for writes.
*/
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@@ -62,6 +67,12 @@ public class CachingObjectPersistentStore {
}
}
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
@@ -79,6 +90,12 @@ public class CachingObjectPersistentStore {
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
/**
* Get a snapshot of underlying store and the cache.
* Objects are read from the cache if possible, if not, they are read from the underlying store,
* then possibly lazily cached when their data is accessed.
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) {
var cache = _cache.get();

View File

@@ -26,15 +26,19 @@ import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create;
/**
* Persistent object storage using LMDB
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_NAME = "objects";
// LMDB object name for the transaction id
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
private static final ByteBuffer DB_VER_OBJ_NAME;
@@ -100,6 +104,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
/**
* Get a snapshot of the database.
* Note that the ByteBuffers are invalid after the snapshot is closed.
*
* @return a snapshot of the database
*/
@Override
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
var txn = _env.txnRead();

View File

@@ -15,6 +15,10 @@ import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
/**
* In-memory implementation of the ObjectPersistentStore interface.
* For testing purposes.
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore {

View File

@@ -8,14 +8,33 @@ import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
// Persistent storage of objects
// All changes are written as sequential transactions
/**
* Interface for a persistent store of objects.
* Does not have to be thread-safe! (yet), it is expected that all commits are done by the same thread.
*/
public interface ObjectPersistentStore {
/**
* Get a snapshot of the persistent store.
* @return a snapshot of the persistent store
*/
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
/**
* Commit a transaction to the persistent store.
* @param names the transaction manifest
* @param txId the transaction ID
*/
void commitTx(TxManifestRaw names, long txId);
/**
* Get the size of the persistent store.
* @return the size of the persistent store
*/
long getTotalSpace();
/**
* Get the free space of the persistent store.
* @return the free space of the persistent store
*/
long getFreeSpace();
}

View File

@@ -16,6 +16,11 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Serializing wrapper for the ObjectPersistentStore.
* It serializes the objects before storing them in the persistent store.
* It deserializes the objects after reading them from the persistent store.
*/
@ApplicationScoped
public class SerializingObjectPersistentStore {
@Inject
@@ -24,6 +29,13 @@ public class SerializingObjectPersistentStore {
@Inject
ObjectPersistentStore delegateStore;
/**
* Get a snapshot of the persistent store, with deserialized objects.
*
* The objects are deserialized lazily, only when their data is accessed.
*
* @return a snapshot of the persistent store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@@ -54,6 +66,12 @@ public class SerializingObjectPersistentStore {
}
/**
* Serialize the objects, in parallel
* @param objs the objects to serialize
* @return the serialized objects
*/
private TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> objs) {
return new TxManifestRaw(
objs.written().parallelStream()
@@ -62,6 +80,11 @@ public class SerializingObjectPersistentStore {
, objs.deleted());
}
/**
* Commit a transaction to the persistent store.
* @param objects the transaction manifest
* @param txId the transaction ID
*/
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
delegateStore.commitTx(prepareManifest(objects), txId);
}

View File

@@ -6,7 +6,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable;
import java.util.Collection;
// FIXME: Serializable
public record TxManifestObj<T>(Collection<Pair<JObjectKey, T>> written,
Collection<JObjectKey> deleted) implements Serializable {
}

View File

@@ -7,7 +7,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable;
import java.util.Collection;
// FIXME: Serializable
public record TxManifestRaw(Collection<Pair<JObjectKey, ByteString>> written,
Collection<JObjectKey> deleted) implements Serializable {
}

View File

@@ -33,6 +33,10 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
/**
* Asynchronous write cache of objects.
* Objects are put into a write queue by commitTx, and written to the storage by a separate thread.
*/
@ApplicationScoped
public class WritebackObjectPersistentStore {
@Inject
@@ -260,16 +264,23 @@ public class WritebackObjectPersistentStore {
}
}
public void asyncFence(long bundleId, Runnable fn) {
/**
* Run a given callback after the transaction with id txId is committed.
* If the transaction is already committed, the callback is run immediately.
*
* @param txId transaction id to wait for
* @param fn callback to run
*/
public void asyncFence(long txId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastFlushedId.get() >= bundleId) {
if (txId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastFlushedId.get() >= txId) {
fn.run();
return;
}
_pendingBundleLock.lock();
try {
if (_lastFlushedId.get() >= bundleId) {
if (_lastFlushedId.get() >= txId) {
fn.run();
return;
}
@@ -284,12 +295,23 @@ public class WritebackObjectPersistentStore {
}
}
/**
* Commit a transaction to the persistent store.
*
* @param writes the transaction manifest
* @return a function that allows to add a callback to be run after the transaction is committed
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
long bundleId = commitBundle(writes);
return r -> asyncFence(bundleId, r);
}
/**
* Get a snapshot of the persistent store, including the pending writes.
*
* @return a snapshot of the store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
PendingWriteData pw = null;

View File

@@ -1,3 +1,5 @@
package com.usatiuk.dhfs;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -12,6 +14,9 @@ import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Periodically check for deadlocks in the JVM and log them if found.
*/
@ApplicationScoped
public class DeadlockDetector {
private final ExecutorService _executor = Executors.newSingleThreadExecutor();

View File

@@ -6,7 +6,7 @@ import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
@Singleton
public class TemporaryOpSerializer implements ProtoSerializer<OpP, Op> {
public class OpSerializer implements ProtoSerializer<OpP, Op> {
@Override
public Op deserialize(OpP message) {
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());

View File

@@ -11,6 +11,11 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Paths;
/**
* This class checks if the application was shut down cleanly.
* It creates a file in the specified directory on startup and deletes it on shutdown.
* If the file exists on startup, it means the application was not shut down cleanly.
*/
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";

View File

@@ -22,6 +22,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Simple class to automatically download remote objects in the background.
*/
@ApplicationScoped
public class AutosyncProcessor {
private final HashSetDelayedBlockingQueue<JObjectKey> _pending = new HashSetDelayedBlockingQueue<>(0);
@@ -77,6 +80,11 @@ public class AutosyncProcessor {
_autosyncExcecutor.shutdownNow();
}
/**
* Adds an object to the queue to be downloaded.
*
* @param name the object to add
*/
public void add(JObjectKey name) {
_pending.add(name);
}

View File

@@ -10,6 +10,9 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
/**
* Pre-commit hook to automatically download remote objects, if the option to download all objects is enabled.
*/
@Singleton
public class AutosyncTxHook implements PreCommitTxHook {
@Inject

View File

@@ -7,6 +7,9 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.io.Serial;
import java.io.Serializable;
/**
* Stores data about deferred invalidations.
*/
public class DeferredInvalidationQueueData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;

View File

@@ -20,6 +20,11 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* Service to handle deferred invalidations.
* It is responsible for storing and returning deferred invalidations to the invalidation queue.
* It also is responsible for persisting and restoring the deferred invalidations on startup and shutdown.
*/
@ApplicationScoped
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@@ -59,7 +64,9 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
}
}
// FIXME:
/**
* Periodically returns deferred invalidations to the invalidation queue for all reachable hosts.
*/
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
@@ -67,6 +74,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
returnForHost(reachable);
}
/**
* Returns deferred invalidations for a specific host.
*
* @param host the host to return deferred invalidations for
*/
void returnForHost(PeerId host) {
synchronized (this) {
var col = _persistentData.deferredInvalidations.get(host);
@@ -78,6 +90,10 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
}
}
/**
* Defer a specific invalidation.
* @param entry the invalidation to defer
*/
void defer(InvalidationQueueEntry entry) {
synchronized (this) {
Log.tracev("Deferred invalidation: {0}", entry);

View File

@@ -8,6 +8,12 @@ import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
/**
* Information about a new version of a remote object, possibly with its data.
* @param key the key of the object
* @param changelog the changelog of the object (version vector)
* @param data the data of the object
*/
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {

View File

@@ -2,8 +2,8 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
@@ -31,6 +31,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Service to handle sending operations to remote peers.
* This service works with objects, containing a queue of them.
* The operations to be sent to peers are extracted from the objects in the queue.
*/
@ApplicationScoped
public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue;
@@ -118,6 +123,7 @@ public class InvalidationQueueService {
String stats = "Sent invalidation: ";
long success = 0;
// Don't try to send same object in multiple threads
List<AutoCloseableNoThrow> locks = new LinkedList<>();
try {
ArrayListValuedHashMap<PeerId, Op> ops = new ArrayListValuedHashMap<>();
@@ -194,6 +200,11 @@ public class InvalidationQueueService {
Log.info("Invalidation sender exiting");
}
/**
* Extract operations from an object for all peers and push them.
*
* @param key the object key to process
*/
public void pushInvalidationToAll(JObjectKey key) {
while (true) {
var queue = _toAllQueue.get();
@@ -209,6 +220,7 @@ public class InvalidationQueueService {
}
}
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (reachablePeerManager.isReachable(entry.peer()))
_queue.add(entry);
@@ -223,11 +235,23 @@ public class InvalidationQueueService {
deferredInvalidationQueueService.defer(entry);
}
/**
* Extract operations from an object for some specific peer and push them.
*
* @param host the host to extract operations for
* @param obj the object key to process
*/
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOne(entry);
}
/**
* Extract operations from an object for some specific peer and push them, without delay.
*
* @param host the host to extract operations for
* @param obj the object key to process
*/
public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOneNoDelay(entry);

View File

@@ -5,6 +5,16 @@ import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
import java.util.Collection;
/**
* Represents a unit of information to be sent to another peer.
* The operations are extracted from objects in the key-value storage, and then sent to peers.
*/
public interface Op extends Serializable {
/**
* Returns the keys of the objects that are referenced in this op.
* These objects should be marked as "escaped" in the local storage for the purposed of garbage collection.
*
* @return the keys of the objects that are referenced in this operation
*/
Collection<JObjectKey> getEscapedRefs();
}

View File

@@ -6,6 +6,17 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
/**
* Interface for extracting operations from data objects.
* @param <T> the type of data
*/
public interface OpExtractor<T extends JData> {
/**
* Extract operations from the given data object.
*
* @param data the data object to extract operations from
* @param peerId the ID of the peer to extract operations for
* @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer
*/
Pair<List<Op>, Runnable> extractOps(T data, PeerId peerId);
}

View File

@@ -14,6 +14,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* Service for extracting operations from JData objects.
* This service uses the {@link OpExtractor} interface to extract operations from JData objects.
* It is used to extract operations from JData objects before they are sent to the peer.
*/
@ApplicationScoped
public class OpExtractorService {
private final Map<Class<? extends JData>, OpExtractor> _opExtractorMap;
@@ -38,6 +43,13 @@ public class OpExtractorService {
_opExtractorMap = Map.copyOf(opExtractorMap);
}
/**
* Extract operations from the given JData object.
*
* @param data the JData object to extract operations from
* @param peerId the ID of the peer to extract operations for
* @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer
*/
public @Nullable Pair<List<Op>, Runnable> extractOps(JData data, PeerId peerId) {
var extractor = _opExtractorMap.get(data.getClass());
if (extractor == null) {

View File

@@ -2,6 +2,16 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
/**
* Interface for handling operations.
* @param <T> the type of operation
*/
public interface OpHandler<T extends Op> {
/**
* Handles the given operation.
*
* @param from the ID of the peer that sent the operation
* @param op the operation to handle
*/
void handleOp(PeerId from, T op);
}

View File

@@ -10,6 +10,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
/**
* Service for handling operations.
* This service uses the {@link OpHandler} interface to handle operations.
* It is used to handle operations received from the peer.
*/
@ApplicationScoped
public class OpHandlerService {
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
@@ -34,6 +39,12 @@ public class OpHandlerService {
_opHandlerMap = Map.copyOf(OpHandlerMap);
}
/**
* Handle the given operation.
*
* @param from the ID of the peer that sent the operation
* @param op the operation to handle
*/
public void handleOp(PeerId from, Op op) {
var handler = _opHandlerMap.get(op.getClass());
if (handler == null) {

View File

@@ -24,6 +24,10 @@ import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Automatically synchronized and persistent Kleppmann tree service.
* The trees are identified by their names, and can have any type of root node.
*/
@ApplicationScoped
public class JKleppmannTreeManager {
private static final String dataFileName = "trees";
@@ -38,6 +42,12 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
/**
* Get or create a tree with the given name.
* @param name the name of the tree
* @param rootNodeSupplier a supplier for the root node meta
* @return the tree
*/
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
@@ -64,13 +74,20 @@ public class JKleppmannTreeManager {
});
}
/**
* Get a tree with the given name.
* @param name the name of the tree
* @return the tree
*/
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
});
}
/**
* Kleppmann tree wrapper, automatically synchronized and persistent.
*/
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;
@@ -88,26 +105,57 @@ public class JKleppmannTreeManager {
_tree = new KleppmannTree<>(_storageInterface, peerInterface, _clock, new JOpRecorder());
}
/**
* Traverse the tree from root to find a node with the given name.
* @param names list of names to traverse
* @return the node key
*/
public JObjectKey traverse(List<String> names) {
return _tree.traverse(names);
}
/**
* Get a new node id. (random)
* @return the new node id
*/
public JObjectKey getNewNodeId() {
return _storageInterface.getNewNodeId();
}
/**
* Move a node to a new parent.
* @param newParent the new parent
* @param newMeta the new node metadata
* @param node the node to move
*/
public void move(JObjectKey newParent, JKleppmannTreeNodeMeta newMeta, JObjectKey node) {
_tree.move(newParent, newMeta, node);
}
/**
* Move a node to the trash.
* @param newMeta the new node metadata
* @param nodeKey the node key
*/
public void trash(JKleppmannTreeNodeMeta newMeta, JObjectKey nodeKey) {
_tree.move(_storageInterface.getTrashId(), newMeta.withName(nodeKey.toString()), nodeKey);
}
/**
* Check if there are any pending operations for the given peer.
* @param host the peer id
* @return true if there are pending operations, false otherwise
*/
public boolean hasPendingOpsForHost(PeerId host) {
return !_data.queues().getOrDefault(host, TreePMap.empty()).isEmpty();
}
/**
* Get the pending operations for the given peer.
* @param host the peer id
* @param limit the maximum number of operations to return
* @return the list of pending operations
*/
public List<Op> getPendingOpsForHost(PeerId host, int limit) {
ArrayList<Op> collected = new ArrayList<>();
for (var node : _data.queues().getOrDefault(host, TreePMap.empty()).entrySet()) {
@@ -119,7 +167,13 @@ public class JKleppmannTreeManager {
return Collections.unmodifiableList(collected);
}
// @Override
/**
* Mark the operation as committed for the given host.
* This should be called when the operation is successfully applied on the host.
* All operations should be sent and received in timestamp order.
* @param host the peer id
* @param op the operation to commit
*/
public void commitOpForHost(PeerId host, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp)
return;
@@ -135,15 +189,27 @@ public class JKleppmannTreeManager {
curTx.put(_data);
}
/**
* Record bootstrap operations for the given host.
* @param host the peer id
*/
public void recordBootstrap(PeerId host) {
_tree.recordBoostrapFor(host);
}
/**
* Get the parent of a node that matches the given predicate.
* @param predicate the predicate to match
*/
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
}
// @Override
/**
* Accept an external operation from the given peer.
* @param from the peer id
* @param op the operation to accept
*/
public void acceptExternalOp(PeerId from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) {
_tree.updateExternalTimestamp(from1, timestamp);
@@ -166,6 +232,11 @@ public class JKleppmannTreeManager {
_tree.applyExternalOp(from, jop.op());
}
/**
* Create a dummy operation that contains the timestamp of the last operation, to move causality threshold
* forward even without any real operations.
* @return the periodic push operation
*/
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}

View File

@@ -14,6 +14,9 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* KleppmannTree node implementation for JKleppmannTree
*/
public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
@Nullable JKleppmannTreeNodeMeta meta,

View File

@@ -10,7 +10,9 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;
// Separate refcounting from JKleppmannTreeNode
/**
* Separate reference counting from JKleppmannTreeNode
*/
public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean frozen,
JKleppmannTreeNode node) implements JDataRefcounted, Serializable {

View File

@@ -5,7 +5,6 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
//@ProtoMirror(JKleppmannTreeNodeMetaP.class)
public interface JKleppmannTreeNodeMeta extends NodeMeta {
JKleppmannTreeNodeMeta withName(String name);

View File

@@ -14,6 +14,9 @@ import org.pcollections.PSortedMap;
import java.util.Collection;
import java.util.List;
/**
* Various persistent data for JKleppmannTree
*/
public record JKleppmannTreePersistentData(
JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
long clock,

View File

@@ -12,6 +12,12 @@ import java.util.ArrayList;
import java.util.Optional;
// TODO: It's not actually generic right now, only longs are supported essentially
/**
* Persistent-storage backed ordered map service.
* Local and remote objects can implement the ${@link JMapHolder} interface, then they can be used with this service
* to store references to other objects identified by sorded keys of some kind. (for now only longs)
*/
@Singleton
public class JMapHelper {
@Inject
@@ -33,29 +39,69 @@ public class JMapHelper {
return JObjectKey.of(holder.value() + ">");
}
/**
* Get an iterator for the map of a given holder.
* @param holder the holder of the map
* @param start the start position of the iterator relative to the key
* @param key the key to start the iterator from
* @return an iterator for the map of the given holder
* @param <K> the type of the key
*/
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
}
/**
* Get an iterator for the map of a given holder. The iterator starts from the first key.
* @param holder the holder of the map
* @return an iterator for the map of the given holder
* @param <K> the type of the key
*/
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
return new JMapIterator<>(curTx.getIterator(IteratorStart.GT, makeKeyFirst(holder.key())), holder);
}
/**
* Put a new entry into the map of a given holder.
* @param holder the holder of the map
* @param key the key to put
* @param ref the key of the object reference to which to record
* @param <K> the type of the key
*/
public <K extends JMapKey> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
curTx.put(new JMapEntry<>(holder.key(), key, ref));
}
/**
* Get an entry from the map of a given holder.
* @param holder the holder of the map
* @param key the key to get
* @return an optional containing the entry if it exists, or an empty optional if it does not
* @param <K> the type of the key
*/
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
}
/**
* Delete an entry from the map of a given holder.
* @param holder the holder of the map
* @param key the key to delete
* @param <K> the type of the key
*/
public <K extends JMapKey> void delete(JMapHolder<K> holder, K key) {
curTx.delete(makeKey(holder.key(), key));
}
public <K extends JMapKey> void deleteAll(JMapHolder<K> he) {
/**
* Delete all entries from the map of a given holder.
* @param holder the holder of the map
* @param <K> the type of the key
*/
public <K extends JMapKey> void deleteAll(JMapHolder<K> holder) {
ArrayList<K> collectedKeys = new ArrayList<>();
try (var it = getIterator(he)) {
try (var it = getIterator(holder)) {
while (it.hasNext()) {
var curKey = it.peekNextKey();
collectedKeys.add(curKey);
@@ -64,8 +110,8 @@ public class JMapHelper {
}
for (var curKey : collectedKeys) {
delete(he, curKey);
Log.tracev("Removed map entry {0} to {1}", he.key(), curKey);
delete(holder, curKey);
Log.tracev("Removed map entry {0} to {1}", holder.key(), curKey);
}
}

View File

@@ -2,5 +2,9 @@ package com.usatiuk.dhfs.jmap;
import com.usatiuk.objects.JData;
/**
* Marker interface that allows an object to hold an ordered key-value map of object references.
* @param <K>
*/
public interface JMapHolder<K extends JMapKey> extends JData {
}

View File

@@ -10,6 +10,9 @@ import com.usatiuk.objects.transaction.Transaction;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* This hook is used to delete all the entries of a map in a map holder when the holder is deleted.
*/
@Singleton
public class JMapHolderRefcounterTxHook implements PreCommitTxHook {
@Inject

View File

@@ -6,6 +6,10 @@ import com.usatiuk.objects.iterators.CloseableKvIterator;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
/**
* Iterates over JMap entries of a given holder.
* @param <K> the type of the key
*/
public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, JMapEntry<K>> {
private final CloseableKvIterator<JObjectKey, JData> _backing;
private final JObjectKey _prefix;

View File

@@ -1,4 +1,7 @@
package com.usatiuk.dhfs.jmap;
/**
* Marker interface for JMap keys. TODO: Actually only longs are supported right now.
*/
public interface JMapKey extends Comparable<JMapKey> {
}

View File

@@ -6,6 +6,12 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Comparator;
/**
* A reference from a JMap object to some object.
* This is used to get the parent object from the reference.
* @param holder the object that holds the map
* @param mapKey the key in the map
*/
public record JMapRef(JObjectKey holder, JMapKey mapKey) implements JDataRef {
@Override
public JObjectKey obj() {

View File

@@ -4,6 +4,9 @@ import com.usatiuk.dhfs.peersync.PeerId;
import java.net.InetAddress;
/**
* Represents a peer address with an IP address and port.
*/
public record IpPeerAddress(PeerId peer, PeerAddressType type,
InetAddress address, int port, int securePort) implements PeerAddress {
public IpPeerAddress withType(PeerAddressType type) {

View File

@@ -6,8 +6,20 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
/**
* Helper class for parsing peer addresses from strings.
* <p>
* The expected format is: <peerId>:<ip>:<port>:<securePort>
* </p>
*/
public class PeerAddrStringHelper {
/**
* Parses a string into an IpPeerAddress object.
*
* @param addr the string to parse
* @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty
*/
public static Optional<IpPeerAddress> parse(String addr) {
if (addr.isEmpty()) {
return Optional.empty();
@@ -21,6 +33,13 @@ public class PeerAddrStringHelper {
}
}
/**
* Parses a string into an IpPeerAddress object, with a manually provided peer ID.
*
* @param peerId the peer ID to use
* @param addr the string to parse
* @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty
*/
public static Optional<IpPeerAddress> parseNoPeer(PeerId peerId, String addr) {
if (addr.isEmpty()) {
return Optional.empty();

View File

@@ -4,8 +4,21 @@ import com.usatiuk.dhfs.peersync.PeerId;
import java.io.Serializable;
/**
* Peer address interface, can be used to represent different types of peer addresses, not only IP.
*/
public interface PeerAddress extends Serializable {
/**
* Returns the peer ID associated with this address.
*
* @return the peer ID
*/
PeerId peer();
/**
* Returns the type of this peer address (LAN/WAN/etc)
*
* @return the type of the peer address
*/
PeerAddressType type();
}

View File

@@ -13,12 +13,21 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Peer discovery directory collects known peer addresses, and automatically cleans up old entries.
*/
@ApplicationScoped
public class PeerDiscoveryDirectory {
private final MultiValuedMap<PeerId, PeerEntry> _entries = new HashSetValuedHashMap<>();
@ConfigProperty(name = "dhfs.peerdiscovery.timeout")
long timeout;
/**
* Notifies the directory about a new address for a peer.
* If the address is already known, it updates the last seen time.
*
* @param addr the new address
*/
public void notifyAddr(PeerAddress addr) {
Log.tracev("New address {0}", addr);
synchronized (_entries) {
@@ -28,6 +37,13 @@ public class PeerDiscoveryDirectory {
}
}
/**
* Returns a collection of addresses for a given peer.
* Cleans up old entries that are no longer reachable.
*
* @param peer the peer ID
* @return a collection of addresses for the peer
*/
public Collection<PeerAddress> getForPeer(PeerId peer) {
synchronized (_entries) {
long curTime = System.currentTimeMillis();
@@ -43,6 +59,12 @@ public class PeerDiscoveryDirectory {
}
}
/**
* Returns a collection of reachable peers.
* Cleans up old entries that are no longer reachable.
*
* @return a collection of reachable peers
*/
public Collection<PeerId> getReachablePeers() {
synchronized (_entries) {
long curTime = System.currentTimeMillis();

View File

@@ -5,6 +5,9 @@ import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
/**
* Notified PeerDiscoveryDirectory about manually added peer addresses.
*/
@ApplicationScoped
public class PersistentStaticPeerDiscovery {
@Inject

View File

@@ -9,6 +9,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* Notifies PeerDiscoveryDirectory about statically configured peer addresses.
*/
@ApplicationScoped
public class StaticPeerDiscovery {
private final List<IpPeerAddress> _peers;

View File

@@ -15,6 +15,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
/**
* Broadcasts information about this peer to the local network.
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryBroadcaster {

View File

@@ -19,6 +19,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
import java.nio.ByteBuffer;
/**
* Listens for peer discovery packets from other peers on the local network.
* When a packet is received, it notifies the PeerDiscoveryDirectory about the new peer.
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryClient {

View File

@@ -3,8 +3,24 @@ package com.usatiuk.dhfs.peersync;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
/**
* Allows to specify custom processing of initial synchronization/crash recovery for a specific object type.
*
* @param <T> the type of the object
*/
public interface InitialSyncProcessor<T extends JData> {
/**
* Called when the peer is connected for the first time (or needs to be re-synced).
*
* @param from the peer that initiated the sync
* @param key the key of the object to be synchronized
*/
void prepareForInitialSync(PeerId from, JObjectKey key);
/**
* Called when the system had crashed (and the object needs to probably be re-synced).
*
* @param key the key of the object to be handled
*/
void handleCrash(JObjectKey key);
}

View File

@@ -1,5 +1,13 @@
package com.usatiuk.dhfs.peersync;
/**
* Listener for peer connected events.
*/
public interface PeerConnectedEventListener {
/**
* Called when a peer is connected.
*
* @param peerId the ID of the connected peer
*/
void handlePeerConnected(PeerId peerId);
}

View File

@@ -1,5 +1,13 @@
package com.usatiuk.dhfs.peersync;
/**
* Listener for peer disconnected events.
*/
public interface PeerDisconnectedEventListener {
/**
* Called when a peer is disconnected.
*
* @param peerId the ID of the disconnected peer
*/
void handlePeerDisconnected(PeerId peerId);
}

View File

@@ -4,6 +4,11 @@ import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
/**
* Represents a peer ID
* Type-safe wrapper for JObjectKey
* @param id the ID of the peer (as a JObjectKey)
*/
public record PeerId(JObjectKey id) implements Serializable, Comparable<PeerId> {
public static PeerId of(String id) {
return new PeerId(JObjectKey.of(id));

View File

@@ -11,6 +11,17 @@ import org.pcollections.PMap;
import java.security.cert.X509Certificate;
/**
* Represents information about a peer in the cluster
* {@link JDataRemotePush} annotation is used, as invalidating a peer information by the peer itself might make it unreachable,
* as it will not be possible to download it from the invalidated peer, so the peer information should be send with the notification
*
* @param key the key of the peer
* @param id the ID of the peer
* @param cert the certificate of the peer
* @param kickCounter the kick counter of the peer, entries are incremented when a peer is kicked out for not being seen for a long time
* @param lastSeenTimestamp the last time the peer was seen
*/
@JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
PMap<PeerId, Long> kickCounter,

View File

@@ -14,6 +14,9 @@ import jakarta.inject.Inject;
import java.util.List;
import java.util.Optional;
/**
* Service for managing information about peers connected to the cluster.
*/
@ApplicationScoped
public class PeerInfoService {
public static final JObjectKey TREE_KEY = JObjectKey.of("peers");
@@ -32,7 +35,7 @@ public class PeerInfoService {
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
@@ -42,6 +45,12 @@ public class PeerInfoService {
}
/**
* Checks if the peer with the given ID is known to the cluster.
*
* @param peer the ID of the peer to check
* @return true if the peer exists, false otherwise
*/
public boolean existsPeer(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
@@ -52,6 +61,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about the peer with the given ID.
*
* @param peer the ID of the peer to get information about
* @return an Optional containing the PeerInfo object if the peer exists, or an empty Optional if it does not
*/
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
@@ -65,6 +80,11 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all peers in the cluster.
*
* @return a list of PeerInfo objects representing all peers in the cluster
*/
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of());
@@ -80,6 +100,11 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all peers in the cluster, excluding the current peer.
*
* @return a list of PeerInfo objects representing all peers in the cluster, excluding the current peer
*/
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(
@@ -87,6 +112,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all synchronized peers in the cluster.
* A peer might not be synchronized if it had not been seen for a while, for example.
*
* @return a list of PeerInfo objects representing all synchronized peers in the cluster
*/
public List<PeerInfo> getSynchronizedPeers() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(pi -> {
@@ -98,6 +129,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all synchronized peers in the cluster, excluding the current peer.
* A peer might not be synchronized if it had not been seen for a while, for example.
*
* @return a list of PeerInfo objects representing all synchronized peers in the cluster, excluding the current peer
*/
public List<PeerInfo> getSynchronizedPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeersNoSelf().stream().filter(pi -> {
@@ -106,6 +143,12 @@ public class PeerInfoService {
});
}
/**
* Add a new peer to the cluster.
*
* @param id the ID of the peer to add
* @param cert the certificate of the peer
*/
public void putPeer(PeerId id, byte[] cert) {
jObjectTxManager.run(() -> {
var parent = getTree().traverse(List.of());
@@ -115,6 +158,11 @@ public class PeerInfoService {
});
}
/**
* Remove a peer from the cluster.
*
* @param id the ID of the peer to remove
*/
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));

View File

@@ -9,6 +9,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
/**
* Periodically updates the last seen timestamp of reachable peers and increments the kick counter for unreachable peers.
*/
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject

View File

@@ -33,6 +33,10 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
/**
* Handles various locally relevant persistent data related to peers,
* such as local peer's UUID, certificate, and persistent peer addresses.
*/
@ApplicationScoped
public class PersistentPeerDataService {
@Inject

View File

@@ -29,6 +29,9 @@ import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Handles connections to known peers in the cluster.
*/
@ApplicationScoped
public class ReachablePeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
@@ -137,11 +140,10 @@ public class ReachablePeerManager {
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
private void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
handleConnectionError(host.id());
}
// FIXME:
private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
try {
return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> {
@@ -154,28 +156,45 @@ public class ReachablePeerManager {
}
}
/**
* Checks if the given host is reachable.
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
public boolean isReachable(PeerId host) {
return _states.containsKey(host);
}
/**
* Checks if the given host is reachable.
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
public boolean isReachable(com.usatiuk.dhfs.peersync.PeerInfo host) {
return isReachable(host.id());
}
/**
* Gets the address of the given host.
* @param host the host to get the address for
* @return the address of the host, or null if not reachable
*/
public PeerAddress getAddress(PeerId host) {
return _states.get(host);
}
/**
* Gets the ids of all reachable hosts.
* @return a list of ids of all reachable hosts
*/
public List<PeerId> getAvailableHosts() {
return _states.keySet().stream().toList();
}
// public List<UUID> getUnavailableHosts() {
// return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
// .filter(e -> !e.getValue().isReachable())
// .map(Map.Entry::getKey).toList());
// }
/**
* Gets a snapshot of current state of the connected (and not connected) peers
* @return information about all connected/disconnected peers
*/
public HostStateSnapshot getHostStateSnapshot() {
return transactionManager.run(() -> {
var partition = peerInfoService.getPeersNoSelf().stream().map(com.usatiuk.dhfs.peersync.PeerInfo::id)
@@ -184,16 +203,31 @@ public class ReachablePeerManager {
});
}
/**
* Removes the given host from the cluster
* @param peerId the id of the host to remove
*/
public void removeRemoteHost(PeerId peerId) {
transactionManager.run(() -> {
peerInfoService.removePeer(peerId);
});
}
/**
* Selects the best address for the given host.
* The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN)
* @param host the host to select the address for
* @return the best address for the host, or null if not reachable
*/
public Optional<PeerAddress> selectBestAddress(PeerId host) {
return peerDiscoveryDirectory.getForPeer(host).stream().min(Comparator.comparing(PeerAddress::type));
}
/**
* Call the given peer and get its information.
* @param host the peer to get the information for
* @return the information about the peer
*/
private ApiPeerInfo getInfo(PeerId host) {
return transactionManager.run(() -> {
if (peerInfoService.getPeerInfo(host).isPresent())
@@ -206,6 +240,12 @@ public class ReachablePeerManager {
});
}
/**
* Adds the given peer to the cluster.
* The certificate provided is verified against the one peer is using right now.
* @param host the peer to add
* @param cert the certificate of the peer
*/
public void addRemoteHost(PeerId host, @Nullable String cert) {
transactionManager.run(() -> {
var info = getInfo(host);
@@ -222,7 +262,10 @@ public class ReachablePeerManager {
peerTrustManager.reloadTrustManagerHosts(transactionManager.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME:
}
/**
* Gets the information about all reachable peers that are not added to the cluster.
* @return a collection of pairs of peer id and peer info
*/
public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() {
return transactionManager.run(() -> {
return peerDiscoveryDirectory.getReachablePeers().stream().filter(p -> !peerInfoService.getPeerInfo(p).isPresent())

View File

@@ -8,6 +8,9 @@ import jakarta.ws.rs.Path;
import java.security.cert.CertificateEncodingException;
import java.util.Base64;
/**
* Provides information about the peer publicly, without any authentication.
*/
@Path("/peer-info")
public class PeerSyncApi {
@Inject

View File

@@ -8,8 +8,17 @@ import jakarta.enterprise.context.ApplicationScoped;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
* Allows to query peers about their information, even if peer isn't part of the cluster.
*/
@ApplicationScoped
public class PeerSyncApiClientDynamic {
/**
* Queries peer about its information.
*
* @param addr the address of the peer to query
* @return the peer information
*/
public ApiPeerInfo getSelfInfo(PeerAddress addr) {
return switch (addr) {
case IpPeerAddress ipAddr -> getSelfInfo(ipAddr.address().getHostAddress(), ipAddr.port());
@@ -20,8 +29,8 @@ public class PeerSyncApiClientDynamic {
private ApiPeerInfo getSelfInfo(String address, int port) {
var client = QuarkusRestClientBuilder.newBuilder()
.baseUri(URI.create("http://" + address + ":" + port))
.connectTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.connectTimeout(1, TimeUnit.SECONDS)
.readTimeout(1, TimeUnit.SECONDS)
.build(PeerSyncApiClient.class);
return client.getSelfInfo();
}

View File

@@ -22,8 +22,17 @@ import java.security.cert.X509Certificate;
import java.util.Calendar;
import java.util.Date;
/**
* Helper class for generating and manipulating X.509 certificates.
*/
public class CertificateTools {
/**
* Converts a byte array to an X.509 certificate.
*
* @param bytes the byte array representing the certificate
* @return the X.509 certificate
*/
public static X509Certificate certFromBytes(byte[] bytes) {
try {
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
@@ -34,6 +43,10 @@ public class CertificateTools {
}
}
/**
* Generates a random RSA key pair.
* @return the generated RSA key pair
*/
public static KeyPair generateKeyPair() {
try {
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
@@ -44,6 +57,13 @@ public class CertificateTools {
}
}
/**
* Generates an X.509 certificate using the provided key pair and subject name.
*
* @param keyPair the key pair to use for the certificate
* @param subject the subject name for the certificate
* @return the generated X.509 certificate
*/
public static X509Certificate generateCertificate(KeyPair keyPair, String subject) {
try {
Provider bcProvider = new BouncyCastleProvider();

Some files were not shown because too many files have changed in this diff Show More