Sync-base: rename ConnectedPeerManager to ReachablePeerManager

tests check for "connected" in logs
This commit is contained in:
2025-05-06 20:28:21 +02:00
parent d972cd1562
commit 52ccbb99bc
10 changed files with 32 additions and 32 deletions

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@@ -63,7 +63,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
for (var reachable : remoteHostManager.getAvailableHosts())
for (var reachable : reachablePeerManager.getAvailableHosts())
returnForHost(reachable);
}

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker();
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
}
if (toAllQueue != null) {
var hostInfo = remoteHostManager.getHostStateSnapshot();
var hostInfo = reachablePeerManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(new InvalidationQueueEntry(h, o));
@@ -129,7 +129,7 @@ public class InvalidationQueueService {
continue;
}
if (!remoteHostManager.isReachable(e.peer())) {
if (!reachablePeerManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
@@ -210,14 +210,14 @@ public class InvalidationQueueService {
}
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.add(entry);
else
deferredInvalidationQueueService.defer(entry);
}
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.addNoDelay(entry);
else
deferredInvalidationQueueService.defer(entry);

View File

@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PeerInfoService peerInfoService;
@Inject
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void update() {
var snapshot = connectedPeerManager.getHostStateSnapshot();
var snapshot = reachablePeerManager.getHostStateSnapshot();
for (var a : snapshot.available()) {
txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);

View File

@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
curTx.onCommit(() -> reachablePeerManager.handleConnectionError(peerId));
return true;
});
}

View File

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
@ApplicationScoped
public class ConnectedPeerManager {
public class ReachablePeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@@ -58,7 +58,7 @@ public class ConnectedPeerManager {
SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor;
public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
public ReachablePeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -70,7 +70,7 @@ public class RemoteObjectServiceClient {
var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().isEmpty()
? connectedPeerManager.getAvailableHosts()
? reachablePeerManager.getAvailableHosts()
: objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList();

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*;
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
Transaction curTx;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
long syncTimeout;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
RpcChannelFactory rpcChannelFactory;
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
}
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
var hostinfo = connectedPeerManager.getAddress(target);
var hostinfo = reachablePeerManager.getAddress(target);
if (hostinfo == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -14,7 +14,7 @@ public class PeerManagementApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@@ -23,27 +23,27 @@ public class PeerManagementApi {
public List<PeerInfo> knownPeers() {
return peerInfoService.getPeers().stream().map(
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
Optional.ofNullable(reachablePeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
}
@Path("known-peers/{peerId}")
@PUT
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
reachablePeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
}
@Path("known-peers/{peerId}")
@DELETE
public void deletePeer(@PathParam("peerId") String peerId) {
connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
reachablePeerManager.removeRemoteHost(PeerId.of(peerId));
}
@Path("available-peers")
@GET
public Collection<PeerInfo> availablePeers() {
return connectedPeerManager.getSeenButNotAddedHosts().stream()
return reachablePeerManager.getSeenButNotAddedHosts().stream()
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
reachablePeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
.toList();
}
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;