Sync-base: more javadocs 2

This commit is contained in:
2025-05-13 22:50:24 +02:00
parent 838405fb46
commit 83ceefa041
46 changed files with 546 additions and 75 deletions

View File

@@ -1,3 +1,5 @@
package com.usatiuk.dhfs;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -12,6 +14,9 @@ import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Periodically check for deadlocks in the JVM and log them if found.
*/
@ApplicationScoped
public class DeadlockDetector {
private final ExecutorService _executor = Executors.newSingleThreadExecutor();

View File

@@ -6,7 +6,7 @@ import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
@Singleton
public class TemporaryOpSerializer implements ProtoSerializer<OpP, Op> {
public class OpSerializer implements ProtoSerializer<OpP, Op> {
@Override
public Op deserialize(OpP message) {
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());

View File

@@ -11,6 +11,11 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Paths;
/**
* This class checks if the application was shut down cleanly.
* It creates a file in the specified directory on startup and deletes it on shutdown.
* If the file exists on startup, it means the application was not shut down cleanly.
*/
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";

View File

@@ -8,9 +8,7 @@ import java.util.Optional;
/**
* Helper class for parsing peer addresses from strings.
* <p>
* The expected format is: <peerId>:<ip>:<port>:<securePort>
* </p>
* The expected format is: peerId:ip:port:securePort
*/
public class PeerAddrStringHelper {

View File

@@ -13,6 +13,8 @@ import java.security.cert.X509Certificate;
/**
* Represents information about a peer in the cluster
* {@link JDataRemotePush} annotation is used, as invalidating a peer information by the peer itself might make it unreachable,
* as it will not be possible to download it from the invalidated peer, so the peer information should be send with the notification
*
* @param key the key of the peer
* @param id the ID of the peer

View File

@@ -191,12 +191,6 @@ public class ReachablePeerManager {
return _states.keySet().stream().toList();
}
// public List<UUID> getUnavailableHosts() {
// return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
// .filter(e -> !e.getValue().isReachable())
// .map(Map.Entry::getKey).toList());
// }
/**
* Gets a snapshot of current state of the connected (and not connected) peers
* @return information about all connected/disconnected peers

View File

@@ -22,8 +22,17 @@ import java.security.cert.X509Certificate;
import java.util.Calendar;
import java.util.Date;
/**
* Helper class for generating and manipulating X.509 certificates.
*/
public class CertificateTools {
/**
* Converts a byte array to an X.509 certificate.
*
* @param bytes the byte array representing the certificate
* @return the X.509 certificate
*/
public static X509Certificate certFromBytes(byte[] bytes) {
try {
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
@@ -34,6 +43,10 @@ public class CertificateTools {
}
}
/**
* Generates a random RSA key pair.
* @return the generated RSA key pair
*/
public static KeyPair generateKeyPair() {
try {
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
@@ -44,6 +57,13 @@ public class CertificateTools {
}
}
/**
* Generates an X.509 certificate using the provided key pair and subject name.
*
* @param keyPair the key pair to use for the certificate
* @param subject the subject name for the certificate
* @return the generated X.509 certificate
*/
public static X509Certificate generateCertificate(KeyPair keyPair, String subject) {
try {
Provider bcProvider = new BouncyCastleProvider();

View File

@@ -15,6 +15,9 @@ import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* Automatically refreshes certificates in the trust manager for peers when their info is updated.
*/
@Singleton
public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
@Inject

View File

@@ -14,6 +14,9 @@ import jakarta.inject.Inject;
import java.util.function.Supplier;
/**
* Augments the security identity of peers that are members of the cluster.
*/
@ApplicationScoped
public class PeerRolesAugmentor implements SecurityIdentityAugmentor {
@Inject

View File

@@ -17,6 +17,9 @@ import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
/**
* Custom trust manager that trusts the certificates of peers in the cluster.
*/
@ApplicationScoped
public class PeerTrustManager implements X509TrustManager {
private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();

View File

@@ -13,6 +13,9 @@ import javax.net.ssl.KeyManagerFactory;
import java.security.KeyStore;
import java.security.cert.Certificate;
/**
* Customizes the HTTP server options to use the peer trust manager and the self-signed certificate.
*/
@ApplicationScoped
public class PeerTrustServerCustomizer implements HttpServerOptionsCustomizer {

View File

@@ -10,6 +10,10 @@ import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* Automatically deletes objects that are not referenced by anything else.
* For remote objects, it puts them into the deletion queue.
*/
@Singleton
public class DeleterTxHook implements PreCommitTxHook {
@Inject

View File

@@ -3,6 +3,10 @@ package com.usatiuk.dhfs.refcount;
import com.usatiuk.dhfs.jmap.JMapRef;
import com.usatiuk.objects.JObjectKey;
/**
* Good old boring object reference.
* @param obj the object that is the source of the reference
*/
public record JDataNormalRef(JObjectKey obj) implements JDataRef {
@Override
public int compareTo(JDataRef o) {

View File

@@ -7,15 +7,46 @@ import org.pcollections.PCollection;
import java.util.Collection;
import java.util.List;
/**
* Interface for a reference counted object
*/
public interface JDataRefcounted extends JData {
/**
* Returns list of incoming references to this object.
*
* @return list of incoming references
*/
PCollection<JDataRef> refsFrom();
/**
* Create a copy of this object with the given list of incoming references.
*
* @param refs list of incoming references
* @return copy of this object with the given list of incoming references
*/
JDataRefcounted withRefsFrom(PCollection<JDataRef> refs);
/**
* Returns whether this object is frozen or not.
* A frozen object cannot be garbage collected.
*
* @return true if this object is frozen, false otherwise
*/
boolean frozen();
/**
* Create a copy of this object with the given frozen state.
*
* @param frozen true if this object should be frozen, false otherwise
* @return copy of this object with the given frozen state
*/
JDataRefcounted withFrozen(boolean frozen);
/**
* Collect outgoing references to other objects.
*
* @return list of outgoing references
*/
default Collection<JObjectKey> collectRefsTo() {
return List.of();
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.dhfs.remoteobj;
import com.usatiuk.dhfs.peersync.PeerId;
public interface ConflictResolver {
void resolve(PeerId fromPeer, RemoteObjectMeta ours, RemoteObjectMeta theirs);
}

View File

@@ -12,6 +12,9 @@ import org.pcollections.PMap;
import javax.annotation.Nullable;
/**
* Fallback handler for remote object updates, no conflict resolution.
*/
@ApplicationScoped
public class DefaultObjSyncHandler {
@Inject

View File

@@ -6,17 +6,43 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.List;
/**
* Interface for a remote object. Remote objects are objects that are automatically synchronized between peers,
* and versioned using a version vector.
*/
public interface JDataRemote extends Serializable {
/**
* Returns the key of this remote object.
*
* @return the key of this remote object
*/
JObjectKey key();
/**
* Returns the estimated size of this remote object in bytes.
*
* @return the estimated size of this remote object in bytes
*/
default int estimateSize() {
return 100;
}
/**
* Collect outgoing references to other objects.
*
* @return list of outgoing references
*/
default Collection<JObjectKey> collectRefsTo() {
return List.of();
}
/**
* Returns the class of DTO of this object that should be used for remote synchronization.
* It can be the same as the object.
*
* @return the class of DTO of this object that should be used for remote synchronization
*/
default Class<? extends JDataRemoteDto> dtoClass() {
assert JDataRemoteDto.class.isAssignableFrom(getClass());
return (Class<? extends JDataRemoteDto>) this.getClass();

View File

@@ -2,7 +2,15 @@ package com.usatiuk.dhfs.remoteobj;
import java.io.Serializable;
/**
* Marker interface for a DTO class to be used when synchronizing some remote object.
*/
public interface JDataRemoteDto extends Serializable {
/**
* Returns the class of the remote object that this DTO represents.
*
* @return the class of the remote object that this DTO represents
*/
default Class<? extends JDataRemote> objClass() {
assert JDataRemote.class.isAssignableFrom(getClass());
return (Class<? extends JDataRemote>) this.getClass();

View File

@@ -5,6 +5,11 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation for eagerly pushed remote objects.
* This annotation is used to mark remote object which notification operations should contain the object itself,
* to avoid the other peer having to download it.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JDataRemotePush {

View File

@@ -6,7 +6,21 @@ import org.pcollections.PMap;
import javax.annotation.Nullable;
/**
* Interface for handling remote updates of objects.
*
* @param <T> the type of the remote object
* @param <D> the type of the remote object DTO
*/
public interface ObjSyncHandler<T extends JDataRemote, D extends JDataRemoteDto> {
/**
* Handles a remote update of an object.
*
* @param from the ID of the peer that sent the update
* @param key the key of the object
* @param receivedChangelog the changelog received from the peer
* @param receivedData the data received from the peer
*/
void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable D receivedData);

View File

@@ -9,6 +9,9 @@ import com.usatiuk.objects.transaction.Transaction;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
/**
* Transaction hook to automatically notify {@link InvalidationQueueService} about changed objects.
*/
@Singleton
public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Inject

View File

@@ -8,6 +8,12 @@ import org.pcollections.PCollection;
import java.util.Collection;
/**
* Wrapper for remote object data.
* This class is used to store additional metadata about incoming references to objects for reference counting.
*
* @param <T> the type of the remote object data
*/
public record RemoteObjectDataWrapper<T extends JDataRemote>(
JObjectKey key,
PCollection<JDataRef> refsFrom,

View File

@@ -29,6 +29,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* Handles deletion of remote objects, asynchronously.
*/
@ApplicationScoped
public class RemoteObjectDeleter {
private final HashSetDelayedBlockingQueue<JObjectKey> _quickCandidates = new HashSetDelayedBlockingQueue<>(0);
@@ -79,14 +82,7 @@ public class RemoteObjectDeleter {
_refProcessorExecutorService.submit(this::refProcessor);
}
// Continue GC from last shutdown
//FIXME
// executorService.submit(() ->
// jObjectManager.findAll().forEach(n -> {
// jObjectManager.get(n).ifPresent(o -> o.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
// return null;
// }));
// }));
// TODO: Continue GC from last shutdown
}
void shutdown(@Observes @Priority(800) ShutdownEvent event) throws InterruptedException {
@@ -96,10 +92,10 @@ public class RemoteObjectDeleter {
}
}
// public void putQuickDeletionCandidate(JObjectKey obj) {
// _quickCandidates.add(obj);
// }
/**
* Add a deletion candidate to the queue.
* @param obj the object to be deleted
*/
public void putDeletionCandidate(RemoteObjectMeta obj) {
if (!obj.seen()) {
if (_quickCandidates.add(obj.key()))
@@ -177,7 +173,6 @@ public class RemoteObjectDeleter {
});
}
// FIXME:
private boolean canDelete(JDataRefcounted obj) {
return obj.refsFrom().isEmpty() && !obj.frozen();
}

View File

@@ -9,6 +9,10 @@ import org.pcollections.*;
import java.util.Collection;
import java.util.List;
/**
* Metadata for remote objects.
* This class makes a peer aware of remote object's existence without necessarily downloading its data.
*/
public record RemoteObjectMeta(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JDataRemote> knownType,

View File

@@ -12,6 +12,9 @@ import org.pcollections.HashTreePSet;
import java.util.Optional;
/**
* Helper class for working with remote objects.
*/
@Singleton
public class RemoteTransaction {
@Inject
@@ -71,10 +74,22 @@ public class RemoteTransaction {
});
}
/**
* Get metadata for the object with the given key.
*
* @param key the key of the object
* @return an Optional containing the metadata if it exists, or an empty Optional if it doesn't
*/
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
}
/**
* Put the data of a remote object into the storage, without incrementing the version vector.
*
* @param obj the object to put
* @param <T> the type of the object
*/
public <T extends JDataRemote> void putDataRaw(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);
if (curMeta == null)
@@ -92,11 +107,25 @@ public class RemoteTransaction {
curTx.put(newData);
}
/**
* Put the data of a remote object into the storage, creating a new object.
* Should only be used when an object is known to be new. (for example, when it is created with a unique random key)
*
* @param obj the object to put
* @param <T> the type of the object
*/
public <T extends JDataRemote> void putDataNew(T obj) {
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
}
/**
* Put the data of a remote object into the storage, either creating a new object or updating an existing one.
* If the object already exists, its version vector is incremented.
*
* @param obj the object to put
* @param <T> the type of the object
*/
public <T extends JDataRemote> void putData(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);
@@ -126,10 +155,26 @@ public class RemoteTransaction {
curTx.put(newData);
}
/**
* Get the data of a remote object with the given key.
* The data can be downloaded from the remote object if it is not already present in the local storage.
*
* @param type the type of the object
* @param key the key of the object
* @return an Optional containing the data if it exists, or an empty Optional if it doesn't
*/
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
return getData(type, key, true);
}
/**
* Get the data of a remote object with the given key.
* The data will not be downloaded from the remote object if it is not already present in the local storage.
*
* @param type the type of the object
* @param key the key of the object
* @return an Optional containing the data if it exists, or an empty Optional if it doesn't
*/
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
return getData(type, key, false);
}

View File

@@ -29,6 +29,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
/**
* Handles synchronization of remote objects.
*/
@ApplicationScoped
public class SyncHandler {
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
@@ -93,6 +96,15 @@ public class SyncHandler {
_initialSyncProcessors = Map.copyOf(initialSyncProcessorHashMap);
}
/**
* Handles remote update of an object.
*
* @param from the ID of the peer that sent the update
* @param key the key of the object
* @param receivedChangelog the changelog received from the peer
* @param receivedData the data received from the peer
* @param <D> the type of the remote object DTO
*/
public <D extends JDataRemoteDto> void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable D receivedData) {
@@ -123,7 +135,9 @@ public class SyncHandler {
}
}
/**
* Resync objects after a crash.
*/
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
if (shutdownChecker.lastShutdownClean())
return;
@@ -145,6 +159,11 @@ public class SyncHandler {
});
}
/**
* Do initial sync for a newly connected (or reconnected) peer.
*
* @param peer the ID of the peer
*/
public void doInitialSync(PeerId peer) {
txm.run(() -> {
Log.tracev("Will do initial sync for {0}", peer);

View File

@@ -1,9 +0,0 @@
package com.usatiuk.dhfs.remoteobj;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class SyncHandlerService {
}

View File

@@ -6,7 +6,18 @@ import org.pcollections.PMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Helper class for synchronizing objects.
*/
public class SyncHelper {
/**
* Compares two changelogs.
*
* @param current the current changelog
* @param other the other changelog
* @return the result of the comparison
*/
public static ChangelogCmpResult compareChangelogs(PMap<PeerId, Long> current, PMap<PeerId, Long> other) {
boolean hasLower = false;
boolean hasHigher = false;

View File

@@ -14,7 +14,7 @@ import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
@ApplicationScoped
public class TemporaryReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
public class ReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
@Inject
ProtoSerializer<JDataRemoteDtoP, JDataRemoteDto> remoteObjectSerializer;

View File

@@ -7,7 +7,7 @@ import com.usatiuk.utils.SerializationHelper;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TemporaryRemoteObjectSerializer implements ProtoSerializer<JDataRemoteDtoP, JDataRemoteDto> {
public class RemoteObjectSerializer implements ProtoSerializer<JDataRemoteDtoP, JDataRemoteDto> {
@Override
public JDataRemoteDto deserialize(JDataRemoteDtoP message) {
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());

View File

@@ -4,8 +4,8 @@ import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
@@ -29,6 +29,9 @@ import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Helper class for calling remote peers RPCs.
*/
@ApplicationScoped
public class RemoteObjectServiceClient {
private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor();
@@ -53,6 +56,13 @@ public class RemoteObjectServiceClient {
@Inject
ReachablePeerManager reachablePeerManager;
/**
* Download a specific object from a specific peer.
*
* @param key the key of the object to download
* @param peerId the ID of the peer to download from
* @return a pair of the peer ID from which the object was downloaded and the downloaded object
*/
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());
@@ -61,6 +71,12 @@ public class RemoteObjectServiceClient {
});
}
/**
* Download a specific object from some reachable peer.
*
* @param key the key of the object to download
* @param onReceive a callback function to process the received object
*/
public void getObject(JObjectKey key, Function<Pair<PeerId, ReceivedObject>, Boolean> onReceive) {
var objMeta = remoteTx.getMeta(key).orElse(null);
@@ -93,6 +109,13 @@ public class RemoteObjectServiceClient {
});
}
/**
* Push a list of operations to a specific peer.
*
* @param target the ID of the peer to push to
* @param ops the list of operations to push
* @return the reply from the peer
*/
public OpPushReply pushOps(PeerId target, List<Op> ops) {
var barrier = new CountDownLatch(ops.size());
for (Op op : ops) {
@@ -116,6 +139,13 @@ public class RemoteObjectServiceClient {
return OpPushReply.getDefaultInstance();
}
/**
* Ask given peers if they can delete the object with the given key.
*
* @param targets the list of peers to ask
* @param objKey the key of the object to delete
* @return a collection of pairs of peer IDs and their replies
*/
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey objKey) {
Log.trace("Asking canDelete for " + objKey + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
try {

View File

@@ -47,7 +47,7 @@ public class RemoteObjectServiceServerImpl {
DtoMapperService dtoMapperService;
@Inject
AutosyncProcessor autosyncProcessor;
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + from);

View File

@@ -15,6 +15,9 @@ import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.concurrent.TimeUnit;
/**
* Factory for creating gRPC channels
*/
@ApplicationScoped
public class RpcChannelFactory {
@Inject
@@ -39,6 +42,14 @@ public class RpcChannelFactory {
}
}
/**
* Creates a secure channel to the given host and port, with correct credentials.
*
* @param host the host to connect to
* @param address the address of the host
* @param port the port to connect to
* @return a secure gRPC channel
*/
ManagedChannel getSecureChannel(PeerId host, String address, int port) {
return NettyChannelBuilder.forAddress(address, port, getChannelCredentials()).overrideAuthority(host.toString()).idleTimeout(10, TimeUnit.SECONDS).build();
}

View File

@@ -22,7 +22,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
// TODO: Dedup this
/**
* Factory for creating gRPC clients for object synchronization and other RPC calls.
*/
@ApplicationScoped
public class RpcClientFactory implements PeerDisconnectedEventListener {
@ConfigProperty(name = "dhfs.objects.sync.timeout")
@@ -34,9 +36,16 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
@Inject
RpcChannelFactory rpcChannelFactory;
// FIXME: Leaks!
private ConcurrentMap<ObjSyncStubKey, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub> _objSyncCache = new ConcurrentHashMap<>();
/**
* Try calling a given function on given peers in random order.
*
* @param targets the list of targets to call
* @param fn the function to call
* @param <R> the return type of the function
* @return the result of the function call
*/
public <R> R withObjSyncClient(Collection<PeerId> targets, ObjectSyncClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
@@ -55,6 +64,14 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!"));
}
/**
* Try calling a given function on a given target.
*
* @param target the target to call
* @param fn the function to call
* @param <R> the return type of the function
* @return the result of the function call
*/
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
var hostinfo = reachablePeerManager.getAddress(target);
@@ -64,6 +81,16 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
return withObjSyncClient(target, hostinfo, syncTimeout, fn);
}
/**
* Try calling a given function on a given target with a specified timeout.
*
* @param host the host to call
* @param address the address of the host
* @param timeout the timeout for the call
* @param fn the function to call
* @param <R> the return type of the function
* @return the result of the function call
*/
public <R> R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction<R> fn) {
return switch (address) {
case IpPeerAddress ipPeerAddress ->
@@ -72,6 +99,17 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
};
}
/**
* Try calling a given function on a given target with a specified timeout.
*
* @param host the host to call
* @param addr the address of the host
* @param port the port of the host
* @param timeout the timeout for the call
* @param fn the function to call
* @param <R> the return type of the function
* @return the result of the function call
*/
public <R> R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
var key = new ObjSyncStubKey(host, addr, port);
var stub = _objSyncCache.computeIfAbsent(key, (k) -> {

View File

@@ -3,8 +3,26 @@ package com.usatiuk.dhfs.syncmap;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
/**
* Interface for mapping between a remote object and its DTO representation.
*
* @param <F> the type of the remote object
* @param <D> the type of the DTO
*/
public interface DtoMapper<F extends JDataRemote, D extends JDataRemoteDto> {
/**
* Converts a remote object to its DTO representation.
*
* @param obj the remote object to convert
* @return the DTO representation of the remote object
*/
D toDto(F obj);
/**
* Converts a DTO to its corresponding remote object.
*
* @param dto the DTO to convert
* @return the remote object representation of the DTO
*/
F fromDto(D dto);
}

View File

@@ -11,6 +11,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
/**
* Service for mapping between remote objects and their DTO representations.
*/
@Singleton
public class DtoMapperService {
private final Map<Class<? extends JDataRemote>, DtoMapper> _remoteToDtoMap;
@@ -41,6 +44,15 @@ public class DtoMapperService {
_dtoToRemoteMap = Map.copyOf(dtoToRemoteMap);
}
/**
* Converts a remote object to its DTO representation.
*
* @param from the remote object to convert
* @param to the class of the DTO representation
* @param <F> the type of the remote object
* @param <D> the type of the DTO
* @return the DTO representation of the remote object
*/
public <F extends JDataRemote, D extends JDataRemoteDto> D toDto(F from, Class<D> to) {
if (to.equals(from.getClass())) {
return (D) from;
@@ -50,6 +62,15 @@ public class DtoMapperService {
return to.cast(got);
}
/**
* Converts a DTO to its corresponding remote object.
*
* @param from the DTO to convert
* @param to the class of the remote object representation
* @param <F> the type of the remote object
* @param <D> the type of the DTO
* @return the remote object representation of the DTO
*/
public <F extends JDataRemote, D extends JDataRemoteDto> F fromDto(D from, Class<F> to) {
if (to.equals(from.getClass())) {
return (F) from;

View File

@@ -3,13 +3,16 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import java.util.Collection;
/**
* Simple API for managing connected peers and manually specifying their addresses.
*/
@Path("/peers-addr-manage")
public class PersistentPeerAddressApi {
@Inject

View File

@@ -12,6 +12,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
/**
* Web UI router for serving static files.
*/
@ApplicationScoped
public class WebUiRouter {

View File

@@ -1,18 +0,0 @@
package com.usatiuk.utils;
import java.nio.ByteBuffer;
public class ByteUtils {
public static byte[] longToBytes(long val) {
return ByteBuffer.wrap(new byte[8]).putLong(val).array();
}
public static long bytesToLong(byte[] bytes) {
return ByteBuffer.wrap(bytes).getLong();
}
// Returns a ByteBuffer of size 8 with position reset
public static ByteBuffer longToBb(long val) {
return ByteBuffer.allocate(8).putLong(val).flip();
}
}

View File

@@ -9,6 +9,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Allows to lock arbitrary keys.
*/
public class DataLocker {
private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
private static final Cleaner CLEANER = Cleaner.create();
@@ -36,6 +39,12 @@ public class DataLocker {
}
}
/**
* Locks the data and returns an AutoCloseable that unlocks it when closed.
*
* @param data the data to lock
* @return an AutoCloseable that unlocks the data when closed
*/
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
var lock = getTag(data);
@@ -43,6 +52,13 @@ public class DataLocker {
return lock::unlock;
}
/**
* Tries to lock the data and returns an AutoCloseable that unlocks it when closed.
* If the lock is not acquired, returns null.
*
* @param data the data to lock
* @return an AutoCloseable that unlocks the data when closed, or null if the lock was not acquired
*/
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
var lock = getTag(data);

View File

@@ -7,20 +7,38 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.function.Function;
/**
* Blocking queue that delays the objects for a given time, and deduplicates them.
*
* @param <T> the type of the objects in the queue
*/
public class HashSetDelayedBlockingQueue<T> {
private final LinkedHashMap<T, SetElement<T>> _set = new LinkedHashMap<>();
private final Object _sleepSynchronizer = new Object();
private long _delay;
private boolean _closed = false;
/**
* Creates a new HashSetDelayedBlockingQueue with the specified delay.
*
* @param delay the delay in milliseconds
*/
public HashSetDelayedBlockingQueue(long delay) {
_delay = delay;
}
/**
* @return the delay in milliseconds
*/
public long getDelay() {
return _delay;
}
/**
* Sets the delay for the queue.
*
* @param delay the delay in milliseconds
*/
public void setDelay(long delay) {
synchronized (_sleepSynchronizer) {
_delay = delay;
@@ -28,8 +46,12 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// If there's object with key in the queue, don't do anything
// Returns whether it was added or not
/**
* Adds the object to the queue if it doesn't exist.
*
* @param el the object to add
* @return true if the object was added, false if it already exists
*/
public boolean add(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
@@ -43,9 +65,12 @@ public class HashSetDelayedBlockingQueue<T> {
}
// Adds the object to the queue, if it exists re-adds it
// With no delay
// Returns the old object, or null
/**
* Adds the object to the queue with no delay.
*
* @param el the object to add
* @return the old object if it existed, null otherwise
*/
public T addNoDelay(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
@@ -60,8 +85,12 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// Adds the object to the queue, if it exists re-adds it with a new delay
// Returns the old object, or null
/**
* Adds the object to the queue, if it exists re-adds it with a new delay
*
* @param el the object to add
* @return the old object if it existed, null otherwise
*/
public T readd(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
@@ -76,8 +105,13 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// Merges the object with the old one
// Returns the old object, or null
/**
* Merges the object with the old one.
*
* @param el the object to merge
* @param transformer the function to transform the old object
* @return the old object if it existed, null otherwise
*/
public T merge(T el, Function<T, T> transformer) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
@@ -97,7 +131,12 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// Removes the object
/**
* Removes the object from the queue.
*
* @param el the object to remove
* @return the removed object, or null if it didn't exist
*/
public T remove(T el) {
synchronized (this) {
var rem = _set.remove(el);
@@ -106,6 +145,13 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
/**
* Gets the object from the queue, waiting for it if necessary.
*
* @param timeout the timeout in milliseconds, or -1 for no timeout
* @return the object, or null if it timed out
* @throws InterruptedException if the thread is interrupted
*/
public T get(long timeout) throws InterruptedException {
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
@@ -148,6 +194,12 @@ public class HashSetDelayedBlockingQueue<T> {
throw new InterruptedException();
}
/**
* Gets the object from the queue, waiting for it if necessary.
*
* @return the object
* @throws InterruptedException if the thread is interrupted
*/
public T get() throws InterruptedException {
T ret;
do {
@@ -155,6 +207,11 @@ public class HashSetDelayedBlockingQueue<T> {
return ret;
}
/**
* Checks if the queue has an object that is ready to be processed.
*
* @return true if there is an object ready, false otherwise
*/
public boolean hasImmediate() {
synchronized (this) {
if (_set.isEmpty()) return false;
@@ -166,6 +223,11 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
/**
* Tries to get the object from the queue without waiting.
*
* @return the object, or null if it doesn't exist
*/
@Nullable
public T tryGet() {
synchronized (this) {
@@ -182,6 +244,11 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
/**
* Gets all objects from the queue that are ready to be processed.
*
* @return a collection of objects
*/
public Collection<T> getAll() {
ArrayList<T> out = new ArrayList<>();
@@ -198,6 +265,11 @@ public class HashSetDelayedBlockingQueue<T> {
return out;
}
/**
* Closes the queue and returns all objects in it.
*
* @return a collection of objects
*/
public Collection<T> close() {
synchronized (this) {
_closed = true;
@@ -207,6 +279,12 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
/**
* Gets all objects from the queue, waiting for them if necessary.
*
* @return a collection of objects
* @throws InterruptedException if the thread is interrupted
*/
public Collection<T> getAllWait() throws InterruptedException {
Collection<T> out;
do {
@@ -214,6 +292,13 @@ public class HashSetDelayedBlockingQueue<T> {
return out;
}
/**
* Gets all objects from the queue, waiting for them if necessary.
*
* @param max the maximum number of objects to get
* @return a collection of objects
* @throws InterruptedException if the thread is interrupted
*/
public Collection<T> getAllWait(int max) throws InterruptedException {
Collection<T> out;
do {
@@ -221,6 +306,14 @@ public class HashSetDelayedBlockingQueue<T> {
return out;
}
/**
* Gets all objects from the queue, waiting for them if necessary.
*
* @param max the maximum number of objects to get
* @param timeout the timeout in milliseconds, or -1 for no timeout
* @return a collection of objects
* @throws InterruptedException if the thread is interrupted
*/
public Collection<T> getAllWait(int max, long timeout) throws InterruptedException {
ArrayList<T> out = new ArrayList<>();

View File

@@ -3,8 +3,21 @@ package com.usatiuk.utils;
import java.util.List;
import java.util.function.Function;
/**
* Utility class for list operations.
*/
public class ListUtils {
/**
* Prepends an item to a list and maps the rest of the list using a provided function.
*
* @param item The item to prepend.
* @param suffix The list to append to.
* @param suffixFn The function to map the suffix items.
* @param <T> The type of the items in the list.
* @param <T_V> The type of the mapped items.
* @return A new list with the prepended item and mapped suffix items.
*/
public static <T, T_V> List<T_V> prependAndMap(T_V item, List<T> suffix, Function<T, T_V> suffixFn) {
T_V[] arr = (T_V[]) new Object[suffix.size() + 1];
arr[0] = item;
@@ -14,6 +27,14 @@ public class ListUtils {
return List.of(arr);
}
/**
* Prepends an item to a list.
*
* @param item The item to prepend.
* @param suffix The list to append to.
* @param <T> The type of the items in the list.
* @return A new list with the prepended item and the original suffix items.
*/
public static <T> List<T> prepend(T item, List<T> suffix) {
T[] arr = (T[]) new Object[suffix.size() + 1];
arr[0] = item;
@@ -23,6 +44,15 @@ public class ListUtils {
return List.of(arr);
}
/**
* Maps a list using a provided function.
*
* @param suffix The list to map.
* @param suffixFn The function to map the items.
* @param <T> The type of the items in the list.
* @param <T_V> The type of the mapped items.
* @return A new list with the mapped items.
*/
public static <T, T_V> List<T_V> map(List<T> suffix, Function<T, T_V> suffixFn) {
T_V[] arr = (T_V[]) new Object[suffix.size()];
for (int i = 0; i < suffix.size(); i++) {

View File

@@ -10,6 +10,13 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
/**
* Utility class for serialization and deserialization of objects.
* <p>
* This class provides methods to serialize and deserialize objects using Java's built-in serialization mechanism.
* It also includes methods to handle byte arrays and input streams for serialization and deserialization.
* </p>
*/
public abstract class SerializationHelper {
// Taken from SerializationUtils
public static <T> T deserialize(final InputStream inputStream) {

View File

@@ -6,6 +6,9 @@ import io.grpc.StatusRuntimeException;
import javax.annotation.Nullable;
/**
* A {@link StatusRuntimeException} that does not fill in the stack trace.
*/
public class StatusRuntimeExceptionNoStacktrace extends StatusRuntimeException {
public StatusRuntimeExceptionNoStacktrace(Status status) {
super(status);

View File

@@ -5,6 +5,9 @@ import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
/**
* Utility class for creating uninitialized ByteBuffers, to avoid zeroing memory unnecessarily.
*/
public class UninitializedByteBuffer {
private static final Linker LINKER = Linker.nativeLinker();
private static final MethodHandle malloc = LINKER.downcallHandle(
@@ -16,6 +19,12 @@ public class UninitializedByteBuffer {
FunctionDescriptor.ofVoid(ValueLayout.ADDRESS)
);
/**
* Allocates a new uninitialized ByteBuffer of the specified capacity.
*
* @param capacity the capacity of the ByteBuffer
* @return a new uninitialized ByteBuffer
*/
public static ByteBuffer allocate(int capacity) {
UnsafeAccessor.NIO.reserveMemory(capacity, capacity);
@@ -38,6 +47,12 @@ public class UninitializedByteBuffer {
return reint.asByteBuffer();
}
/**
* Gets the address of the given ByteBuffer.
*
* @param buffer the ByteBuffer to get the address of
* @return the address of the ByteBuffer
*/
public static long getAddress(ByteBuffer buffer) {
return UnsafeAccessor.NIO.getBufferAddress(buffer);
}

View File

@@ -6,6 +6,9 @@ import sun.misc.Unsafe;
import java.lang.reflect.Field;
/**
* Provides access to the {@link Unsafe} class and {@link JavaNioAccess} class.
*/
public abstract class UnsafeAccessor {
public static final JavaNioAccess NIO;
public static final Unsafe UNSAFE;