From 9562951221040a8906c9aa48870c9833fc04c932 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 18 Jun 2024 20:42:06 +0200 Subject: [PATCH] be more proactive when setting connectivity state --- .../repository/distributed/RemoteHostManager.java | 7 +++++-- .../distributed/RemoteObjectServiceClient.java | 6 +++--- .../distributed/RemoteObjectServiceServer.java | 15 +++++++++++++++ server/src/main/proto/dhfs_objects_sync.proto | 4 ++++ 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index 152baed0..428a93dd 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -60,7 +60,10 @@ public class RemoteHostManager { private final ArrayList> _connectionSuccessHandlers = new ArrayList<>(); private final ArrayList> _connectionErrorHandlers = new ArrayList<>(); - private void handleConnectionSuccess(String host) { + public void handleConnectionSuccess(String host) { + if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault( + host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN) + )).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return; _transientPeersState.runWriteLocked(d -> { d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState()); var curState = d.getStates().get(host); @@ -72,7 +75,7 @@ public class RemoteHostManager { } } - private void handleConnectionError(String host) { + public void handleConnectionError(String host) { _transientPeersState.runWriteLocked(d -> { d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState()); var curState = d.getStates().get(host); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index 3ec87770..a5dd2624 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -32,7 +32,7 @@ public class RemoteObjectServiceClient { }); return remoteHostManager.withClientAny(targets, client -> { - var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build()); + var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build()); var receivedSelfVer = reply.getObject().getHeader().getChangelog() .getEntriesList().stream().filter(p -> p.getHost().equals(selfname)) @@ -60,14 +60,14 @@ public class RemoteObjectServiceClient { public GetIndexReply getIndex(String host) { return remoteHostManager.withClient(host, client -> { - var req = GetIndexRequest.newBuilder().build(); + var req = GetIndexRequest.newBuilder().setSelfname(selfname).build(); var reply = client.getIndex(req); return reply; }); } public void notifyUpdate(String host, String name) { - remoteHostManager.withClient(host, client -> { + remoteHostManager.withClient(host, client -> { var meta = objectIndexService.getMeta(name).orElseThrow(() -> { Log.error("Race when trying to notify update"); return new NotImplementedException(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index ffdbe07a..0b499b70 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -30,9 +30,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Inject SyncHandler syncHandler; + @Inject + RemoteHostManager remoteHostManager; + @Override @Blocking public Uni getObject(GetObjectRequest request) { + if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(request.getSelfname()); + Log.info("<-- getObject: " + request.getName()); var meta = objectIndexService.getMeta(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); @@ -54,6 +60,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni getIndex(GetIndexRequest request) { + if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(request.getSelfname()); + Log.info("<-- getIndex: "); var builder = GetIndexReply.newBuilder().setSelfname(selfname); objectIndexService.forAllRead((name, meta) -> { @@ -65,12 +74,18 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni indexUpdate(IndexUpdatePush request) { + if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(request.getSelfname()); + Log.info("<-- indexUpdate: " + request.getHeader().getName()); return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); } @Override public Uni ping(PingRequest request) { + if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(request.getSelfname()); + return Uni.createFrom().item(PingReply.newBuilder().setSelfname(selfname).build()); } } diff --git a/server/src/main/proto/dhfs_objects_sync.proto b/server/src/main/proto/dhfs_objects_sync.proto index 44267d04..24dac168 100644 --- a/server/src/main/proto/dhfs_objects_sync.proto +++ b/server/src/main/proto/dhfs_objects_sync.proto @@ -43,6 +43,8 @@ message ApiObject { } message GetObjectRequest { + string selfname = 10; + string name = 2; } @@ -53,6 +55,8 @@ message GetObjectReply { } message GetIndexRequest { + string selfname = 10; + bool dontaskme = 1; }