Sync-base: rename ConnectedPeerManager

This commit is contained in:
2025-05-05 22:18:00 +02:00
parent 0849df60ae
commit 289a2b880e
10 changed files with 27 additions and 29 deletions

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener; import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.utils.SerializationHelper; import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
public class DeferredInvalidationQueueService implements PeerConnectedEventListener { public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue"; private static final String dataFileName = "invqueue";
@Inject @Inject
PeerManager remoteHostManager; ConnectedPeerManager remoteHostManager;
@Inject @Inject
InvalidationQueueService invalidationQueueService; InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root") @ConfigProperty(name = "dhfs.objects.persistence.files.root")

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService; import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient; import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>()); private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker(); private final DataLocker _locker = new DataLocker();
@Inject @Inject
PeerManager remoteHostManager; ConnectedPeerManager remoteHostManager;
@Inject @Inject
DeferredInvalidationQueueService deferredInvalidationQueueService; DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject @Inject

View File

@@ -30,9 +30,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class PeerManager { public class ConnectedPeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>(); private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
// FIXME: Ideally not call them on every ping
private final Collection<PeerConnectedEventListener> _connectedListeners; private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners; private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@Inject @Inject
@@ -59,7 +58,7 @@ public class PeerManager {
SyncHandler syncHandler; SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor; private ExecutorService _heartbeatExecutor;
public PeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) { public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList()); _connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList()); _disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
} }

View File

@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped @ApplicationScoped
public class PeerLastSeenUpdater { public class PeerLastSeenUpdater {
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@Inject @Inject
PeerInfoService peerInfoService; PeerInfoService peerInfoService;
@Inject @Inject
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking @Blocking
void update() { void update() {
var snapshot = peerManager.getHostStateSnapshot(); var snapshot = connectedPeerManager.getHostStateSnapshot();
for (var a : snapshot.available()) { for (var a : snapshot.available()) {
txm.run(() -> { txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null); var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);

View File

@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
@Inject @Inject
TransactionManager txm; TransactionManager txm;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid") @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid; Optional<String> presetUuid;
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
} }
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId))); curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId); Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> peerManager.handleConnectionError(peerId)); curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
return true; return true;
}); });
} }

View File

@@ -4,10 +4,9 @@ import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService; import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op; import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.persistence.JObjectKeyP; import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.refcount.JDataRef;
import com.usatiuk.dhfs.remoteobj.ReceivedObject; import com.usatiuk.dhfs.remoteobj.ReceivedObject;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta; import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction; import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
@@ -52,7 +51,7 @@ public class RemoteObjectServiceClient {
@Inject @Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer; ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) { public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> { return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -71,7 +70,7 @@ public class RemoteObjectServiceClient {
var targetVersion = objMeta.versionSum(); var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().isEmpty() var targets = objMeta.knownRemoteVersions().isEmpty()
? peerManager.getAvailableHosts() ? connectedPeerManager.getAvailableHosts()
: objMeta.knownRemoteVersions().entrySet().stream() : objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion)) .filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList(); .map(Map.Entry::getKey).toList();

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op; import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService; import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP; import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*; import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*; import com.usatiuk.dhfs.repository.*;
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
@Inject @Inject
TransactionManager txm; TransactionManager txm;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@Inject @Inject
Transaction curTx; Transaction curTx;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.peerdiscovery.PeerAddress; import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener; import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc; import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
long syncTimeout; long syncTimeout;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@Inject @Inject
RpcChannelFactory rpcChannelFactory; RpcChannelFactory rpcChannelFactory;
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
} }
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) { public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
var hostinfo = peerManager.getAddress(target); var hostinfo = connectedPeerManager.getAddress(target);
if (hostinfo == null) if (hostinfo == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target)); throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService; import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.ws.rs.*; import jakarta.ws.rs.*;
@@ -14,7 +14,7 @@ public class PeerManagementApi {
@Inject @Inject
PeerInfoService peerInfoService; PeerInfoService peerInfoService;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@Inject @Inject
PersistentPeerDataService persistentPeerDataService; PersistentPeerDataService persistentPeerDataService;
@@ -23,27 +23,27 @@ public class PeerManagementApi {
public List<PeerInfo> knownPeers() { public List<PeerInfo> knownPeers() {
return peerInfoService.getPeers().stream().map( return peerInfoService.getPeers().stream().map(
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()), peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
Optional.ofNullable(peerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList(); Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
} }
@Path("known-peers/{peerId}") @Path("known-peers/{peerId}")
@PUT @PUT
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) { public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
peerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert()); connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
} }
@Path("known-peers/{peerId}") @Path("known-peers/{peerId}")
@DELETE @DELETE
public void deletePeer(@PathParam("peerId") String peerId) { public void deletePeer(@PathParam("peerId") String peerId) {
peerManager.removeRemoteHost(PeerId.of(peerId)); connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
} }
@Path("available-peers") @Path("available-peers")
@GET @GET
public Collection<PeerInfo> availablePeers() { public Collection<PeerInfo> availablePeers() {
return peerManager.getSeenButNotAddedHosts().stream() return connectedPeerManager.getSeenButNotAddedHosts().stream()
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(), .map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
peerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null))) connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
.toList(); .toList();
} }
} }

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper; import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService; import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService; import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.ws.rs.*; import jakarta.ws.rs.*;
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
@Inject @Inject
PeerInfoService peerInfoService; PeerInfoService peerInfoService;
@Inject @Inject
PeerManager peerManager; ConnectedPeerManager connectedPeerManager;
@Inject @Inject
PersistentPeerDataService persistentPeerDataService; PersistentPeerDataService persistentPeerDataService;