Server: peer connected/disconnected event listeners

This commit is contained in:
2025-03-16 12:58:56 +01:00
parent 8fbdf50732
commit fa64dac9aa
5 changed files with 39 additions and 31 deletions

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
public interface PeerConnectedEventListener {
void handlePeerConnected(PeerId peerId);
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
public interface PeerDisconnectedEventListener {
void handlePeerDisconnected(PeerId peerId);
}

View File

@@ -17,11 +17,14 @@ import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.*;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@@ -29,8 +32,8 @@ import java.util.stream.Collectors;
public class PeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
// FIXME: Ideally not call them on every ping
private final ArrayList<ConnectionEventListener> _connectedListeners = new ArrayList<>();
private final ArrayList<ConnectionEventListener> _disconnectedListeners = new ArrayList<>();
private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
@@ -53,6 +56,11 @@ public class PeerManager {
SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor;
public PeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}
// Note: keep priority updated with below
void init(@Observes @Priority(600) StartupEvent event) throws IOException {
_heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
@@ -86,20 +94,6 @@ public class PeerManager {
}
}
// Note: registrations should be completed with Priority < 600
public void registerConnectEventListener(ConnectionEventListener listener) {
synchronized (_connectedListeners) {
_connectedListeners.add(listener);
}
}
// Note: registrations should be completed with Priority < 600
public void registerDisconnectEventListener(ConnectionEventListener listener) {
synchronized (_disconnectedListeners) {
_disconnectedListeners.add(listener);
}
}
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
boolean wasReachable = isReachable(host);
@@ -114,9 +108,9 @@ public class PeerManager {
Log.infov("Connected to {0}", host);
// for (var l : _connectedListeners) {
// l.apply(host);
// }
for (var l : _connectedListeners) {
l.handlePeerConnected(host.id());
}
}
public void handleConnectionError(PeerInfo host) {
@@ -127,9 +121,9 @@ public class PeerManager {
_states.remove(host.id());
// for (var l : _disconnectedListeners) {
// l.apply(host);
// }
for (var l : _disconnectedListeners) {
l.handlePeerDisconnected(host.id());
}
}
// FIXME:
@@ -210,11 +204,6 @@ public class PeerManager {
});
}
@FunctionalInterface
public interface ConnectionEventListener {
void apply(UUID host);
}
public record HostStateSnapshot(Collection<PeerId> available, Collection<PeerId> unavailable) {
}

View File

@@ -2,10 +2,10 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerConnectedEventListener;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import io.quarkus.logging.Log;
@@ -20,7 +21,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
@ApplicationScoped
public class DeferredInvalidationQueueService {
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@Inject
PeerManager remoteHostManager;
@@ -37,7 +38,6 @@ public class DeferredInvalidationQueueService {
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// remoteHostManager.registerConnectEventListener(this::returnForHost);
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
@@ -80,4 +80,9 @@ public class DeferredInvalidationQueueService {
_persistentData.deferredInvalidations.put(entry.peer(), entry);
}
}
@Override
public void handlePeerConnected(PeerId peerId) {
returnForHost(peerId);
}
}