Sync-base: cleanup old grpc channels

This commit is contained in:
2025-04-24 15:46:23 +02:00
parent e81671251a
commit 1ae813aacd
2 changed files with 17 additions and 4 deletions

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -20,7 +21,7 @@ import java.util.concurrent.TimeUnit;
// TODO: Dedup this
@ApplicationScoped
public class RpcClientFactory {
public class RpcClientFactory implements PeerDisconnectedEventListener {
@ConfigProperty(name = "dhfs.objects.sync.timeout")
long syncTimeout;
@@ -79,8 +80,20 @@ public class RpcClientFactory {
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
}
public void dropCache() {
_objSyncCache = new ConcurrentHashMap<>();
@Override
public void handlePeerDisconnected(PeerId peerId) {
ArrayList<ObjSyncStubKey> toRemove = new ArrayList<>();
for (var objSyncStubKey : _objSyncCache.keySet()) {
if (objSyncStubKey.id().equals(peerId)) {
toRemove.add(objSyncStubKey);
}
}
for (var objSyncStubKey : toRemove) {
var stub = _objSyncCache.remove(objSyncStubKey);
if (stub != null) {
((ManagedChannel) stub.getChannel()).shutdown();
}
}
}
@FunctionalInterface

View File

@@ -162,7 +162,7 @@ public class InvalidationQueueService {
commits.get(p).forEach(Runnable::run);
}
} catch (Exception e) {
Log.warnv("Failed to send invalidations, will retry", e);
Log.warn("Failed to send invalidations, will retry", e);
for (var inv : data) {
pushInvalidationToOne(inv);
}