From 83ceefa0413b8bf4227186a3356b8fdfdec165bc Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 13 May 2025 22:50:24 +0200 Subject: [PATCH] Sync-base: more javadocs 2 --- .../usatiuk/dhfs}/DeadlockDetector.java | 5 + ...aryOpSerializer.java => OpSerializer.java} | 2 +- .../com/usatiuk/dhfs/ShutdownChecker.java | 5 + .../peerdiscovery/PeerAddrStringHelper.java | 4 +- .../com/usatiuk/dhfs/peersync/PeerInfo.java | 2 + .../dhfs/peersync/ReachablePeerManager.java | 6 - .../dhfs/peertrust/CertificateTools.java | 20 ++++ .../peertrust/PeerInfoCertUpdateTxHook.java | 3 + .../dhfs/peertrust/PeerRolesAugmentor.java | 3 + .../dhfs/peertrust/PeerTrustManager.java | 3 + .../peertrust/PeerTrustServerCustomizer.java | 3 + .../usatiuk/dhfs/refcount/DeleterTxHook.java | 4 + .../usatiuk/dhfs/refcount/JDataNormalRef.java | 4 + .../dhfs/refcount/JDataRefcounted.java | 31 +++++ .../dhfs/remoteobj/ConflictResolver.java | 7 -- .../dhfs/remoteobj/DefaultObjSyncHandler.java | 3 + .../usatiuk/dhfs/remoteobj/JDataRemote.java | 26 ++++ .../dhfs/remoteobj/JDataRemoteDto.java | 8 ++ .../dhfs/remoteobj/JDataRemotePush.java | 5 + .../dhfs/remoteobj/ObjSyncHandler.java | 14 +++ .../dhfs/remoteobj/RemoteObjPusherTxHook.java | 3 + .../remoteobj/RemoteObjectDataWrapper.java | 6 + .../dhfs/remoteobj/RemoteObjectDeleter.java | 21 ++-- .../dhfs/remoteobj/RemoteObjectMeta.java | 4 + .../dhfs/remoteobj/RemoteTransaction.java | 45 +++++++ .../usatiuk/dhfs/remoteobj/SyncHandler.java | 21 +++- .../dhfs/remoteobj/SyncHandlerService.java | 9 -- .../usatiuk/dhfs/remoteobj/SyncHelper.java | 11 ++ ...zer.java => ReceivedObjectSerializer.java} | 2 +- ...lizer.java => RemoteObjectSerializer.java} | 2 +- .../dhfs/rpc/RemoteObjectServiceClient.java | 32 ++++- .../rpc/RemoteObjectServiceServerImpl.java | 2 +- .../usatiuk/dhfs/rpc/RpcChannelFactory.java | 11 ++ .../usatiuk/dhfs/rpc/RpcClientFactory.java | 42 ++++++- .../com/usatiuk/dhfs/syncmap/DtoMapper.java | 18 +++ .../dhfs/syncmap/DtoMapperService.java | 21 ++++ .../dhfs/webapi/PersistentPeerAddressApi.java | 5 +- .../com/usatiuk/dhfs/webui/WebUiRouter.java | 3 + .../java/com/usatiuk/utils/ByteUtils.java | 18 --- .../java/com/usatiuk/utils/DataLocker.java | 16 +++ .../utils/HashSetDelayedBlockingQueue.java | 113 ++++++++++++++++-- .../java/com/usatiuk/utils/ListUtils.java | 30 +++++ .../usatiuk/utils/SerializationHelper.java | 7 ++ .../StatusRuntimeExceptionNoStacktrace.java | 3 + .../utils/UninitializedByteBuffer.java | 15 +++ .../com/usatiuk/utils/UnsafeAccessor.java | 3 + 46 files changed, 546 insertions(+), 75 deletions(-) rename dhfs-parent/sync-base/src/main/java/{ => com/usatiuk/dhfs}/DeadlockDetector.java (95%) rename dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/{TemporaryOpSerializer.java => OpSerializer.java} (88%) delete mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ConflictResolver.java delete mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandlerService.java rename dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/{TemporaryReceivedObjectSerializer.java => ReceivedObjectSerializer.java} (94%) rename dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/{TemporaryRemoteObjectSerializer.java => RemoteObjectSerializer.java} (86%) delete mode 100644 dhfs-parent/utils/src/main/java/com/usatiuk/utils/ByteUtils.java diff --git a/dhfs-parent/sync-base/src/main/java/DeadlockDetector.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/DeadlockDetector.java similarity index 95% rename from dhfs-parent/sync-base/src/main/java/DeadlockDetector.java rename to dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/DeadlockDetector.java index 7b275098..14b03e1e 100644 --- a/dhfs-parent/sync-base/src/main/java/DeadlockDetector.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/DeadlockDetector.java @@ -1,3 +1,5 @@ +package com.usatiuk.dhfs; + import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; @@ -12,6 +14,9 @@ import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +/** + * Periodically check for deadlocks in the JVM and log them if found. + */ @ApplicationScoped public class DeadlockDetector { private final ExecutorService _executor = Executors.newSingleThreadExecutor(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/TemporaryOpSerializer.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/OpSerializer.java similarity index 88% rename from dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/TemporaryOpSerializer.java rename to dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/OpSerializer.java index 4d7d0ee4..fc0eceec 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/TemporaryOpSerializer.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/OpSerializer.java @@ -6,7 +6,7 @@ import com.usatiuk.utils.SerializationHelper; import jakarta.inject.Singleton; @Singleton -public class TemporaryOpSerializer implements ProtoSerializer { +public class OpSerializer implements ProtoSerializer { @Override public Op deserialize(OpP message) { return SerializationHelper.deserialize(message.getSerializedData().toByteArray()); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java index 03352e95..9754668b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java @@ -11,6 +11,11 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import java.io.IOException; import java.nio.file.Paths; +/** + * This class checks if the application was shut down cleanly. + * It creates a file in the specified directory on startup and deletes it on shutdown. + * If the file exists on startup, it means the application was not shut down cleanly. + */ @ApplicationScoped public class ShutdownChecker { private static final String dataFileName = "running"; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java index 89fa93ef..13a14c1a 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peerdiscovery/PeerAddrStringHelper.java @@ -8,9 +8,7 @@ import java.util.Optional; /** * Helper class for parsing peer addresses from strings. - *

- * The expected format is: ::: - *

+ * The expected format is: peerId:ip:port:securePort */ public class PeerAddrStringHelper { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java index e23a42d5..c97de6b3 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerInfo.java @@ -13,6 +13,8 @@ import java.security.cert.X509Certificate; /** * Represents information about a peer in the cluster + * {@link JDataRemotePush} annotation is used, as invalidating a peer information by the peer itself might make it unreachable, + * as it will not be possible to download it from the invalidated peer, so the peer information should be send with the notification * * @param key the key of the peer * @param id the ID of the peer diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java index e0fe3af8..7ef95d39 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ReachablePeerManager.java @@ -191,12 +191,6 @@ public class ReachablePeerManager { return _states.keySet().stream().toList(); } -// public List getUnavailableHosts() { -// return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() -// .filter(e -> !e.getValue().isReachable()) -// .map(Map.Entry::getKey).toList()); -// } - /** * Gets a snapshot of current state of the connected (and not connected) peers * @return information about all connected/disconnected peers diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/CertificateTools.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/CertificateTools.java index a042c052..4ff0b3e6 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/CertificateTools.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/CertificateTools.java @@ -22,8 +22,17 @@ import java.security.cert.X509Certificate; import java.util.Calendar; import java.util.Date; +/** + * Helper class for generating and manipulating X.509 certificates. + */ public class CertificateTools { + /** + * Converts a byte array to an X.509 certificate. + * + * @param bytes the byte array representing the certificate + * @return the X.509 certificate + */ public static X509Certificate certFromBytes(byte[] bytes) { try { CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); @@ -34,6 +43,10 @@ public class CertificateTools { } } + /** + * Generates a random RSA key pair. + * @return the generated RSA key pair + */ public static KeyPair generateKeyPair() { try { KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA"); @@ -44,6 +57,13 @@ public class CertificateTools { } } + /** + * Generates an X.509 certificate using the provided key pair and subject name. + * + * @param keyPair the key pair to use for the certificate + * @param subject the subject name for the certificate + * @return the generated X.509 certificate + */ public static X509Certificate generateCertificate(KeyPair keyPair, String subject) { try { Provider bcProvider = new BouncyCastleProvider(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerInfoCertUpdateTxHook.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerInfoCertUpdateTxHook.java index c7c7ac0f..01e0c87c 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerInfoCertUpdateTxHook.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerInfoCertUpdateTxHook.java @@ -15,6 +15,9 @@ import io.quarkus.logging.Log; import jakarta.inject.Inject; import jakarta.inject.Singleton; +/** + * Automatically refreshes certificates in the trust manager for peers when their info is updated. + */ @Singleton public class PeerInfoCertUpdateTxHook implements PreCommitTxHook { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerRolesAugmentor.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerRolesAugmentor.java index 26601253..0726717b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerRolesAugmentor.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerRolesAugmentor.java @@ -14,6 +14,9 @@ import jakarta.inject.Inject; import java.util.function.Supplier; +/** + * Augments the security identity of peers that are members of the cluster. + */ @ApplicationScoped public class PeerRolesAugmentor implements SecurityIdentityAugmentor { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustManager.java index 160bc993..5309ec70 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustManager.java @@ -17,6 +17,9 @@ import java.security.cert.X509Certificate; import java.util.Collection; import java.util.concurrent.atomic.AtomicReference; +/** + * Custom trust manager that trusts the certificates of peers in the cluster. + */ @ApplicationScoped public class PeerTrustManager implements X509TrustManager { private final AtomicReference trustManager = new AtomicReference<>(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustServerCustomizer.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustServerCustomizer.java index a45fa70c..9024a9ab 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustServerCustomizer.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peertrust/PeerTrustServerCustomizer.java @@ -13,6 +13,9 @@ import javax.net.ssl.KeyManagerFactory; import java.security.KeyStore; import java.security.cert.Certificate; +/** + * Customizes the HTTP server options to use the peer trust manager and the self-signed certificate. + */ @ApplicationScoped public class PeerTrustServerCustomizer implements HttpServerOptionsCustomizer { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/DeleterTxHook.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/DeleterTxHook.java index 28f2b3f1..70e805fd 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/DeleterTxHook.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/DeleterTxHook.java @@ -10,6 +10,10 @@ import io.quarkus.logging.Log; import jakarta.inject.Inject; import jakarta.inject.Singleton; +/** + * Automatically deletes objects that are not referenced by anything else. + * For remote objects, it puts them into the deletion queue. + */ @Singleton public class DeleterTxHook implements PreCommitTxHook { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataNormalRef.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataNormalRef.java index 3ef92fda..a0ae35f5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataNormalRef.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataNormalRef.java @@ -3,6 +3,10 @@ package com.usatiuk.dhfs.refcount; import com.usatiuk.dhfs.jmap.JMapRef; import com.usatiuk.objects.JObjectKey; +/** + * Good old boring object reference. + * @param obj the object that is the source of the reference + */ public record JDataNormalRef(JObjectKey obj) implements JDataRef { @Override public int compareTo(JDataRef o) { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRefcounted.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRefcounted.java index 741fa0d7..40a755d8 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRefcounted.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/refcount/JDataRefcounted.java @@ -7,15 +7,46 @@ import org.pcollections.PCollection; import java.util.Collection; import java.util.List; +/** + * Interface for a reference counted object + */ public interface JDataRefcounted extends JData { + /** + * Returns list of incoming references to this object. + * + * @return list of incoming references + */ PCollection refsFrom(); + /** + * Create a copy of this object with the given list of incoming references. + * + * @param refs list of incoming references + * @return copy of this object with the given list of incoming references + */ JDataRefcounted withRefsFrom(PCollection refs); + /** + * Returns whether this object is frozen or not. + * A frozen object cannot be garbage collected. + * + * @return true if this object is frozen, false otherwise + */ boolean frozen(); + /** + * Create a copy of this object with the given frozen state. + * + * @param frozen true if this object should be frozen, false otherwise + * @return copy of this object with the given frozen state + */ JDataRefcounted withFrozen(boolean frozen); + /** + * Collect outgoing references to other objects. + * + * @return list of outgoing references + */ default Collection collectRefsTo() { return List.of(); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ConflictResolver.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ConflictResolver.java deleted file mode 100644 index a45040b7..00000000 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ConflictResolver.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.usatiuk.dhfs.remoteobj; - -import com.usatiuk.dhfs.peersync.PeerId; - -public interface ConflictResolver { - void resolve(PeerId fromPeer, RemoteObjectMeta ours, RemoteObjectMeta theirs); -} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/DefaultObjSyncHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/DefaultObjSyncHandler.java index ff674e05..fb9f601e 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/DefaultObjSyncHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/DefaultObjSyncHandler.java @@ -12,6 +12,9 @@ import org.pcollections.PMap; import javax.annotation.Nullable; +/** + * Fallback handler for remote object updates, no conflict resolution. + */ @ApplicationScoped public class DefaultObjSyncHandler { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemote.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemote.java index 4ddac751..55723425 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemote.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemote.java @@ -6,17 +6,43 @@ import java.io.Serializable; import java.util.Collection; import java.util.List; +/** + * Interface for a remote object. Remote objects are objects that are automatically synchronized between peers, + * and versioned using a version vector. + */ public interface JDataRemote extends Serializable { + /** + * Returns the key of this remote object. + * + * @return the key of this remote object + */ JObjectKey key(); + /** + * Returns the estimated size of this remote object in bytes. + * + * @return the estimated size of this remote object in bytes + */ default int estimateSize() { return 100; } + /** + * Collect outgoing references to other objects. + * + * @return list of outgoing references + */ + default Collection collectRefsTo() { return List.of(); } + /** + * Returns the class of DTO of this object that should be used for remote synchronization. + * It can be the same as the object. + * + * @return the class of DTO of this object that should be used for remote synchronization + */ default Class dtoClass() { assert JDataRemoteDto.class.isAssignableFrom(getClass()); return (Class) this.getClass(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemoteDto.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemoteDto.java index a081b7a4..41056773 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemoteDto.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemoteDto.java @@ -2,7 +2,15 @@ package com.usatiuk.dhfs.remoteobj; import java.io.Serializable; +/** + * Marker interface for a DTO class to be used when synchronizing some remote object. + */ public interface JDataRemoteDto extends Serializable { + /** + * Returns the class of the remote object that this DTO represents. + * + * @return the class of the remote object that this DTO represents + */ default Class objClass() { assert JDataRemote.class.isAssignableFrom(getClass()); return (Class) this.getClass(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemotePush.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemotePush.java index 6f255606..60da0be3 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemotePush.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/JDataRemotePush.java @@ -5,6 +5,11 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +/** + * Annotation for eagerly pushed remote objects. + * This annotation is used to mark remote object which notification operations should contain the object itself, + * to avoid the other peer having to download it. + */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface JDataRemotePush { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ObjSyncHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ObjSyncHandler.java index 2d11cb6c..3ab55f30 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ObjSyncHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/ObjSyncHandler.java @@ -6,7 +6,21 @@ import org.pcollections.PMap; import javax.annotation.Nullable; +/** + * Interface for handling remote updates of objects. + * + * @param the type of the remote object + * @param the type of the remote object DTO + */ public interface ObjSyncHandler { + /** + * Handles a remote update of an object. + * + * @param from the ID of the peer that sent the update + * @param key the key of the object + * @param receivedChangelog the changelog received from the peer + * @param receivedData the data received from the peer + */ void handleRemoteUpdate(PeerId from, JObjectKey key, PMap receivedChangelog, @Nullable D receivedData); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjPusherTxHook.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjPusherTxHook.java index 648a5643..ca3b79c5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjPusherTxHook.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjPusherTxHook.java @@ -9,6 +9,9 @@ import com.usatiuk.objects.transaction.Transaction; import jakarta.inject.Inject; import jakarta.inject.Singleton; +/** + * Transaction hook to automatically notify {@link InvalidationQueueService} about changed objects. + */ @Singleton public class RemoteObjPusherTxHook implements PreCommitTxHook { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDataWrapper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDataWrapper.java index 0432861b..ec362552 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDataWrapper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDataWrapper.java @@ -8,6 +8,12 @@ import org.pcollections.PCollection; import java.util.Collection; +/** + * Wrapper for remote object data. + * This class is used to store additional metadata about incoming references to objects for reference counting. + * + * @param the type of the remote object data + */ public record RemoteObjectDataWrapper( JObjectKey key, PCollection refsFrom, diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDeleter.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDeleter.java index 40f44be8..1e0a9f3b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDeleter.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectDeleter.java @@ -29,6 +29,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +/** + * Handles deletion of remote objects, asynchronously. + */ @ApplicationScoped public class RemoteObjectDeleter { private final HashSetDelayedBlockingQueue _quickCandidates = new HashSetDelayedBlockingQueue<>(0); @@ -79,14 +82,7 @@ public class RemoteObjectDeleter { _refProcessorExecutorService.submit(this::refProcessor); } - // Continue GC from last shutdown - //FIXME -// executorService.submit(() -> -// jObjectManager.findAll().forEach(n -> { -// jObjectManager.get(n).ifPresent(o -> o.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> { -// return null; -// })); -// })); + // TODO: Continue GC from last shutdown } void shutdown(@Observes @Priority(800) ShutdownEvent event) throws InterruptedException { @@ -96,10 +92,10 @@ public class RemoteObjectDeleter { } } -// public void putQuickDeletionCandidate(JObjectKey obj) { -// _quickCandidates.add(obj); -// } - + /** + * Add a deletion candidate to the queue. + * @param obj the object to be deleted + */ public void putDeletionCandidate(RemoteObjectMeta obj) { if (!obj.seen()) { if (_quickCandidates.add(obj.key())) @@ -177,7 +173,6 @@ public class RemoteObjectDeleter { }); } - // FIXME: private boolean canDelete(JDataRefcounted obj) { return obj.refsFrom().isEmpty() && !obj.frozen(); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectMeta.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectMeta.java index 10242b49..911eb77c 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectMeta.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteObjectMeta.java @@ -9,6 +9,10 @@ import org.pcollections.*; import java.util.Collection; import java.util.List; +/** + * Metadata for remote objects. + * This class makes a peer aware of remote object's existence without necessarily downloading its data. + */ public record RemoteObjectMeta(JObjectKey key, PCollection refsFrom, boolean frozen, PMap knownRemoteVersions, Class knownType, diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteTransaction.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteTransaction.java index 1f11ef4b..ea904649 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteTransaction.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/RemoteTransaction.java @@ -12,6 +12,9 @@ import org.pcollections.HashTreePSet; import java.util.Optional; +/** + * Helper class for working with remote objects. + */ @Singleton public class RemoteTransaction { @Inject @@ -71,10 +74,22 @@ public class RemoteTransaction { }); } + /** + * Get metadata for the object with the given key. + * + * @param key the key of the object + * @return an Optional containing the metadata if it exists, or an empty Optional if it doesn't + */ public Optional getMeta(JObjectKey key) { return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key)); } + /** + * Put the data of a remote object into the storage, without incrementing the version vector. + * + * @param obj the object to put + * @param the type of the object + */ public void putDataRaw(T obj) { var curMeta = getMeta(obj.key()).orElse(null); if (curMeta == null) @@ -92,11 +107,25 @@ public class RemoteTransaction { curTx.put(newData); } + /** + * Put the data of a remote object into the storage, creating a new object. + * Should only be used when an object is known to be new. (for example, when it is created with a unique random key) + * + * @param obj the object to put + * @param the type of the object + */ public void putDataNew(T obj) { curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid())); curTx.putNew(new RemoteObjectDataWrapper<>(obj)); } + /** + * Put the data of a remote object into the storage, either creating a new object or updating an existing one. + * If the object already exists, its version vector is incremented. + * + * @param obj the object to put + * @param the type of the object + */ public void putData(T obj) { var curMeta = getMeta(obj.key()).orElse(null); @@ -126,10 +155,26 @@ public class RemoteTransaction { curTx.put(newData); } + /** + * Get the data of a remote object with the given key. + * The data can be downloaded from the remote object if it is not already present in the local storage. + * + * @param type the type of the object + * @param key the key of the object + * @return an Optional containing the data if it exists, or an empty Optional if it doesn't + */ public Optional getData(Class type, JObjectKey key) { return getData(type, key, true); } + /** + * Get the data of a remote object with the given key. + * The data will not be downloaded from the remote object if it is not already present in the local storage. + * + * @param type the type of the object + * @param key the key of the object + * @return an Optional containing the data if it exists, or an empty Optional if it doesn't + */ public Optional getDataLocal(Class type, JObjectKey key) { return getData(type, key, false); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java index 8fdfc2df..6fae52e0 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java @@ -29,6 +29,9 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Stream; +/** + * Handles synchronization of remote objects. + */ @ApplicationScoped public class SyncHandler { private final Map, ObjSyncHandler> _objToSyncHandler; @@ -93,6 +96,15 @@ public class SyncHandler { _initialSyncProcessors = Map.copyOf(initialSyncProcessorHashMap); } + /** + * Handles remote update of an object. + * + * @param from the ID of the peer that sent the update + * @param key the key of the object + * @param receivedChangelog the changelog received from the peer + * @param receivedData the data received from the peer + * @param the type of the remote object DTO + */ public void handleRemoteUpdate(PeerId from, JObjectKey key, PMap receivedChangelog, @Nullable D receivedData) { @@ -123,7 +135,9 @@ public class SyncHandler { } } - + /** + * Resync objects after a crash. + */ public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) { if (shutdownChecker.lastShutdownClean()) return; @@ -145,6 +159,11 @@ public class SyncHandler { }); } + /** + * Do initial sync for a newly connected (or reconnected) peer. + * + * @param peer the ID of the peer + */ public void doInitialSync(PeerId peer) { txm.run(() -> { Log.tracev("Will do initial sync for {0}", peer); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandlerService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandlerService.java deleted file mode 100644 index 9e8f8757..00000000 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandlerService.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.usatiuk.dhfs.remoteobj; - -import jakarta.enterprise.context.ApplicationScoped; - -@ApplicationScoped -public class SyncHandlerService { - - -} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHelper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHelper.java index 12665255..ab645b0c 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHelper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHelper.java @@ -6,7 +6,18 @@ import org.pcollections.PMap; import java.util.stream.Collectors; import java.util.stream.Stream; +/** + * Helper class for synchronizing objects. + */ public class SyncHelper { + + /** + * Compares two changelogs. + * + * @param current the current changelog + * @param other the other changelog + * @return the result of the comparison + */ public static ChangelogCmpResult compareChangelogs(PMap current, PMap other) { boolean hasLower = false; boolean hasHigher = false; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryReceivedObjectSerializer.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/ReceivedObjectSerializer.java similarity index 94% rename from dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryReceivedObjectSerializer.java rename to dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/ReceivedObjectSerializer.java index cbc47498..55c3ef21 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryReceivedObjectSerializer.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/ReceivedObjectSerializer.java @@ -14,7 +14,7 @@ import org.pcollections.HashTreePMap; import org.pcollections.PMap; @ApplicationScoped -public class TemporaryReceivedObjectSerializer implements ProtoSerializer { +public class ReceivedObjectSerializer implements ProtoSerializer { @Inject ProtoSerializer remoteObjectSerializer; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryRemoteObjectSerializer.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectSerializer.java similarity index 86% rename from dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryRemoteObjectSerializer.java rename to dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectSerializer.java index ea3cbd1c..afd74156 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/TemporaryRemoteObjectSerializer.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectSerializer.java @@ -7,7 +7,7 @@ import com.usatiuk.utils.SerializationHelper; import jakarta.enterprise.context.ApplicationScoped; @ApplicationScoped -public class TemporaryRemoteObjectSerializer implements ProtoSerializer { +public class RemoteObjectSerializer implements ProtoSerializer { @Override public JDataRemoteDto deserialize(JDataRemoteDtoP message) { return SerializationHelper.deserialize(message.getSerializedData().toByteArray()); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java index 27ef5a62..67b8a80b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java @@ -4,8 +4,8 @@ import com.usatiuk.dhfs.ProtoSerializer; import com.usatiuk.dhfs.invalidation.InvalidationQueueService; import com.usatiuk.dhfs.invalidation.Op; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.ReachablePeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; +import com.usatiuk.dhfs.peersync.ReachablePeerManager; import com.usatiuk.dhfs.persistence.JObjectKeyP; import com.usatiuk.dhfs.remoteobj.ReceivedObject; import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta; @@ -29,6 +29,9 @@ import java.util.concurrent.*; import java.util.function.Function; import java.util.stream.Collectors; +/** + * Helper class for calling remote peers RPCs. + */ @ApplicationScoped public class RemoteObjectServiceClient { private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor(); @@ -53,6 +56,13 @@ public class RemoteObjectServiceClient { @Inject ReachablePeerManager reachablePeerManager; + /** + * Download a specific object from a specific peer. + * + * @param key the key of the object to download + * @param peerId the ID of the peer to download from + * @return a pair of the peer ID from which the object was downloaded and the downloaded object + */ public Pair getSpecificObject(JObjectKey key, PeerId peerId) { return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> { var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build()); @@ -61,6 +71,12 @@ public class RemoteObjectServiceClient { }); } + /** + * Download a specific object from some reachable peer. + * + * @param key the key of the object to download + * @param onReceive a callback function to process the received object + */ public void getObject(JObjectKey key, Function, Boolean> onReceive) { var objMeta = remoteTx.getMeta(key).orElse(null); @@ -93,6 +109,13 @@ public class RemoteObjectServiceClient { }); } + /** + * Push a list of operations to a specific peer. + * + * @param target the ID of the peer to push to + * @param ops the list of operations to push + * @return the reply from the peer + */ public OpPushReply pushOps(PeerId target, List ops) { var barrier = new CountDownLatch(ops.size()); for (Op op : ops) { @@ -116,6 +139,13 @@ public class RemoteObjectServiceClient { return OpPushReply.getDefaultInstance(); } + /** + * Ask given peers if they can delete the object with the given key. + * + * @param targets the list of peers to ask + * @param objKey the key of the object to delete + * @return a collection of pairs of peer IDs and their replies + */ public Collection> canDelete(Collection targets, JObjectKey objKey) { Log.trace("Asking canDelete for " + objKey + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", "))); try { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java index 19b81f69..d0fe8f41 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java @@ -47,7 +47,7 @@ public class RemoteObjectServiceServerImpl { DtoMapperService dtoMapperService; @Inject AutosyncProcessor autosyncProcessor; - + public Uni getObject(PeerId from, GetObjectRequest request) { Log.info("<-- getObject: " + request.getName() + " from " + from); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcChannelFactory.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcChannelFactory.java index 26035f9c..3fd1d19b 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcChannelFactory.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcChannelFactory.java @@ -15,6 +15,9 @@ import java.security.KeyStore; import java.security.cert.Certificate; import java.util.concurrent.TimeUnit; +/** + * Factory for creating gRPC channels + */ @ApplicationScoped public class RpcChannelFactory { @Inject @@ -39,6 +42,14 @@ public class RpcChannelFactory { } } + /** + * Creates a secure channel to the given host and port, with correct credentials. + * + * @param host the host to connect to + * @param address the address of the host + * @param port the port to connect to + * @return a secure gRPC channel + */ ManagedChannel getSecureChannel(PeerId host, String address, int port) { return NettyChannelBuilder.forAddress(address, port, getChannelCredentials()).overrideAuthority(host.toString()).idleTimeout(10, TimeUnit.SECONDS).build(); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java index bad0ae29..5f5bfcbe 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java @@ -22,7 +22,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -// TODO: Dedup this +/** + * Factory for creating gRPC clients for object synchronization and other RPC calls. + */ @ApplicationScoped public class RpcClientFactory implements PeerDisconnectedEventListener { @ConfigProperty(name = "dhfs.objects.sync.timeout") @@ -34,9 +36,16 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { @Inject RpcChannelFactory rpcChannelFactory; - // FIXME: Leaks! private ConcurrentMap _objSyncCache = new ConcurrentHashMap<>(); + /** + * Try calling a given function on given peers in random order. + * + * @param targets the list of targets to call + * @param fn the function to call + * @param the return type of the function + * @return the result of the function call + */ public R withObjSyncClient(Collection targets, ObjectSyncClientFunction fn) { var shuffledList = new ArrayList<>(targets); Collections.shuffle(shuffledList); @@ -55,6 +64,14 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!")); } + /** + * Try calling a given function on a given target. + * + * @param target the target to call + * @param fn the function to call + * @param the return type of the function + * @return the result of the function call + */ public R withObjSyncClient(PeerId target, ObjectSyncClientFunction fn) { var hostinfo = reachablePeerManager.getAddress(target); @@ -64,6 +81,16 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { return withObjSyncClient(target, hostinfo, syncTimeout, fn); } + /** + * Try calling a given function on a given target with a specified timeout. + * + * @param host the host to call + * @param address the address of the host + * @param timeout the timeout for the call + * @param fn the function to call + * @param the return type of the function + * @return the result of the function call + */ public R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction fn) { return switch (address) { case IpPeerAddress ipPeerAddress -> @@ -72,6 +99,17 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { }; } + /** + * Try calling a given function on a given target with a specified timeout. + * + * @param host the host to call + * @param addr the address of the host + * @param port the port of the host + * @param timeout the timeout for the call + * @param fn the function to call + * @param the return type of the function + * @return the result of the function call + */ public R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction fn) { var key = new ObjSyncStubKey(host, addr, port); var stub = _objSyncCache.computeIfAbsent(key, (k) -> { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapper.java index 28fc6024..60975153 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapper.java @@ -3,8 +3,26 @@ package com.usatiuk.dhfs.syncmap; import com.usatiuk.dhfs.remoteobj.JDataRemote; import com.usatiuk.dhfs.remoteobj.JDataRemoteDto; +/** + * Interface for mapping between a remote object and its DTO representation. + * + * @param the type of the remote object + * @param the type of the DTO + */ public interface DtoMapper { + /** + * Converts a remote object to its DTO representation. + * + * @param obj the remote object to convert + * @return the DTO representation of the remote object + */ D toDto(F obj); + /** + * Converts a DTO to its corresponding remote object. + * + * @param dto the DTO to convert + * @return the remote object representation of the DTO + */ F fromDto(D dto); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapperService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapperService.java index 022c7c9f..e60581c0 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapperService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/syncmap/DtoMapperService.java @@ -11,6 +11,9 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; +/** + * Service for mapping between remote objects and their DTO representations. + */ @Singleton public class DtoMapperService { private final Map, DtoMapper> _remoteToDtoMap; @@ -41,6 +44,15 @@ public class DtoMapperService { _dtoToRemoteMap = Map.copyOf(dtoToRemoteMap); } + /** + * Converts a remote object to its DTO representation. + * + * @param from the remote object to convert + * @param to the class of the DTO representation + * @param the type of the remote object + * @param the type of the DTO + * @return the DTO representation of the remote object + */ public D toDto(F from, Class to) { if (to.equals(from.getClass())) { return (D) from; @@ -50,6 +62,15 @@ public class DtoMapperService { return to.cast(got); } + /** + * Converts a DTO to its corresponding remote object. + * + * @param from the DTO to convert + * @param to the class of the remote object representation + * @param the type of the remote object + * @param the type of the DTO + * @return the remote object representation of the DTO + */ public F fromDto(D from, Class to) { if (to.equals(from.getClass())) { return (F) from; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java index 351b4ecb..556778dd 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java @@ -3,13 +3,16 @@ package com.usatiuk.dhfs.webapi; import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerInfoService; -import com.usatiuk.dhfs.peersync.ReachablePeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; +import com.usatiuk.dhfs.peersync.ReachablePeerManager; import jakarta.inject.Inject; import jakarta.ws.rs.*; import java.util.Collection; +/** + * Simple API for managing connected peers and manually specifying their addresses. + */ @Path("/peers-addr-manage") public class PersistentPeerAddressApi { @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webui/WebUiRouter.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webui/WebUiRouter.java index bdbb6ce5..7bf0b144 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webui/WebUiRouter.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webui/WebUiRouter.java @@ -12,6 +12,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Optional; +/** + * Web UI router for serving static files. + */ @ApplicationScoped public class WebUiRouter { diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ByteUtils.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ByteUtils.java deleted file mode 100644 index f7075b40..00000000 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ByteUtils.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.usatiuk.utils; - -import java.nio.ByteBuffer; - -public class ByteUtils { - public static byte[] longToBytes(long val) { - return ByteBuffer.wrap(new byte[8]).putLong(val).array(); - } - - public static long bytesToLong(byte[] bytes) { - return ByteBuffer.wrap(bytes).getLong(); - } - - // Returns a ByteBuffer of size 8 with position reset - public static ByteBuffer longToBb(long val) { - return ByteBuffer.allocate(8).putLong(val).flip(); - } -} diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/DataLocker.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/DataLocker.java index 7e3e7a7c..0bf3b4a3 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/DataLocker.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/DataLocker.java @@ -9,6 +9,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +/** + * Allows to lock arbitrary keys. + */ public class DataLocker { private final ConcurrentHashMap> _locks = new ConcurrentHashMap<>(); private static final Cleaner CLEANER = Cleaner.create(); @@ -36,6 +39,12 @@ public class DataLocker { } } + /** + * Locks the data and returns an AutoCloseable that unlocks it when closed. + * + * @param data the data to lock + * @return an AutoCloseable that unlocks the data when closed + */ @Nonnull public AutoCloseableNoThrow lock(Object data) { var lock = getTag(data); @@ -43,6 +52,13 @@ public class DataLocker { return lock::unlock; } + /** + * Tries to lock the data and returns an AutoCloseable that unlocks it when closed. + * If the lock is not acquired, returns null. + * + * @param data the data to lock + * @return an AutoCloseable that unlocks the data when closed, or null if the lock was not acquired + */ @Nullable public AutoCloseableNoThrow tryLock(Object data) { var lock = getTag(data); diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/HashSetDelayedBlockingQueue.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/HashSetDelayedBlockingQueue.java index b4ff8dc7..e44753b4 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/HashSetDelayedBlockingQueue.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/HashSetDelayedBlockingQueue.java @@ -7,20 +7,38 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.function.Function; +/** + * Blocking queue that delays the objects for a given time, and deduplicates them. + * + * @param the type of the objects in the queue + */ public class HashSetDelayedBlockingQueue { private final LinkedHashMap> _set = new LinkedHashMap<>(); private final Object _sleepSynchronizer = new Object(); private long _delay; private boolean _closed = false; + /** + * Creates a new HashSetDelayedBlockingQueue with the specified delay. + * + * @param delay the delay in milliseconds + */ public HashSetDelayedBlockingQueue(long delay) { _delay = delay; } + /** + * @return the delay in milliseconds + */ public long getDelay() { return _delay; } + /** + * Sets the delay for the queue. + * + * @param delay the delay in milliseconds + */ public void setDelay(long delay) { synchronized (_sleepSynchronizer) { _delay = delay; @@ -28,8 +46,12 @@ public class HashSetDelayedBlockingQueue { } } - // If there's object with key in the queue, don't do anything - // Returns whether it was added or not + /** + * Adds the object to the queue if it doesn't exist. + * + * @param el the object to add + * @return true if the object was added, false if it already exists + */ public boolean add(T el) { synchronized (this) { if (_closed) throw new IllegalStateException("Adding to a queue that is closed!"); @@ -43,9 +65,12 @@ public class HashSetDelayedBlockingQueue { } - // Adds the object to the queue, if it exists re-adds it - // With no delay - // Returns the old object, or null + /** + * Adds the object to the queue with no delay. + * + * @param el the object to add + * @return the old object if it existed, null otherwise + */ public T addNoDelay(T el) { synchronized (this) { if (_closed) throw new IllegalStateException("Adding to a queue that is closed!"); @@ -60,8 +85,12 @@ public class HashSetDelayedBlockingQueue { } } - // Adds the object to the queue, if it exists re-adds it with a new delay - // Returns the old object, or null + /** + * Adds the object to the queue, if it exists re-adds it with a new delay + * + * @param el the object to add + * @return the old object if it existed, null otherwise + */ public T readd(T el) { synchronized (this) { if (_closed) throw new IllegalStateException("Adding to a queue that is closed!"); @@ -76,8 +105,13 @@ public class HashSetDelayedBlockingQueue { } } - // Merges the object with the old one - // Returns the old object, or null + /** + * Merges the object with the old one. + * + * @param el the object to merge + * @param transformer the function to transform the old object + * @return the old object if it existed, null otherwise + */ public T merge(T el, Function transformer) { synchronized (this) { if (_closed) throw new IllegalStateException("Adding to a queue that is closed!"); @@ -97,7 +131,12 @@ public class HashSetDelayedBlockingQueue { } } - // Removes the object + /** + * Removes the object from the queue. + * + * @param el the object to remove + * @return the removed object, or null if it didn't exist + */ public T remove(T el) { synchronized (this) { var rem = _set.remove(el); @@ -106,6 +145,13 @@ public class HashSetDelayedBlockingQueue { } } + /** + * Gets the object from the queue, waiting for it if necessary. + * + * @param timeout the timeout in milliseconds, or -1 for no timeout + * @return the object, or null if it timed out + * @throws InterruptedException if the thread is interrupted + */ public T get(long timeout) throws InterruptedException { long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1; @@ -148,6 +194,12 @@ public class HashSetDelayedBlockingQueue { throw new InterruptedException(); } + /** + * Gets the object from the queue, waiting for it if necessary. + * + * @return the object + * @throws InterruptedException if the thread is interrupted + */ public T get() throws InterruptedException { T ret; do { @@ -155,6 +207,11 @@ public class HashSetDelayedBlockingQueue { return ret; } + /** + * Checks if the queue has an object that is ready to be processed. + * + * @return true if there is an object ready, false otherwise + */ public boolean hasImmediate() { synchronized (this) { if (_set.isEmpty()) return false; @@ -166,6 +223,11 @@ public class HashSetDelayedBlockingQueue { } } + /** + * Tries to get the object from the queue without waiting. + * + * @return the object, or null if it doesn't exist + */ @Nullable public T tryGet() { synchronized (this) { @@ -182,6 +244,11 @@ public class HashSetDelayedBlockingQueue { } } + /** + * Gets all objects from the queue that are ready to be processed. + * + * @return a collection of objects + */ public Collection getAll() { ArrayList out = new ArrayList<>(); @@ -198,6 +265,11 @@ public class HashSetDelayedBlockingQueue { return out; } + /** + * Closes the queue and returns all objects in it. + * + * @return a collection of objects + */ public Collection close() { synchronized (this) { _closed = true; @@ -207,6 +279,12 @@ public class HashSetDelayedBlockingQueue { } } + /** + * Gets all objects from the queue, waiting for them if necessary. + * + * @return a collection of objects + * @throws InterruptedException if the thread is interrupted + */ public Collection getAllWait() throws InterruptedException { Collection out; do { @@ -214,6 +292,13 @@ public class HashSetDelayedBlockingQueue { return out; } + /** + * Gets all objects from the queue, waiting for them if necessary. + * + * @param max the maximum number of objects to get + * @return a collection of objects + * @throws InterruptedException if the thread is interrupted + */ public Collection getAllWait(int max) throws InterruptedException { Collection out; do { @@ -221,6 +306,14 @@ public class HashSetDelayedBlockingQueue { return out; } + /** + * Gets all objects from the queue, waiting for them if necessary. + * + * @param max the maximum number of objects to get + * @param timeout the timeout in milliseconds, or -1 for no timeout + * @return a collection of objects + * @throws InterruptedException if the thread is interrupted + */ public Collection getAllWait(int max, long timeout) throws InterruptedException { ArrayList out = new ArrayList<>(); diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ListUtils.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ListUtils.java index c9b06caa..74c0391e 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ListUtils.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/ListUtils.java @@ -3,8 +3,21 @@ package com.usatiuk.utils; import java.util.List; import java.util.function.Function; +/** + * Utility class for list operations. + */ public class ListUtils { + /** + * Prepends an item to a list and maps the rest of the list using a provided function. + * + * @param item The item to prepend. + * @param suffix The list to append to. + * @param suffixFn The function to map the suffix items. + * @param The type of the items in the list. + * @param The type of the mapped items. + * @return A new list with the prepended item and mapped suffix items. + */ public static List prependAndMap(T_V item, List suffix, Function suffixFn) { T_V[] arr = (T_V[]) new Object[suffix.size() + 1]; arr[0] = item; @@ -14,6 +27,14 @@ public class ListUtils { return List.of(arr); } + /** + * Prepends an item to a list. + * + * @param item The item to prepend. + * @param suffix The list to append to. + * @param The type of the items in the list. + * @return A new list with the prepended item and the original suffix items. + */ public static List prepend(T item, List suffix) { T[] arr = (T[]) new Object[suffix.size() + 1]; arr[0] = item; @@ -23,6 +44,15 @@ public class ListUtils { return List.of(arr); } + /** + * Maps a list using a provided function. + * + * @param suffix The list to map. + * @param suffixFn The function to map the items. + * @param The type of the items in the list. + * @param The type of the mapped items. + * @return A new list with the mapped items. + */ public static List map(List suffix, Function suffixFn) { T_V[] arr = (T_V[]) new Object[suffix.size()]; for (int i = 0; i < suffix.size(); i++) { diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/SerializationHelper.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/SerializationHelper.java index 2df57b06..ce7cfb71 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/SerializationHelper.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/SerializationHelper.java @@ -10,6 +10,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +/** + * Utility class for serialization and deserialization of objects. + *

+ * This class provides methods to serialize and deserialize objects using Java's built-in serialization mechanism. + * It also includes methods to handle byte arrays and input streams for serialization and deserialization. + *

+ */ public abstract class SerializationHelper { // Taken from SerializationUtils public static T deserialize(final InputStream inputStream) { diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/StatusRuntimeExceptionNoStacktrace.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/StatusRuntimeExceptionNoStacktrace.java index 963da69d..2a549baa 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/StatusRuntimeExceptionNoStacktrace.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/StatusRuntimeExceptionNoStacktrace.java @@ -6,6 +6,9 @@ import io.grpc.StatusRuntimeException; import javax.annotation.Nullable; +/** + * A {@link StatusRuntimeException} that does not fill in the stack trace. + */ public class StatusRuntimeExceptionNoStacktrace extends StatusRuntimeException { public StatusRuntimeExceptionNoStacktrace(Status status) { super(status); diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java index 3e857d90..9c2ee9f9 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java @@ -5,6 +5,9 @@ import java.lang.invoke.MethodHandle; import java.nio.ByteBuffer; import java.util.function.Consumer; +/** + * Utility class for creating uninitialized ByteBuffers, to avoid zeroing memory unnecessarily. + */ public class UninitializedByteBuffer { private static final Linker LINKER = Linker.nativeLinker(); private static final MethodHandle malloc = LINKER.downcallHandle( @@ -16,6 +19,12 @@ public class UninitializedByteBuffer { FunctionDescriptor.ofVoid(ValueLayout.ADDRESS) ); + /** + * Allocates a new uninitialized ByteBuffer of the specified capacity. + * + * @param capacity the capacity of the ByteBuffer + * @return a new uninitialized ByteBuffer + */ public static ByteBuffer allocate(int capacity) { UnsafeAccessor.NIO.reserveMemory(capacity, capacity); @@ -38,6 +47,12 @@ public class UninitializedByteBuffer { return reint.asByteBuffer(); } + /** + * Gets the address of the given ByteBuffer. + * + * @param buffer the ByteBuffer to get the address of + * @return the address of the ByteBuffer + */ public static long getAddress(ByteBuffer buffer) { return UnsafeAccessor.NIO.getBufferAddress(buffer); } diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UnsafeAccessor.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UnsafeAccessor.java index 17d72460..997840ec 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UnsafeAccessor.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UnsafeAccessor.java @@ -6,6 +6,9 @@ import sun.misc.Unsafe; import java.lang.reflect.Field; +/** + * Provides access to the {@link Unsafe} class and {@link JavaNioAccess} class. + */ public abstract class UnsafeAccessor { public static final JavaNioAccess NIO; public static final Unsafe UNSAFE;