Sync-base: more javadocs

This commit is contained in:
2025-05-13 22:18:52 +02:00
parent dbad8a2b22
commit 838405fb46
43 changed files with 509 additions and 16 deletions

View File

@@ -139,7 +139,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>0.5C</forkCount>
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>

View File

@@ -22,6 +22,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Simple class to automatically download remote objects in the background.
*/
@ApplicationScoped
public class AutosyncProcessor {
private final HashSetDelayedBlockingQueue<JObjectKey> _pending = new HashSetDelayedBlockingQueue<>(0);
@@ -77,6 +80,11 @@ public class AutosyncProcessor {
_autosyncExcecutor.shutdownNow();
}
/**
* Adds an object to the queue to be downloaded.
*
* @param name the object to add
*/
public void add(JObjectKey name) {
_pending.add(name);
}

View File

@@ -10,6 +10,9 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
/**
* Pre-commit hook to automatically download remote objects, if the option to download all objects is enabled.
*/
@Singleton
public class AutosyncTxHook implements PreCommitTxHook {
@Inject

View File

@@ -7,6 +7,9 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.io.Serial;
import java.io.Serializable;
/**
* Stores data about deferred invalidations.
*/
public class DeferredInvalidationQueueData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;

View File

@@ -20,6 +20,11 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
/**
* Service to handle deferred invalidations.
* It is responsible for storing and returning deferred invalidations to the invalidation queue.
* It also is responsible for persisting and restoring the deferred invalidations on startup and shutdown.
*/
@ApplicationScoped
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@@ -59,7 +64,9 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
}
}
// FIXME:
/**
* Periodically returns deferred invalidations to the invalidation queue for all reachable hosts.
*/
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
@@ -67,6 +74,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
returnForHost(reachable);
}
/**
* Returns deferred invalidations for a specific host.
*
* @param host the host to return deferred invalidations for
*/
void returnForHost(PeerId host) {
synchronized (this) {
var col = _persistentData.deferredInvalidations.get(host);
@@ -78,6 +90,10 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
}
}
/**
* Defer a specific invalidation.
* @param entry the invalidation to defer
*/
void defer(InvalidationQueueEntry entry) {
synchronized (this) {
Log.tracev("Deferred invalidation: {0}", entry);

View File

@@ -8,6 +8,12 @@ import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
/**
* Information about a new version of a remote object, possibly with its data.
* @param key the key of the object
* @param changelog the changelog of the object (version vector)
* @param data the data of the object
*/
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {

View File

@@ -2,8 +2,8 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
@@ -31,6 +31,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Service to handle sending operations to remote peers.
* This service works with objects, containing a queue of them.
* The operations to be sent to peers are extracted from the objects in the queue.
*/
@ApplicationScoped
public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue;
@@ -118,6 +123,7 @@ public class InvalidationQueueService {
String stats = "Sent invalidation: ";
long success = 0;
// Don't try to send same object in multiple threads
List<AutoCloseableNoThrow> locks = new LinkedList<>();
try {
ArrayListValuedHashMap<PeerId, Op> ops = new ArrayListValuedHashMap<>();
@@ -194,6 +200,11 @@ public class InvalidationQueueService {
Log.info("Invalidation sender exiting");
}
/**
* Extract operations from an object for all peers and push them.
*
* @param key the object key to process
*/
public void pushInvalidationToAll(JObjectKey key) {
while (true) {
var queue = _toAllQueue.get();
@@ -209,6 +220,7 @@ public class InvalidationQueueService {
}
}
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (reachablePeerManager.isReachable(entry.peer()))
_queue.add(entry);
@@ -223,11 +235,23 @@ public class InvalidationQueueService {
deferredInvalidationQueueService.defer(entry);
}
/**
* Extract operations from an object for some specific peer and push them.
*
* @param host the host to extract operations for
* @param obj the object key to process
*/
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOne(entry);
}
/**
* Extract operations from an object for some specific peer and push them, without delay.
*
* @param host the host to extract operations for
* @param obj the object key to process
*/
public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOneNoDelay(entry);

View File

@@ -5,6 +5,16 @@ import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
import java.util.Collection;
/**
* Represents a unit of information to be sent to another peer.
* The operations are extracted from objects in the key-value storage, and then sent to peers.
*/
public interface Op extends Serializable {
/**
* Returns the keys of the objects that are referenced in this op.
* These objects should be marked as "escaped" in the local storage for the purposed of garbage collection.
*
* @return the keys of the objects that are referenced in this operation
*/
Collection<JObjectKey> getEscapedRefs();
}

View File

@@ -6,6 +6,17 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
/**
* Interface for extracting operations from data objects.
* @param <T> the type of data
*/
public interface OpExtractor<T extends JData> {
/**
* Extract operations from the given data object.
*
* @param data the data object to extract operations from
* @param peerId the ID of the peer to extract operations for
* @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer
*/
Pair<List<Op>, Runnable> extractOps(T data, PeerId peerId);
}

View File

@@ -14,6 +14,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* Service for extracting operations from JData objects.
* This service uses the {@link OpExtractor} interface to extract operations from JData objects.
* It is used to extract operations from JData objects before they are sent to the peer.
*/
@ApplicationScoped
public class OpExtractorService {
private final Map<Class<? extends JData>, OpExtractor> _opExtractorMap;
@@ -38,6 +43,13 @@ public class OpExtractorService {
_opExtractorMap = Map.copyOf(opExtractorMap);
}
/**
* Extract operations from the given JData object.
*
* @param data the JData object to extract operations from
* @param peerId the ID of the peer to extract operations for
* @return a pair of a list of operations and a runnable to execute after the operations are sent to the peer
*/
public @Nullable Pair<List<Op>, Runnable> extractOps(JData data, PeerId peerId) {
var extractor = _opExtractorMap.get(data.getClass());
if (extractor == null) {

View File

@@ -2,6 +2,16 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
/**
* Interface for handling operations.
* @param <T> the type of operation
*/
public interface OpHandler<T extends Op> {
/**
* Handles the given operation.
*
* @param from the ID of the peer that sent the operation
* @param op the operation to handle
*/
void handleOp(PeerId from, T op);
}

View File

@@ -10,6 +10,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
/**
* Service for handling operations.
* This service uses the {@link OpHandler} interface to handle operations.
* It is used to handle operations received from the peer.
*/
@ApplicationScoped
public class OpHandlerService {
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
@@ -34,6 +39,12 @@ public class OpHandlerService {
_opHandlerMap = Map.copyOf(OpHandlerMap);
}
/**
* Handle the given operation.
*
* @param from the ID of the peer that sent the operation
* @param op the operation to handle
*/
public void handleOp(PeerId from, Op op) {
var handler = _opHandlerMap.get(op.getClass());
if (handler == null) {

View File

@@ -24,6 +24,10 @@ import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Automatically synchronized and persistent Kleppmann tree service.
* The trees are identified by their names, and can have any type of root node.
*/
@ApplicationScoped
public class JKleppmannTreeManager {
private static final String dataFileName = "trees";
@@ -38,6 +42,12 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
/**
* Get or create a tree with the given name.
* @param name the name of the tree
* @param rootNodeSupplier a supplier for the root node meta
* @return the tree
*/
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
@@ -64,13 +74,20 @@ public class JKleppmannTreeManager {
});
}
/**
* Get a tree with the given name.
* @param name the name of the tree
* @return the tree
*/
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
});
}
/**
* Kleppmann tree wrapper, automatically synchronized and persistent.
*/
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;
@@ -88,26 +105,57 @@ public class JKleppmannTreeManager {
_tree = new KleppmannTree<>(_storageInterface, peerInterface, _clock, new JOpRecorder());
}
/**
* Traverse the tree from root to find a node with the given name.
* @param names list of names to traverse
* @return the node key
*/
public JObjectKey traverse(List<String> names) {
return _tree.traverse(names);
}
/**
* Get a new node id. (random)
* @return the new node id
*/
public JObjectKey getNewNodeId() {
return _storageInterface.getNewNodeId();
}
/**
* Move a node to a new parent.
* @param newParent the new parent
* @param newMeta the new node metadata
* @param node the node to move
*/
public void move(JObjectKey newParent, JKleppmannTreeNodeMeta newMeta, JObjectKey node) {
_tree.move(newParent, newMeta, node);
}
/**
* Move a node to the trash.
* @param newMeta the new node metadata
* @param nodeKey the node key
*/
public void trash(JKleppmannTreeNodeMeta newMeta, JObjectKey nodeKey) {
_tree.move(_storageInterface.getTrashId(), newMeta.withName(nodeKey.toString()), nodeKey);
}
/**
* Check if there are any pending operations for the given peer.
* @param host the peer id
* @return true if there are pending operations, false otherwise
*/
public boolean hasPendingOpsForHost(PeerId host) {
return !_data.queues().getOrDefault(host, TreePMap.empty()).isEmpty();
}
/**
* Get the pending operations for the given peer.
* @param host the peer id
* @param limit the maximum number of operations to return
* @return the list of pending operations
*/
public List<Op> getPendingOpsForHost(PeerId host, int limit) {
ArrayList<Op> collected = new ArrayList<>();
for (var node : _data.queues().getOrDefault(host, TreePMap.empty()).entrySet()) {
@@ -119,7 +167,13 @@ public class JKleppmannTreeManager {
return Collections.unmodifiableList(collected);
}
// @Override
/**
* Mark the operation as committed for the given host.
* This should be called when the operation is successfully applied on the host.
* All operations should be sent and received in timestamp order.
* @param host the peer id
* @param op the operation to commit
*/
public void commitOpForHost(PeerId host, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp)
return;
@@ -135,15 +189,27 @@ public class JKleppmannTreeManager {
curTx.put(_data);
}
/**
* Record bootstrap operations for the given host.
* @param host the peer id
*/
public void recordBootstrap(PeerId host) {
_tree.recordBoostrapFor(host);
}
/**
* Get the parent of a node that matches the given predicate.
* @param predicate the predicate to match
*/
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
}
// @Override
/**
* Accept an external operation from the given peer.
* @param from the peer id
* @param op the operation to accept
*/
public void acceptExternalOp(PeerId from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) {
_tree.updateExternalTimestamp(from1, timestamp);
@@ -166,6 +232,11 @@ public class JKleppmannTreeManager {
_tree.applyExternalOp(from, jop.op());
}
/**
* Create a dummy operation that contains the timestamp of the last operation, to move causality threshold
* forward even without any real operations.
* @return the periodic push operation
*/
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}

View File

@@ -14,6 +14,9 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* KleppmannTree node implementation for JKleppmannTree
*/
public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
@Nullable JKleppmannTreeNodeMeta meta,

View File

@@ -10,7 +10,9 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;
// Separate refcounting from JKleppmannTreeNode
/**
* Separate reference counting from JKleppmannTreeNode
*/
public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean frozen,
JKleppmannTreeNode node) implements JDataRefcounted, Serializable {

View File

@@ -5,7 +5,6 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
//@ProtoMirror(JKleppmannTreeNodeMetaP.class)
public interface JKleppmannTreeNodeMeta extends NodeMeta {
JKleppmannTreeNodeMeta withName(String name);

View File

@@ -14,6 +14,9 @@ import org.pcollections.PSortedMap;
import java.util.Collection;
import java.util.List;
/**
* Various persistent data for JKleppmannTree
*/
public record JKleppmannTreePersistentData(
JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
long clock,

View File

@@ -12,6 +12,12 @@ import java.util.ArrayList;
import java.util.Optional;
// TODO: It's not actually generic right now, only longs are supported essentially
/**
* Persistent-storage backed ordered map service.
* Local and remote objects can implement the ${@link JMapHolder} interface, then they can be used with this service
* to store references to other objects identified by sorded keys of some kind. (for now only longs)
*/
@Singleton
public class JMapHelper {
@Inject
@@ -33,29 +39,69 @@ public class JMapHelper {
return JObjectKey.of(holder.value() + ">");
}
/**
* Get an iterator for the map of a given holder.
* @param holder the holder of the map
* @param start the start position of the iterator relative to the key
* @param key the key to start the iterator from
* @return an iterator for the map of the given holder
* @param <K> the type of the key
*/
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
}
/**
* Get an iterator for the map of a given holder. The iterator starts from the first key.
* @param holder the holder of the map
* @return an iterator for the map of the given holder
* @param <K> the type of the key
*/
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
return new JMapIterator<>(curTx.getIterator(IteratorStart.GT, makeKeyFirst(holder.key())), holder);
}
/**
* Put a new entry into the map of a given holder.
* @param holder the holder of the map
* @param key the key to put
* @param ref the key of the object reference to which to record
* @param <K> the type of the key
*/
public <K extends JMapKey> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
curTx.put(new JMapEntry<>(holder.key(), key, ref));
}
/**
* Get an entry from the map of a given holder.
* @param holder the holder of the map
* @param key the key to get
* @return an optional containing the entry if it exists, or an empty optional if it does not
* @param <K> the type of the key
*/
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
}
/**
* Delete an entry from the map of a given holder.
* @param holder the holder of the map
* @param key the key to delete
* @param <K> the type of the key
*/
public <K extends JMapKey> void delete(JMapHolder<K> holder, K key) {
curTx.delete(makeKey(holder.key(), key));
}
public <K extends JMapKey> void deleteAll(JMapHolder<K> he) {
/**
* Delete all entries from the map of a given holder.
* @param holder the holder of the map
* @param <K> the type of the key
*/
public <K extends JMapKey> void deleteAll(JMapHolder<K> holder) {
ArrayList<K> collectedKeys = new ArrayList<>();
try (var it = getIterator(he)) {
try (var it = getIterator(holder)) {
while (it.hasNext()) {
var curKey = it.peekNextKey();
collectedKeys.add(curKey);
@@ -64,8 +110,8 @@ public class JMapHelper {
}
for (var curKey : collectedKeys) {
delete(he, curKey);
Log.tracev("Removed map entry {0} to {1}", he.key(), curKey);
delete(holder, curKey);
Log.tracev("Removed map entry {0} to {1}", holder.key(), curKey);
}
}

View File

@@ -2,5 +2,9 @@ package com.usatiuk.dhfs.jmap;
import com.usatiuk.objects.JData;
/**
* Marker interface that allows an object to hold an ordered key-value map of object references.
* @param <K>
*/
public interface JMapHolder<K extends JMapKey> extends JData {
}

View File

@@ -10,6 +10,9 @@ import com.usatiuk.objects.transaction.Transaction;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* This hook is used to delete all the entries of a map in a map holder when the holder is deleted.
*/
@Singleton
public class JMapHolderRefcounterTxHook implements PreCommitTxHook {
@Inject

View File

@@ -6,6 +6,10 @@ import com.usatiuk.objects.iterators.CloseableKvIterator;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
/**
* Iterates over JMap entries of a given holder.
* @param <K> the type of the key
*/
public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, JMapEntry<K>> {
private final CloseableKvIterator<JObjectKey, JData> _backing;
private final JObjectKey _prefix;

View File

@@ -1,4 +1,7 @@
package com.usatiuk.dhfs.jmap;
/**
* Marker interface for JMap keys. TODO: Actually only longs are supported right now.
*/
public interface JMapKey extends Comparable<JMapKey> {
}

View File

@@ -6,6 +6,12 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Comparator;
/**
* A reference from a JMap object to some object.
* This is used to get the parent object from the reference.
* @param holder the object that holds the map
* @param mapKey the key in the map
*/
public record JMapRef(JObjectKey holder, JMapKey mapKey) implements JDataRef {
@Override
public JObjectKey obj() {

View File

@@ -4,6 +4,9 @@ import com.usatiuk.dhfs.peersync.PeerId;
import java.net.InetAddress;
/**
* Represents a peer address with an IP address and port.
*/
public record IpPeerAddress(PeerId peer, PeerAddressType type,
InetAddress address, int port, int securePort) implements PeerAddress {
public IpPeerAddress withType(PeerAddressType type) {

View File

@@ -6,8 +6,20 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
/**
* Helper class for parsing peer addresses from strings.
* <p>
* The expected format is: <peerId>:<ip>:<port>:<securePort>
* </p>
*/
public class PeerAddrStringHelper {
/**
* Parses a string into an IpPeerAddress object.
*
* @param addr the string to parse
* @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty
*/
public static Optional<IpPeerAddress> parse(String addr) {
if (addr.isEmpty()) {
return Optional.empty();
@@ -21,6 +33,13 @@ public class PeerAddrStringHelper {
}
}
/**
* Parses a string into an IpPeerAddress object, with a manually provided peer ID.
*
* @param peerId the peer ID to use
* @param addr the string to parse
* @return an Optional containing the parsed IpPeerAddress, or an empty Optional if the string is empty
*/
public static Optional<IpPeerAddress> parseNoPeer(PeerId peerId, String addr) {
if (addr.isEmpty()) {
return Optional.empty();

View File

@@ -4,8 +4,21 @@ import com.usatiuk.dhfs.peersync.PeerId;
import java.io.Serializable;
/**
* Peer address interface, can be used to represent different types of peer addresses, not only IP.
*/
public interface PeerAddress extends Serializable {
/**
* Returns the peer ID associated with this address.
*
* @return the peer ID
*/
PeerId peer();
/**
* Returns the type of this peer address (LAN/WAN/etc)
*
* @return the type of the peer address
*/
PeerAddressType type();
}

View File

@@ -13,12 +13,21 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Peer discovery directory collects known peer addresses, and automatically cleans up old entries.
*/
@ApplicationScoped
public class PeerDiscoveryDirectory {
private final MultiValuedMap<PeerId, PeerEntry> _entries = new HashSetValuedHashMap<>();
@ConfigProperty(name = "dhfs.peerdiscovery.timeout")
long timeout;
/**
* Notifies the directory about a new address for a peer.
* If the address is already known, it updates the last seen time.
*
* @param addr the new address
*/
public void notifyAddr(PeerAddress addr) {
Log.tracev("New address {0}", addr);
synchronized (_entries) {
@@ -28,6 +37,13 @@ public class PeerDiscoveryDirectory {
}
}
/**
* Returns a collection of addresses for a given peer.
* Cleans up old entries that are no longer reachable.
*
* @param peer the peer ID
* @return a collection of addresses for the peer
*/
public Collection<PeerAddress> getForPeer(PeerId peer) {
synchronized (_entries) {
long curTime = System.currentTimeMillis();
@@ -43,6 +59,12 @@ public class PeerDiscoveryDirectory {
}
}
/**
* Returns a collection of reachable peers.
* Cleans up old entries that are no longer reachable.
*
* @return a collection of reachable peers
*/
public Collection<PeerId> getReachablePeers() {
synchronized (_entries) {
long curTime = System.currentTimeMillis();

View File

@@ -5,6 +5,9 @@ import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
/**
* Notified PeerDiscoveryDirectory about manually added peer addresses.
*/
@ApplicationScoped
public class PersistentStaticPeerDiscovery {
@Inject

View File

@@ -9,6 +9,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/**
* Notifies PeerDiscoveryDirectory about statically configured peer addresses.
*/
@ApplicationScoped
public class StaticPeerDiscovery {
private final List<IpPeerAddress> _peers;

View File

@@ -15,6 +15,9 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
/**
* Broadcasts information about this peer to the local network.
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryBroadcaster {

View File

@@ -19,6 +19,10 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
import java.nio.ByteBuffer;
/**
* Listens for peer discovery packets from other peers on the local network.
* When a packet is received, it notifies the PeerDiscoveryDirectory about the new peer.
*/
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryClient {

View File

@@ -3,8 +3,24 @@ package com.usatiuk.dhfs.peersync;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
/**
* Allows to specify custom processing of initial synchronization/crash recovery for a specific object type.
*
* @param <T> the type of the object
*/
public interface InitialSyncProcessor<T extends JData> {
/**
* Called when the peer is connected for the first time (or needs to be re-synced).
*
* @param from the peer that initiated the sync
* @param key the key of the object to be synchronized
*/
void prepareForInitialSync(PeerId from, JObjectKey key);
/**
* Called when the system had crashed (and the object needs to probably be re-synced).
*
* @param key the key of the object to be handled
*/
void handleCrash(JObjectKey key);
}

View File

@@ -1,5 +1,13 @@
package com.usatiuk.dhfs.peersync;
/**
* Listener for peer connected events.
*/
public interface PeerConnectedEventListener {
/**
* Called when a peer is connected.
*
* @param peerId the ID of the connected peer
*/
void handlePeerConnected(PeerId peerId);
}

View File

@@ -1,5 +1,13 @@
package com.usatiuk.dhfs.peersync;
/**
* Listener for peer disconnected events.
*/
public interface PeerDisconnectedEventListener {
/**
* Called when a peer is disconnected.
*
* @param peerId the ID of the disconnected peer
*/
void handlePeerDisconnected(PeerId peerId);
}

View File

@@ -4,6 +4,11 @@ import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
/**
* Represents a peer ID
* Type-safe wrapper for JObjectKey
* @param id the ID of the peer (as a JObjectKey)
*/
public record PeerId(JObjectKey id) implements Serializable, Comparable<PeerId> {
public static PeerId of(String id) {
return new PeerId(JObjectKey.of(id));

View File

@@ -11,6 +11,15 @@ import org.pcollections.PMap;
import java.security.cert.X509Certificate;
/**
* Represents information about a peer in the cluster
*
* @param key the key of the peer
* @param id the ID of the peer
* @param cert the certificate of the peer
* @param kickCounter the kick counter of the peer, entries are incremented when a peer is kicked out for not being seen for a long time
* @param lastSeenTimestamp the last time the peer was seen
*/
@JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
PMap<PeerId, Long> kickCounter,

View File

@@ -14,6 +14,9 @@ import jakarta.inject.Inject;
import java.util.List;
import java.util.Optional;
/**
* Service for managing information about peers connected to the cluster.
*/
@ApplicationScoped
public class PeerInfoService {
public static final JObjectKey TREE_KEY = JObjectKey.of("peers");
@@ -32,7 +35,7 @@ public class PeerInfoService {
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
@@ -42,6 +45,12 @@ public class PeerInfoService {
}
/**
* Checks if the peer with the given ID is known to the cluster.
*
* @param peer the ID of the peer to check
* @return true if the peer exists, false otherwise
*/
public boolean existsPeer(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
@@ -52,6 +61,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about the peer with the given ID.
*
* @param peer the ID of the peer to get information about
* @return an Optional containing the PeerInfo object if the peer exists, or an empty Optional if it does not
*/
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
@@ -65,6 +80,11 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all peers in the cluster.
*
* @return a list of PeerInfo objects representing all peers in the cluster
*/
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of());
@@ -80,6 +100,11 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all peers in the cluster, excluding the current peer.
*
* @return a list of PeerInfo objects representing all peers in the cluster, excluding the current peer
*/
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(
@@ -87,6 +112,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all synchronized peers in the cluster.
* A peer might not be synchronized if it had not been seen for a while, for example.
*
* @return a list of PeerInfo objects representing all synchronized peers in the cluster
*/
public List<PeerInfo> getSynchronizedPeers() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(pi -> {
@@ -98,6 +129,12 @@ public class PeerInfoService {
});
}
/**
* Gets the information about all synchronized peers in the cluster, excluding the current peer.
* A peer might not be synchronized if it had not been seen for a while, for example.
*
* @return a list of PeerInfo objects representing all synchronized peers in the cluster, excluding the current peer
*/
public List<PeerInfo> getSynchronizedPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeersNoSelf().stream().filter(pi -> {
@@ -106,6 +143,12 @@ public class PeerInfoService {
});
}
/**
* Add a new peer to the cluster.
*
* @param id the ID of the peer to add
* @param cert the certificate of the peer
*/
public void putPeer(PeerId id, byte[] cert) {
jObjectTxManager.run(() -> {
var parent = getTree().traverse(List.of());
@@ -115,6 +158,11 @@ public class PeerInfoService {
});
}
/**
* Remove a peer from the cluster.
*
* @param id the ID of the peer to remove
*/
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));

View File

@@ -9,6 +9,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
/**
* Periodically updates the last seen timestamp of reachable peers and increments the kick counter for unreachable peers.
*/
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject

View File

@@ -33,6 +33,10 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
/**
* Handles various locally relevant persistent data related to peers,
* such as local peer's UUID, certificate, and persistent peer addresses.
*/
@ApplicationScoped
public class PersistentPeerDataService {
@Inject

View File

@@ -29,6 +29,9 @@ import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Handles connections to known peers in the cluster.
*/
@ApplicationScoped
public class ReachablePeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
@@ -137,11 +140,10 @@ public class ReachablePeerManager {
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
private void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
handleConnectionError(host.id());
}
// FIXME:
private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
try {
return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> {
@@ -154,18 +156,37 @@ public class ReachablePeerManager {
}
}
/**
* Checks if the given host is reachable.
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
public boolean isReachable(PeerId host) {
return _states.containsKey(host);
}
/**
* Checks if the given host is reachable.
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
public boolean isReachable(com.usatiuk.dhfs.peersync.PeerInfo host) {
return isReachable(host.id());
}
/**
* Gets the address of the given host.
* @param host the host to get the address for
* @return the address of the host, or null if not reachable
*/
public PeerAddress getAddress(PeerId host) {
return _states.get(host);
}
/**
* Gets the ids of all reachable hosts.
* @return a list of ids of all reachable hosts
*/
public List<PeerId> getAvailableHosts() {
return _states.keySet().stream().toList();
}
@@ -176,6 +197,10 @@ public class ReachablePeerManager {
// .map(Map.Entry::getKey).toList());
// }
/**
* Gets a snapshot of current state of the connected (and not connected) peers
* @return information about all connected/disconnected peers
*/
public HostStateSnapshot getHostStateSnapshot() {
return transactionManager.run(() -> {
var partition = peerInfoService.getPeersNoSelf().stream().map(com.usatiuk.dhfs.peersync.PeerInfo::id)
@@ -184,16 +209,31 @@ public class ReachablePeerManager {
});
}
/**
* Removes the given host from the cluster
* @param peerId the id of the host to remove
*/
public void removeRemoteHost(PeerId peerId) {
transactionManager.run(() -> {
peerInfoService.removePeer(peerId);
});
}
/**
* Selects the best address for the given host.
* The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN)
* @param host the host to select the address for
* @return the best address for the host, or null if not reachable
*/
public Optional<PeerAddress> selectBestAddress(PeerId host) {
return peerDiscoveryDirectory.getForPeer(host).stream().min(Comparator.comparing(PeerAddress::type));
}
/**
* Call the given peer and get its information.
* @param host the peer to get the information for
* @return the information about the peer
*/
private ApiPeerInfo getInfo(PeerId host) {
return transactionManager.run(() -> {
if (peerInfoService.getPeerInfo(host).isPresent())
@@ -206,6 +246,12 @@ public class ReachablePeerManager {
});
}
/**
* Adds the given peer to the cluster.
* The certificate provided is verified against the one peer is using right now.
* @param host the peer to add
* @param cert the certificate of the peer
*/
public void addRemoteHost(PeerId host, @Nullable String cert) {
transactionManager.run(() -> {
var info = getInfo(host);
@@ -222,7 +268,10 @@ public class ReachablePeerManager {
peerTrustManager.reloadTrustManagerHosts(transactionManager.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME:
}
/**
* Gets the information about all reachable peers that are not added to the cluster.
* @return a collection of pairs of peer id and peer info
*/
public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() {
return transactionManager.run(() -> {
return peerDiscoveryDirectory.getReachablePeers().stream().filter(p -> !peerInfoService.getPeerInfo(p).isPresent())

View File

@@ -8,6 +8,9 @@ import jakarta.ws.rs.Path;
import java.security.cert.CertificateEncodingException;
import java.util.Base64;
/**
* Provides information about the peer publicly, without any authentication.
*/
@Path("/peer-info")
public class PeerSyncApi {
@Inject

View File

@@ -8,8 +8,17 @@ import jakarta.enterprise.context.ApplicationScoped;
import java.net.URI;
import java.util.concurrent.TimeUnit;
/**
* Allows to query peers about their information, even if peer isn't part of the cluster.
*/
@ApplicationScoped
public class PeerSyncApiClientDynamic {
/**
* Queries peer about its information.
*
* @param addr the address of the peer to query
* @return the peer information
*/
public ApiPeerInfo getSelfInfo(PeerAddress addr) {
return switch (addr) {
case IpPeerAddress ipAddr -> getSelfInfo(ipAddr.address().getHostAddress(), ipAddr.port());

View File

@@ -4,6 +4,9 @@ import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
/**
* JDataRefs are used to store additional metadata about incoming references to objects for reference counting.
*/
public interface JDataRef extends Comparable<JDataRef>, Serializable {
JObjectKey obj();
}