diff --git a/dhfs-parent/dhfs-fuse/pom.xml b/dhfs-parent/dhfs-fuse/pom.xml index 08609b9d..1d7d9199 100644 --- a/dhfs-parent/dhfs-fuse/pom.xml +++ b/dhfs-parent/dhfs-fuse/pom.xml @@ -139,7 +139,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 0.5C + 1C false classes diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java index ce680536..f968250d 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java @@ -22,6 +22,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +/** + * Simple class to automatically download remote objects in the background. + */ @ApplicationScoped public class AutosyncProcessor { private final HashSetDelayedBlockingQueue _pending = new HashSetDelayedBlockingQueue<>(0); @@ -77,6 +80,11 @@ public class AutosyncProcessor { _autosyncExcecutor.shutdownNow(); } + /** + * Adds an object to the queue to be downloaded. + * + * @param name the object to add + */ public void add(JObjectKey name) { _pending.add(name); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncTxHook.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncTxHook.java index 47096849..176af909 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncTxHook.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncTxHook.java @@ -10,6 +10,9 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.eclipse.microprofile.config.inject.ConfigProperty; +/** + * Pre-commit hook to automatically download remote objects, if the option to download all objects is enabled. + */ @Singleton public class AutosyncTxHook implements PreCommitTxHook { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueData.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueData.java index 19c426b8..84ec09d0 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueData.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueData.java @@ -7,6 +7,9 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import java.io.Serial; import java.io.Serializable; +/** + * Stores data about deferred invalidations. + */ public class DeferredInvalidationQueueData implements Serializable { @Serial private static final long serialVersionUID = 1L; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java index 96fa61d0..f5447407 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java @@ -20,6 +20,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +/** + * Service to handle deferred invalidations. + * It is responsible for storing and returning deferred invalidations to the invalidation queue. + * It also is responsible for persisting and restoring the deferred invalidations on startup and shutdown. + */ @ApplicationScoped public class DeferredInvalidationQueueService implements PeerConnectedEventListener { private static final String dataFileName = "invqueue"; @@ -59,7 +64,9 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe } } - // FIXME: + /** + * Periodically returns deferred invalidations to the invalidation queue for all reachable hosts. + */ @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Blocking void periodicReturn() { @@ -67,6 +74,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe returnForHost(reachable); } + /** + * Returns deferred invalidations for a specific host. + * + * @param host the host to return deferred invalidations for + */ void returnForHost(PeerId host) { synchronized (this) { var col = _persistentData.deferredInvalidations.get(host); @@ -78,6 +90,10 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe } } + /** + * Defer a specific invalidation. + * @param entry the invalidation to defer + */ void defer(InvalidationQueueEntry entry) { synchronized (this) { Log.tracev("Deferred invalidation: {0}", entry); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/IndexUpdateOp.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/IndexUpdateOp.java index 9688d247..a6f56069 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/IndexUpdateOp.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/IndexUpdateOp.java @@ -8,6 +8,12 @@ import org.pcollections.PMap; import java.util.Collection; import java.util.List; +/** + * Information about a new version of a remote object, possibly with its data. + * @param key the key of the object + * @param changelog the changelog of the object (version vector) + * @param data the data of the object + */ public record IndexUpdateOp(JObjectKey key, PMap changelog, JDataRemoteDto data) implements Op { @Override public Collection getEscapedRefs() { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java index 59a8b3f7..db8d1ebe 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java @@ -2,8 +2,8 @@ package com.usatiuk.dhfs.invalidation; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerInfoService; -import com.usatiuk.dhfs.peersync.ReachablePeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; +import com.usatiuk.dhfs.peersync.ReachablePeerManager; import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient; import com.usatiuk.objects.JData; import com.usatiuk.objects.JObjectKey; @@ -31,6 +31,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +/** + * Service to handle sending operations to remote peers. + * This service works with objects, containing a queue of them. + * The operations to be sent to peers are extracted from the objects in the queue. + */ @ApplicationScoped public class InvalidationQueueService { private final HashSetDelayedBlockingQueue _queue; @@ -118,6 +123,7 @@ public class InvalidationQueueService { String stats = "Sent invalidation: "; long success = 0; + // Don't try to send same object in multiple threads List locks = new LinkedList<>(); try { ArrayListValuedHashMap ops = new ArrayListValuedHashMap<>(); @@ -194,6 +200,11 @@ public class InvalidationQueueService { Log.info("Invalidation sender exiting"); } + /** + * Extract operations from an object for all peers and push them. + * + * @param key the object key to process + */ public void pushInvalidationToAll(JObjectKey key) { while (true) { var queue = _toAllQueue.get(); @@ -209,6 +220,7 @@ public class InvalidationQueueService { } } + void pushInvalidationToOne(InvalidationQueueEntry entry) { if (reachablePeerManager.isReachable(entry.peer())) _queue.add(entry); @@ -223,11 +235,23 @@ public class InvalidationQueueService { deferredInvalidationQueueService.defer(entry); } + /** + * Extract operations from an object for some specific peer and push them. + * + * @param host the host to extract operations for + * @param obj the object key to process + */ public void pushInvalidationToOne(PeerId host, JObjectKey obj) { var entry = new InvalidationQueueEntry(host, obj); pushInvalidationToOne(entry); } + /** + * Extract operations from an object for some specific peer and push them, without delay. + * + * @param host the host to extract operations for + * @param obj the object key to process + */ public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) { var entry = new InvalidationQueueEntry(host, obj); pushInvalidationToOneNoDelay(entry); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/Op.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/Op.java index 15914e84..9104a9b5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/Op.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/Op.java @@ -5,6 +5,16 @@ import com.usatiuk.objects.JObjectKey; import java.io.Serializable; import java.util.Collection; +/** + * Represents a unit of information to be sent to another peer. + * The operations are extracted from objects in the key-value storage, and then sent to peers. + */ public interface Op extends Serializable { + /** + * Returns the keys of the objects that are referenced in this op. + * These objects should be marked as "escaped" in the local storage for the purposed of garbage collection. + * + * @return the keys of the objects that are referenced in this operation + */ Collection getEscapedRefs(); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractor.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractor.java index beaf4046..4bdc69c5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractor.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractor.java @@ -6,6 +6,17 @@ import org.apache.commons.lang3.tuple.Pair; import java.util.List; +/** + * Interface for extracting operations from data objects. + * @param the type of data + */ public interface OpExtractor { + /** + * Extract operations from the given data object. + * + * @param data the data object to extract operations from + * @param peerId the ID of the peer to extract operations for + * @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer + */ Pair, Runnable> extractOps(T data, PeerId peerId); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractorService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractorService.java index 7b8eb32d..a21327ce 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractorService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpExtractorService.java @@ -14,6 +14,11 @@ import java.util.List; import java.util.Map; import java.util.stream.Stream; +/** + * Service for extracting operations from JData objects. + * This service uses the {@link OpExtractor} interface to extract operations from JData objects. + * It is used to extract operations from JData objects before they are sent to the peer. + */ @ApplicationScoped public class OpExtractorService { private final Map, OpExtractor> _opExtractorMap; @@ -38,6 +43,13 @@ public class OpExtractorService { _opExtractorMap = Map.copyOf(opExtractorMap); } + /** + * Extract operations from the given JData object. + * + * @param data the JData object to extract operations from + * @param peerId the ID of the peer to extract operations for + * @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer + */ public @Nullable Pair, Runnable> extractOps(JData data, PeerId peerId) { var extractor = _opExtractorMap.get(data.getClass()); if (extractor == null) { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java index ddada994..8fb61499 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java @@ -2,6 +2,16 @@ package com.usatiuk.dhfs.invalidation; import com.usatiuk.dhfs.peersync.PeerId; +/** + * Interface for handling operations. + * @param the type of operation + */ public interface OpHandler { + /** + * Handles the given operation. + * + * @param from the ID of the peer that sent the operation + * @param op the operation to handle + */ void handleOp(PeerId from, T op); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java index e6ee8d59..13295f0c 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java @@ -10,6 +10,11 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +/** + * Service for handling operations. + * This service uses the {@link OpHandler} interface to handle operations. + * It is used to handle operations received from the peer. + */ @ApplicationScoped public class OpHandlerService { private final Map, OpHandler> _opHandlerMap; @@ -34,6 +39,12 @@ public class OpHandlerService { _opHandlerMap = Map.copyOf(OpHandlerMap); } + /** + * Handle the given operation. + * + * @param from the ID of the peer that sent the operation + * @param op the operation to handle + */ public void handleOp(PeerId from, Op op) { var handler = _opHandlerMap.get(op.getClass()); if (handler == null) { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java index 06d72caf..1feb6f33 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java @@ -24,6 +24,10 @@ import java.util.*; import java.util.function.Function; import java.util.function.Supplier; +/** + * Automatically synchronized and persistent Kleppmann tree service. + * The trees are identified by their names, and can have any type of root node. + */ @ApplicationScoped public class JKleppmannTreeManager { private static final String dataFileName = "trees"; @@ -38,6 +42,12 @@ public class JKleppmannTreeManager { @Inject PersistentPeerDataService persistentPeerDataService; + /** + * Get or create a tree with the given name. + * @param name the name of the tree + * @param rootNodeSupplier a supplier for the root node meta + * @return the tree + */ public JKleppmannTree getTree(JObjectKey name, Supplier rootNodeSupplier) { return txManager.executeTx(() -> { var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null); @@ -64,13 +74,20 @@ public class JKleppmannTreeManager { }); } + /** + * Get a tree with the given name. + * @param name the name of the tree + * @return the tree + */ public Optional getTree(JObjectKey name) { return txManager.executeTx(() -> { return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new); }); } - + /** + * Kleppmann tree wrapper, automatically synchronized and persistent. + */ public class JKleppmannTree { private final KleppmannTree _tree; private final JKleppmannTreeStorageInterface _storageInterface; @@ -88,26 +105,57 @@ public class JKleppmannTreeManager { _tree = new KleppmannTree<>(_storageInterface, peerInterface, _clock, new JOpRecorder()); } + /** + * Traverse the tree from root to find a node with the given name. + * @param names list of names to traverse + * @return the node key + */ public JObjectKey traverse(List names) { return _tree.traverse(names); } + /** + * Get a new node id. (random) + * @return the new node id + */ public JObjectKey getNewNodeId() { return _storageInterface.getNewNodeId(); } + /** + * Move a node to a new parent. + * @param newParent the new parent + * @param newMeta the new node metadata + * @param node the node to move + */ public void move(JObjectKey newParent, JKleppmannTreeNodeMeta newMeta, JObjectKey node) { _tree.move(newParent, newMeta, node); } + /** + * Move a node to the trash. + * @param newMeta the new node metadata + * @param nodeKey the node key + */ public void trash(JKleppmannTreeNodeMeta newMeta, JObjectKey nodeKey) { _tree.move(_storageInterface.getTrashId(), newMeta.withName(nodeKey.toString()), nodeKey); } + /** + * Check if there are any pending operations for the given peer. + * @param host the peer id + * @return true if there are pending operations, false otherwise + */ public boolean hasPendingOpsForHost(PeerId host) { return !_data.queues().getOrDefault(host, TreePMap.empty()).isEmpty(); } + /** + * Get the pending operations for the given peer. + * @param host the peer id + * @param limit the maximum number of operations to return + * @return the list of pending operations + */ public List getPendingOpsForHost(PeerId host, int limit) { ArrayList collected = new ArrayList<>(); for (var node : _data.queues().getOrDefault(host, TreePMap.empty()).entrySet()) { @@ -119,7 +167,13 @@ public class JKleppmannTreeManager { return Collections.unmodifiableList(collected); } - // @Override + /** + * Mark the operation as committed for the given host. + * This should be called when the operation is successfully applied on the host. + * All operations should be sent and received in timestamp order. + * @param host the peer id + * @param op the operation to commit + */ public void commitOpForHost(PeerId host, Op op) { if (op instanceof JKleppmannTreePeriodicPushOp) return; @@ -135,15 +189,27 @@ public class JKleppmannTreeManager { curTx.put(_data); } + /** + * Record bootstrap operations for the given host. + * @param host the peer id + */ public void recordBootstrap(PeerId host) { _tree.recordBoostrapFor(host); } + /** + * Get the parent of a node that matches the given predicate. + * @param predicate the predicate to match + */ public Pair findParent(Function, Boolean> predicate) { return _tree.findParent(predicate); } - // @Override + /** + * Accept an external operation from the given peer. + * @param from the peer id + * @param op the operation to accept + */ public void acceptExternalOp(PeerId from, Op op) { if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) { _tree.updateExternalTimestamp(from1, timestamp); @@ -166,6 +232,11 @@ public class JKleppmannTreeManager { _tree.applyExternalOp(from, jop.op()); } + /** + * Create a dummy operation that contains the timestamp of the last operation, to move causality threshold + * forward even without any real operations. + * @return the periodic push operation + */ public Op getPeriodicPushOp() { return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp()); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNode.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNode.java index c1e7dafd..c4e4c75a 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNode.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNode.java @@ -14,6 +14,9 @@ import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; +/** + * KleppmannTree node implementation for JKleppmannTree + */ public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent, OpMove lastEffectiveOp, @Nullable JKleppmannTreeNodeMeta meta, diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java index fb3ac05c..ed17d226 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java @@ -10,7 +10,9 @@ import java.io.Serializable; import java.util.Collection; import java.util.Objects; -// Separate refcounting from JKleppmannTreeNode +/** + * Separate reference counting from JKleppmannTreeNode + */ public record JKleppmannTreeNodeHolder(PCollection refsFrom, boolean frozen, JKleppmannTreeNode node) implements JDataRefcounted, Serializable { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java index 86d558d3..5618405b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java @@ -5,7 +5,6 @@ import com.usatiuk.objects.JObjectKey; import java.util.Collection; -//@ProtoMirror(JKleppmannTreeNodeMetaP.class) public interface JKleppmannTreeNodeMeta extends NodeMeta { JKleppmannTreeNodeMeta withName(String name); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreePersistentData.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreePersistentData.java index c7186e75..0fd9a1e6 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreePersistentData.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreePersistentData.java @@ -14,6 +14,9 @@ import org.pcollections.PSortedMap; import java.util.Collection; import java.util.List; +/** + * Various persistent data for JKleppmannTree + */ public record JKleppmannTreePersistentData( JObjectKey key, PCollection refsFrom, boolean frozen, long clock, diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHelper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHelper.java index 48251738..73a6e409 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHelper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHelper.java @@ -12,6 +12,12 @@ import java.util.ArrayList; import java.util.Optional; // TODO: It's not actually generic right now, only longs are supported essentially + +/** + * Persistent-storage backed ordered map service. + * Local and remote objects can implement the ${@link JMapHolder} interface, then they can be used with this service + * to store references to other objects identified by sorded keys of some kind. (for now only longs) + */ @Singleton public class JMapHelper { @Inject @@ -33,29 +39,69 @@ public class JMapHelper { return JObjectKey.of(holder.value() + ">"); } + + /** + * Get an iterator for the map of a given holder. + * @param holder the holder of the map + * @param start the start position of the iterator relative to the key + * @param key the key to start the iterator from + * @return an iterator for the map of the given holder + * @param the type of the key + */ public CloseableKvIterator> getIterator(JMapHolder holder, IteratorStart start, K key) { return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder); } + /** + * Get an iterator for the map of a given holder. The iterator starts from the first key. + * @param holder the holder of the map + * @return an iterator for the map of the given holder + * @param the type of the key + */ public CloseableKvIterator> getIterator(JMapHolder holder) { return new JMapIterator<>(curTx.getIterator(IteratorStart.GT, makeKeyFirst(holder.key())), holder); } + /** + * Put a new entry into the map of a given holder. + * @param holder the holder of the map + * @param key the key to put + * @param ref the key of the object reference to which to record + * @param the type of the key + */ public void put(JMapHolder holder, K key, JObjectKey ref) { curTx.put(new JMapEntry<>(holder.key(), key, ref)); } + /** + * Get an entry from the map of a given holder. + * @param holder the holder of the map + * @param key the key to get + * @return an optional containing the entry if it exists, or an empty optional if it does not + * @param the type of the key + */ public Optional> get(JMapHolder holder, K key) { return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry) e); } + /** + * Delete an entry from the map of a given holder. + * @param holder the holder of the map + * @param key the key to delete + * @param the type of the key + */ public void delete(JMapHolder holder, K key) { curTx.delete(makeKey(holder.key(), key)); } - public void deleteAll(JMapHolder he) { + /** + * Delete all entries from the map of a given holder. + * @param holder the holder of the map + * @param the type of the key + */ + public void deleteAll(JMapHolder holder) { ArrayList collectedKeys = new ArrayList<>(); - try (var it = getIterator(he)) { + try (var it = getIterator(holder)) { while (it.hasNext()) { var curKey = it.peekNextKey(); collectedKeys.add(curKey); @@ -64,8 +110,8 @@ public class JMapHelper { } for (var curKey : collectedKeys) { - delete(he, curKey); - Log.tracev("Removed map entry {0} to {1}", he.key(), curKey); + delete(holder, curKey); + Log.tracev("Removed map entry {0} to {1}", holder.key(), curKey); } } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolder.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolder.java index c77229db..4f06a61f 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolder.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolder.java @@ -2,5 +2,9 @@ package com.usatiuk.dhfs.jmap; import com.usatiuk.objects.JData; +/** + * Marker interface that allows an object to hold an ordered key-value map of object references. + * @param + */ public interface JMapHolder extends JData { } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolderRefcounterTxHook.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolderRefcounterTxHook.java index 66a146c1..a30ad7e8 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolderRefcounterTxHook.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapHolderRefcounterTxHook.java @@ -10,6 +10,9 @@ import com.usatiuk.objects.transaction.Transaction; import jakarta.inject.Inject; import jakarta.inject.Singleton; +/** + * This hook is used to delete all the entries of a map in a map holder when the holder is deleted. + */ @Singleton public class JMapHolderRefcounterTxHook implements PreCommitTxHook { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapIterator.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapIterator.java index c881a81c..ae6e8873 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapIterator.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapIterator.java @@ -6,6 +6,10 @@ import com.usatiuk.objects.iterators.CloseableKvIterator; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; +/** + * Iterates over JMap entries of a given holder. + * @param the type of the key + */ public class JMapIterator implements CloseableKvIterator> { private final CloseableKvIterator _backing; private final JObjectKey _prefix; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapKey.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapKey.java index da3435f4..19413264 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapKey.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapKey.java @@ -1,4 +1,7 @@ package com.usatiuk.dhfs.jmap; +/** + * Marker interface for JMap keys. TODO: Actually only longs are supported right now. + */ public interface JMapKey extends Comparable { } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapRef.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapRef.java index d3fd2ab1..38d33171 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapRef.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapRef.java @@ -6,6 +6,12 @@ import com.usatiuk.objects.JObjectKey; import java.util.Comparator; +/** + * A reference from a JMap object to some object. + * This is used to get the parent object from the reference. + * @param holder the object that holds the map + * @param mapKey the key in the map + */ public record JMapRef(JObjectKey holder, JMapKey mapKey) implements JDataRef { @Override public JObjectKey obj() { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/IpPeerAddress.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/IpPeerAddress.java index dee6ba44..1526da41 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/IpPeerAddress.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/IpPeerAddress.java @@ -4,6 +4,9 @@ import com.usatiuk.dhfs.peersync.PeerId; import java.net.InetAddress; +/** + * Represents a peer address with an IP address and port. + */ public record IpPeerAddress(PeerId peer, PeerAddressType type, InetAddress address, int port, int securePort) implements PeerAddress { public IpPeerAddress withType(PeerAddressType type) { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java index 62ae4c2a..89fa93ef 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java @@ -6,8 +6,20 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Optional; +/** + * Helper class for parsing peer addresses from strings. + *

+ * The expected format is: ::: + *

+ */ public class PeerAddrStringHelper { + /** + * Parses a string into an IpPeerAddress object. + * + * @param addr the string to parse + * @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty + */ public static Optional parse(String addr) { if (addr.isEmpty()) { return Optional.empty(); @@ -21,6 +33,13 @@ public class PeerAddrStringHelper { } } + /** + * Parses a string into an IpPeerAddress object, with a manually provided peer ID. + * + * @param peerId the peer ID to use + * @param addr the string to parse + * @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty + */ public static Optional parseNoPeer(PeerId peerId, String addr) { if (addr.isEmpty()) { return Optional.empty(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddress.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddress.java index e4d2f4e5..34be7015 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddress.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddress.java @@ -4,8 +4,21 @@ import com.usatiuk.dhfs.peersync.PeerId; import java.io.Serializable; +/** + * Peer address interface, can be used to represent different types of peer addresses, not only IP. + */ public interface PeerAddress extends Serializable { + /** + * Returns the peer ID associated with this address. + * + * @return the peer ID + */ PeerId peer(); + /** + * Returns the type of this peer address (LAN/WAN/etc) + * + * @return the type of the peer address + */ PeerAddressType type(); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerDiscoveryDirectory.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerDiscoveryDirectory.java index ff76df69..5c4a3bd8 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerDiscoveryDirectory.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerDiscoveryDirectory.java @@ -13,12 +13,21 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +/** + * Peer discovery directory collects known peer addresses, and automatically cleans up old entries. + */ @ApplicationScoped public class PeerDiscoveryDirectory { private final MultiValuedMap _entries = new HashSetValuedHashMap<>(); @ConfigProperty(name = "dhfs.peerdiscovery.timeout") long timeout; + /** + * Notifies the directory about a new address for a peer. + * If the address is already known, it updates the last seen time. + * + * @param addr the new address + */ public void notifyAddr(PeerAddress addr) { Log.tracev("New address {0}", addr); synchronized (_entries) { @@ -28,6 +37,13 @@ public class PeerDiscoveryDirectory { } } + /** + * Returns a collection of addresses for a given peer. + * Cleans up old entries that are no longer reachable. + * + * @param peer the peer ID + * @return a collection of addresses for the peer + */ public Collection getForPeer(PeerId peer) { synchronized (_entries) { long curTime = System.currentTimeMillis(); @@ -43,6 +59,12 @@ public class PeerDiscoveryDirectory { } } + /** + * Returns a collection of reachable peers. + * Cleans up old entries that are no longer reachable. + * + * @return a collection of reachable peers + */ public Collection getReachablePeers() { synchronized (_entries) { long curTime = System.currentTimeMillis(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PersistentStaticPeerDiscovery.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PersistentStaticPeerDiscovery.java index f371e430..52abccf9 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PersistentStaticPeerDiscovery.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PersistentStaticPeerDiscovery.java @@ -5,6 +5,9 @@ import io.quarkus.scheduler.Scheduled; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +/** + * Notified PeerDiscoveryDirectory about manually added peer addresses. + */ @ApplicationScoped public class PersistentStaticPeerDiscovery { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/StaticPeerDiscovery.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/StaticPeerDiscovery.java index b366359f..9694b33e 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/StaticPeerDiscovery.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/StaticPeerDiscovery.java @@ -9,6 +9,9 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; +/** + * Notifies PeerDiscoveryDirectory about statically configured peer addresses. + */ @ApplicationScoped public class StaticPeerDiscovery { private final List _peers; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java index 9e79a923..a156f056 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java @@ -15,6 +15,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import java.net.*; +/** + * Broadcasts information about this peer to the local network. + */ @ApplicationScoped @IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true") public class LocalPeerDiscoveryBroadcaster { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryClient.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryClient.java index aaec3d95..0076d1c4 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryClient.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/local/LocalPeerDiscoveryClient.java @@ -19,6 +19,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import java.net.*; import java.nio.ByteBuffer; +/** + * Listens for peer discovery packets from other peers on the local network. + * When a packet is received, it notifies the PeerDiscoveryDirectory about the new peer. + */ @ApplicationScoped @IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true") public class LocalPeerDiscoveryClient { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/InitialSyncProcessor.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/InitialSyncProcessor.java index 6b7e1fb3..cc37e43a 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/InitialSyncProcessor.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/InitialSyncProcessor.java @@ -3,8 +3,24 @@ package com.usatiuk.dhfs.peersync; import com.usatiuk.objects.JData; import com.usatiuk.objects.JObjectKey; +/** + * Allows to specify custom processing of initial synchronization/crash recovery for a specific object type. + * + * @param the type of the object + */ public interface InitialSyncProcessor { + /** + * Called when the peer is connected for the first time (or needs to be re-synced). + * + * @param from the peer that initiated the sync + * @param key the key of the object to be synchronized + */ void prepareForInitialSync(PeerId from, JObjectKey key); + /** + * Called when the system had crashed (and the object needs to probably be re-synced). + * + * @param key the key of the object to be handled + */ void handleCrash(JObjectKey key); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerConnectedEventListener.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerConnectedEventListener.java index 2bff44ae..773cfa60 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerConnectedEventListener.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerConnectedEventListener.java @@ -1,5 +1,13 @@ package com.usatiuk.dhfs.peersync; +/** + * Listener for peer connected events. + */ public interface PeerConnectedEventListener { + /** + * Called when a peer is connected. + * + * @param peerId the ID of the connected peer + */ void handlePeerConnected(PeerId peerId); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerDisconnectedEventListener.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerDisconnectedEventListener.java index ae84c512..29daa489 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerDisconnectedEventListener.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerDisconnectedEventListener.java @@ -1,5 +1,13 @@ package com.usatiuk.dhfs.peersync; +/** + * Listener for peer disconnected events. + */ public interface PeerDisconnectedEventListener { + /** + * Called when a peer is disconnected. + * + * @param peerId the ID of the disconnected peer + */ void handlePeerDisconnected(PeerId peerId); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerId.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerId.java index 55fa6813..c433d5c7 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerId.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerId.java @@ -4,6 +4,11 @@ import com.usatiuk.objects.JObjectKey; import java.io.Serializable; +/** + * Represents a peer ID + * Type-safe wrapper for JObjectKey + * @param id the ID of the peer (as a JObjectKey) + */ public record PeerId(JObjectKey id) implements Serializable, Comparable { public static PeerId of(String id) { return new PeerId(JObjectKey.of(id)); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java index d97c9e3e..e23a42d5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java @@ -11,6 +11,15 @@ import org.pcollections.PMap; import java.security.cert.X509Certificate; +/** + * Represents information about a peer in the cluster + * + * @param key the key of the peer + * @param id the ID of the peer + * @param cert the certificate of the peer + * @param kickCounter the kick counter of the peer, entries are incremented when a peer is kicked out for not being seen for a long time + * @param lastSeenTimestamp the last time the peer was seen + */ @JDataRemotePush public record PeerInfo(JObjectKey key, PeerId id, ByteString cert, PMap kickCounter, diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfoService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfoService.java index 97651b57..f4902ffa 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfoService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfoService.java @@ -14,6 +14,9 @@ import jakarta.inject.Inject; import java.util.List; import java.util.Optional; +/** + * Service for managing information about peers connected to the cluster. + */ @ApplicationScoped public class PeerInfoService { public static final JObjectKey TREE_KEY = JObjectKey.of("peers"); @@ -32,7 +35,7 @@ public class PeerInfoService { return jKleppmannTreeManager.getTree(TREE_KEY, () -> null); } - public Optional getPeerInfoImpl(JObjectKey key) { + Optional getPeerInfoImpl(JObjectKey key) { return jObjectTxManager.run(() -> { return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> { var meta = (JKleppmannTreeNodeMetaPeer) node.meta(); @@ -42,6 +45,12 @@ public class PeerInfoService { } + /** + * Checks if the peer with the given ID is known to the cluster. + * + * @param peer the ID of the peer to check + * @return true if the peer exists, false otherwise + */ public boolean existsPeer(PeerId peer) { return jObjectTxManager.run(() -> { var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value())); @@ -52,6 +61,12 @@ public class PeerInfoService { }); } + /** + * Gets the information about the peer with the given ID. + * + * @param peer the ID of the peer to get information about + * @return an Optional containing the PeerInfo object if the peer exists, or an empty Optional if it does not + */ public Optional getPeerInfo(PeerId peer) { return jObjectTxManager.run(() -> { var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value())); @@ -65,6 +80,11 @@ public class PeerInfoService { }); } + /** + * Gets the information about all peers in the cluster. + * + * @return a list of PeerInfo objects representing all peers in the cluster + */ public List getPeers() { return jObjectTxManager.run(() -> { var gotKey = getTree().traverse(List.of()); @@ -80,6 +100,11 @@ public class PeerInfoService { }); } + /** + * Gets the information about all peers in the cluster, excluding the current peer. + * + * @return a list of PeerInfo objects representing all peers in the cluster, excluding the current peer + */ public List getPeersNoSelf() { return jObjectTxManager.run(() -> { return getPeers().stream().filter( @@ -87,6 +112,12 @@ public class PeerInfoService { }); } + /** + * Gets the information about all synchronized peers in the cluster. + * A peer might not be synchronized if it had not been seen for a while, for example. + * + * @return a list of PeerInfo objects representing all synchronized peers in the cluster + */ public List getSynchronizedPeers() { return jObjectTxManager.run(() -> { return getPeers().stream().filter(pi -> { @@ -98,6 +129,12 @@ public class PeerInfoService { }); } + /** + * Gets the information about all synchronized peers in the cluster, excluding the current peer. + * A peer might not be synchronized if it had not been seen for a while, for example. + * + * @return a list of PeerInfo objects representing all synchronized peers in the cluster, excluding the current peer + */ public List getSynchronizedPeersNoSelf() { return jObjectTxManager.run(() -> { return getPeersNoSelf().stream().filter(pi -> { @@ -106,6 +143,12 @@ public class PeerInfoService { }); } + /** + * Add a new peer to the cluster. + * + * @param id the ID of the peer to add + * @param cert the certificate of the peer + */ public void putPeer(PeerId id, byte[] cert) { jObjectTxManager.run(() -> { var parent = getTree().traverse(List.of()); @@ -115,6 +158,11 @@ public class PeerInfoService { }); } + /** + * Remove a peer from the cluster. + * + * @param id the ID of the peer to remove + */ public void removePeer(PeerId id) { jObjectTxManager.run(() -> { var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value())); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java index 8a3d037e..35ab5c24 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java @@ -9,6 +9,9 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.config.inject.ConfigProperty; +/** + * Periodically updates the last seen timestamp of reachable peers and increments the kick counter for unreachable peers. + */ @ApplicationScoped public class PeerLastSeenUpdater { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java index c25c0198..d2fa2d34 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java @@ -33,6 +33,10 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; +/** + * Handles various locally relevant persistent data related to peers, + * such as local peer's UUID, certificate, and persistent peer addresses. + */ @ApplicationScoped public class PersistentPeerDataService { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java index 4f6eac39..e0fe3af8 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java @@ -29,6 +29,9 @@ import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.Stream; +/** + * Handles connections to known peers in the cluster. + */ @ApplicationScoped public class ReachablePeerManager { private final ConcurrentMap _states = new ConcurrentHashMap<>(); @@ -137,11 +140,10 @@ public class ReachablePeerManager { } } - public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) { + private void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) { handleConnectionError(host.id()); } - // FIXME: private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) { try { return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> { @@ -154,18 +156,37 @@ public class ReachablePeerManager { } } + /** + * Checks if the given host is reachable. + * @param host the host to check + * @return true if the host is reachable, false otherwise + */ public boolean isReachable(PeerId host) { return _states.containsKey(host); } + /** + * Checks if the given host is reachable. + * @param host the host to check + * @return true if the host is reachable, false otherwise + */ public boolean isReachable(com.usatiuk.dhfs.peersync.PeerInfo host) { return isReachable(host.id()); } + /** + * Gets the address of the given host. + * @param host the host to get the address for + * @return the address of the host, or null if not reachable + */ public PeerAddress getAddress(PeerId host) { return _states.get(host); } + /** + * Gets the ids of all reachable hosts. + * @return a list of ids of all reachable hosts + */ public List getAvailableHosts() { return _states.keySet().stream().toList(); } @@ -176,6 +197,10 @@ public class ReachablePeerManager { // .map(Map.Entry::getKey).toList()); // } + /** + * Gets a snapshot of current state of the connected (and not connected) peers + * @return information about all connected/disconnected peers + */ public HostStateSnapshot getHostStateSnapshot() { return transactionManager.run(() -> { var partition = peerInfoService.getPeersNoSelf().stream().map(com.usatiuk.dhfs.peersync.PeerInfo::id) @@ -184,16 +209,31 @@ public class ReachablePeerManager { }); } + /** + * Removes the given host from the cluster + * @param peerId the id of the host to remove + */ public void removeRemoteHost(PeerId peerId) { transactionManager.run(() -> { peerInfoService.removePeer(peerId); }); } + /** + * Selects the best address for the given host. + * The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN) + * @param host the host to select the address for + * @return the best address for the host, or null if not reachable + */ public Optional selectBestAddress(PeerId host) { return peerDiscoveryDirectory.getForPeer(host).stream().min(Comparator.comparing(PeerAddress::type)); } + /** + * Call the given peer and get its information. + * @param host the peer to get the information for + * @return the information about the peer + */ private ApiPeerInfo getInfo(PeerId host) { return transactionManager.run(() -> { if (peerInfoService.getPeerInfo(host).isPresent()) @@ -206,6 +246,12 @@ public class ReachablePeerManager { }); } + /** + * Adds the given peer to the cluster. + * The certificate provided is verified against the one peer is using right now. + * @param host the peer to add + * @param cert the certificate of the peer + */ public void addRemoteHost(PeerId host, @Nullable String cert) { transactionManager.run(() -> { var info = getInfo(host); @@ -222,7 +268,10 @@ public class ReachablePeerManager { peerTrustManager.reloadTrustManagerHosts(transactionManager.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME: } - + /** + * Gets the information about all reachable peers that are not added to the cluster. + * @return a collection of pairs of peer id and peer info + */ public Collection> getSeenButNotAddedHosts() { return transactionManager.run(() -> { return peerDiscoveryDirectory.getReachablePeers().stream().filter(p -> !peerInfoService.getPeerInfo(p).isPresent()) diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApi.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApi.java index cfa3192f..eea84c55 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApi.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApi.java @@ -8,6 +8,9 @@ import jakarta.ws.rs.Path; import java.security.cert.CertificateEncodingException; import java.util.Base64; +/** + * Provides information about the peer publicly, without any authentication. + */ @Path("/peer-info") public class PeerSyncApi { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApiClientDynamic.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApiClientDynamic.java index 19fa5939..5ac3cd98 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApiClientDynamic.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/api/PeerSyncApiClientDynamic.java @@ -8,8 +8,17 @@ import jakarta.enterprise.context.ApplicationScoped; import java.net.URI; import java.util.concurrent.TimeUnit; +/** + * Allows to query peers about their information, even if peer isn't part of the cluster. + */ @ApplicationScoped public class PeerSyncApiClientDynamic { + /** + * Queries peer about its information. + * + * @param addr the address of the peer to query + * @return the peer information + */ public ApiPeerInfo getSelfInfo(PeerAddress addr) { return switch (addr) { case IpPeerAddress ipAddr -> getSelfInfo(ipAddr.address().getHostAddress(), ipAddr.port()); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRef.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRef.java index 729d081c..0f8fe50a 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRef.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRef.java @@ -4,6 +4,9 @@ import com.usatiuk.objects.JObjectKey; import java.io.Serializable; +/** + * JDataRefs are used to store additional metadata about incoming references to objects for reference counting. + */ public interface JDataRef extends Comparable, Serializable { JObjectKey obj(); }