mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
be more proactive when setting connectivity state
This commit is contained in:
@@ -60,7 +60,10 @@ public class RemoteHostManager {
|
||||
private final ArrayList<Function<String, Void>> _connectionSuccessHandlers = new ArrayList<>();
|
||||
private final ArrayList<Function<String, Void>> _connectionErrorHandlers = new ArrayList<>();
|
||||
|
||||
private void handleConnectionSuccess(String host) {
|
||||
public void handleConnectionSuccess(String host) {
|
||||
if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault(
|
||||
host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)
|
||||
)).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return;
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
|
||||
var curState = d.getStates().get(host);
|
||||
@@ -72,7 +75,7 @@ public class RemoteHostManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleConnectionError(String host) {
|
||||
public void handleConnectionError(String host) {
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
|
||||
var curState = d.getStates().get(host);
|
||||
|
||||
@@ -32,7 +32,7 @@ public class RemoteObjectServiceClient {
|
||||
});
|
||||
|
||||
return remoteHostManager.withClientAny(targets, client -> {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build());
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build());
|
||||
|
||||
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
|
||||
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
|
||||
@@ -60,14 +60,14 @@ public class RemoteObjectServiceClient {
|
||||
|
||||
public GetIndexReply getIndex(String host) {
|
||||
return remoteHostManager.withClient(host, client -> {
|
||||
var req = GetIndexRequest.newBuilder().build();
|
||||
var req = GetIndexRequest.newBuilder().setSelfname(selfname).build();
|
||||
var reply = client.getIndex(req);
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
public void notifyUpdate(String host, String name) {
|
||||
remoteHostManager.withClient(host, client -> {
|
||||
remoteHostManager.withClient(host, client -> {
|
||||
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
|
||||
Log.error("Race when trying to notify update");
|
||||
return new NotImplementedException();
|
||||
|
||||
@@ -30,9 +30,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
remoteHostManager.handleConnectionSuccess(request.getSelfname());
|
||||
|
||||
Log.info("<-- getObject: " + request.getName());
|
||||
|
||||
var meta = objectIndexService.getMeta(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
@@ -54,6 +60,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<GetIndexReply> getIndex(GetIndexRequest request) {
|
||||
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
remoteHostManager.handleConnectionSuccess(request.getSelfname());
|
||||
|
||||
Log.info("<-- getIndex: ");
|
||||
var builder = GetIndexReply.newBuilder().setSelfname(selfname);
|
||||
objectIndexService.forAllRead((name, meta) -> {
|
||||
@@ -65,12 +74,18 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
||||
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
remoteHostManager.handleConnectionSuccess(request.getSelfname());
|
||||
|
||||
Log.info("<-- indexUpdate: " + request.getHeader().getName());
|
||||
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<PingReply> ping(PingRequest request) {
|
||||
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
remoteHostManager.handleConnectionSuccess(request.getSelfname());
|
||||
|
||||
return Uni.createFrom().item(PingReply.newBuilder().setSelfname(selfname).build());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,8 @@ message ApiObject {
|
||||
}
|
||||
|
||||
message GetObjectRequest {
|
||||
string selfname = 10;
|
||||
|
||||
string name = 2;
|
||||
}
|
||||
|
||||
@@ -53,6 +55,8 @@ message GetObjectReply {
|
||||
}
|
||||
|
||||
message GetIndexRequest {
|
||||
string selfname = 10;
|
||||
|
||||
bool dontaskme = 1;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user