From 1ae813aacd9496d4a4c2dab8b923731c51a5213c Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Thu, 24 Apr 2025 15:46:23 +0200 Subject: [PATCH] Sync-base: cleanup old grpc channels --- .../dhfs/repository/RpcClientFactory.java | 19 ++++++++++++++++--- .../InvalidationQueueService.java | 2 +- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java index 24f86755..d765c422 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.repository; import com.usatiuk.dhfs.PeerId; import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress; import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.quarkus.logging.Log; @@ -20,7 +21,7 @@ import java.util.concurrent.TimeUnit; // TODO: Dedup this @ApplicationScoped -public class RpcClientFactory { +public class RpcClientFactory implements PeerDisconnectedEventListener { @ConfigProperty(name = "dhfs.objects.sync.timeout") long syncTimeout; @@ -79,8 +80,20 @@ public class RpcClientFactory { return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)); } - public void dropCache() { - _objSyncCache = new ConcurrentHashMap<>(); + @Override + public void handlePeerDisconnected(PeerId peerId) { + ArrayList toRemove = new ArrayList<>(); + for (var objSyncStubKey : _objSyncCache.keySet()) { + if (objSyncStubKey.id().equals(peerId)) { + toRemove.add(objSyncStubKey); + } + } + for (var objSyncStubKey : toRemove) { + var stub = _objSyncCache.remove(objSyncStubKey); + if (stub != null) { + ((ManagedChannel) stub.getChannel()).shutdown(); + } + } } @FunctionalInterface diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/invalidation/InvalidationQueueService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/invalidation/InvalidationQueueService.java index 294d7318..cd00fcc5 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/invalidation/InvalidationQueueService.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/repository/invalidation/InvalidationQueueService.java @@ -162,7 +162,7 @@ public class InvalidationQueueService { commits.get(p).forEach(Runnable::run); } } catch (Exception e) { - Log.warnv("Failed to send invalidations, will retry", e); + Log.warn("Failed to send invalidations, will retry", e); for (var inv : data) { pushInvalidationToOne(inv); }