diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java index affb19f8..c1742f10 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/DeferredInvalidationQueueService.java @@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation; import com.usatiuk.dhfs.peersync.PeerConnectedEventListener; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.utils.SerializationHelper; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; @@ -24,7 +24,7 @@ import java.nio.file.Paths; public class DeferredInvalidationQueueService implements PeerConnectedEventListener { private static final String dataFileName = "invqueue"; @Inject - PeerManager remoteHostManager; + ConnectedPeerManager remoteHostManager; @Inject InvalidationQueueService invalidationQueueService; @ConfigProperty(name = "dhfs.objects.persistence.files.root") diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java index 9e61d612..8a1932b5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/InvalidationQueueService.java @@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerInfoService; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient; import com.usatiuk.objects.JData; @@ -37,7 +37,7 @@ public class InvalidationQueueService { private final AtomicReference> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>()); private final DataLocker _locker = new DataLocker(); @Inject - PeerManager remoteHostManager; + ConnectedPeerManager remoteHostManager; @Inject DeferredInvalidationQueueService deferredInvalidationQueueService; @Inject diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ConnectedPeerManager.java similarity index 97% rename from dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerManager.java rename to dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ConnectedPeerManager.java index 5bcf7aa5..9620ad77 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/ConnectedPeerManager.java @@ -30,9 +30,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; @ApplicationScoped -public class PeerManager { +public class ConnectedPeerManager { private final ConcurrentMap _states = new ConcurrentHashMap<>(); - // FIXME: Ideally not call them on every ping private final Collection _connectedListeners; private final Collection _disconnectedListeners; @Inject @@ -59,7 +58,7 @@ public class PeerManager { SyncHandler syncHandler; private ExecutorService _heartbeatExecutor; - public PeerManager(Instance connectedListeners, Instance disconnectedListeners) { + public ConnectedPeerManager(Instance connectedListeners, Instance disconnectedListeners) { _connectedListeners = List.copyOf(connectedListeners.stream().toList()); _disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList()); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java index f6054813..ca3d7889 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PeerLastSeenUpdater.java @@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; @ApplicationScoped public class PeerLastSeenUpdater { @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @Inject PeerInfoService peerInfoService; @Inject @@ -30,7 +30,7 @@ public class PeerLastSeenUpdater { @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Blocking void update() { - var snapshot = peerManager.getHostStateSnapshot(); + var snapshot = connectedPeerManager.getHostStateSnapshot(); for (var a : snapshot.available()) { txm.run(() -> { var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java index aed7d4fd..2dfb3cf9 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/peersync/PersistentPeerDataService.java @@ -52,7 +52,7 @@ public class PersistentPeerDataService { @Inject TransactionManager txm; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid") Optional presetUuid; @@ -135,7 +135,7 @@ public class PersistentPeerDataService { } curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId))); Log.infov("Did reset sync state for {0}", peerId); - curTx.onCommit(() -> peerManager.handleConnectionError(peerId)); + curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId)); return true; }); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java index 4089d4ca..4914c236 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceClient.java @@ -4,10 +4,9 @@ import com.usatiuk.dhfs.ProtoSerializer; import com.usatiuk.dhfs.invalidation.InvalidationQueueService; import com.usatiuk.dhfs.invalidation.Op; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.persistence.JObjectKeyP; -import com.usatiuk.dhfs.refcount.JDataRef; import com.usatiuk.dhfs.remoteobj.ReceivedObject; import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta; import com.usatiuk.dhfs.remoteobj.RemoteTransaction; @@ -52,7 +51,7 @@ public class RemoteObjectServiceClient { @Inject ProtoSerializer receivedObjectProtoSerializer; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; public Pair getSpecificObject(JObjectKey key, PeerId peerId) { return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> { @@ -71,7 +70,7 @@ public class RemoteObjectServiceClient { var targetVersion = objMeta.versionSum(); var targets = objMeta.knownRemoteVersions().isEmpty() - ? peerManager.getAvailableHosts() + ? connectedPeerManager.getAvailableHosts() : objMeta.knownRemoteVersions().entrySet().stream() .filter(entry -> entry.getValue().equals(targetVersion)) .map(Map.Entry::getKey).toList(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java index 048522a1..2bd94be7 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java @@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor; import com.usatiuk.dhfs.invalidation.Op; import com.usatiuk.dhfs.invalidation.OpHandlerService; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.persistence.JObjectKeyP; import com.usatiuk.dhfs.remoteobj.*; import com.usatiuk.dhfs.repository.*; @@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl { @Inject TransactionManager txm; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @Inject Transaction curTx; diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java index aedeef61..02f52ea5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RpcClientFactory.java @@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress; import com.usatiuk.dhfs.peerdiscovery.PeerAddress; import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc; import io.grpc.ManagedChannel; import io.grpc.Status; @@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { long syncTimeout; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @Inject RpcChannelFactory rpcChannelFactory; @@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener { } public R withObjSyncClient(PeerId target, ObjectSyncClientFunction fn) { - var hostinfo = peerManager.getAddress(target); + var hostinfo = connectedPeerManager.getAddress(target); if (hostinfo == null) throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target)); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PeerManagementApi.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PeerManagementApi.java index ec74090a..f6b51399 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PeerManagementApi.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PeerManagementApi.java @@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerInfoService; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import jakarta.inject.Inject; import jakarta.ws.rs.*; @@ -14,7 +14,7 @@ public class PeerManagementApi { @Inject PeerInfoService peerInfoService; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @Inject PersistentPeerDataService persistentPeerDataService; @@ -23,27 +23,27 @@ public class PeerManagementApi { public List knownPeers() { return peerInfoService.getPeers().stream().map( peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()), - Optional.ofNullable(peerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList(); + Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList(); } @Path("known-peers/{peerId}") @PUT public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) { - peerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert()); + connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert()); } @Path("known-peers/{peerId}") @DELETE public void deletePeer(@PathParam("peerId") String peerId) { - peerManager.removeRemoteHost(PeerId.of(peerId)); + connectedPeerManager.removeRemoteHost(PeerId.of(peerId)); } @Path("available-peers") @GET public Collection availablePeers() { - return peerManager.getSeenButNotAddedHosts().stream() + return connectedPeerManager.getSeenButNotAddedHosts().stream() .map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(), - peerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null))) + connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null))) .toList(); } } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java index 40555ae4..99802fbf 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/webapi/PersistentPeerAddressApi.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi; import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerInfoService; -import com.usatiuk.dhfs.peersync.PeerManager; +import com.usatiuk.dhfs.peersync.ConnectedPeerManager; import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import jakarta.inject.Inject; import jakarta.ws.rs.*; @@ -15,7 +15,7 @@ public class PersistentPeerAddressApi { @Inject PeerInfoService peerInfoService; @Inject - PeerManager peerManager; + ConnectedPeerManager connectedPeerManager; @Inject PersistentPeerDataService persistentPeerDataService;