diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java index b0a7efda..ceb62a86 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java @@ -1,9 +1,9 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo; import jakarta.json.bind.annotation.JsonbCreator; import jakarta.json.bind.annotation.JsonbProperty; import lombok.Getter; -import lombok.Setter; import java.io.Serializable; import java.util.UUID; @@ -12,17 +12,12 @@ import java.util.UUID; public class HostInfo implements Serializable { private final UUID _uuid; - @Setter - private String _addr; - @Setter - private Integer _port; - @JsonbCreator - public HostInfo(@JsonbProperty("uuid") String uuid, - @JsonbProperty("addr") String addr, - @JsonbProperty("port") Integer port) { + public HostInfo(@JsonbProperty("uuid") String uuid) { _uuid = UUID.fromString(uuid); - _addr = addr; - _port = port; + } + + public PeerInfo toPeerInfo() { + return PeerInfo.newBuilder().setUuid(_uuid.toString()).build(); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index 70b14023..5c81b81d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -1,11 +1,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; -import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc; import com.usatiuk.dhfs.objects.repository.distributed.PingRequest; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.grpc.netty.NegotiationType; -import io.grpc.netty.NettyChannelBuilder; +import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; @@ -17,8 +13,10 @@ import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import java.io.IOException; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; @ApplicationScoped public class RemoteHostManager { @@ -28,7 +26,13 @@ public class RemoteHostManager { @Inject SyncHandler syncHandler; - TransientPeersState _transientPeersState = new TransientPeersState(); + @Inject + RpcClientFactory rpcClientFactory; + + @Inject + PeerSyncClient peerSyncClient; + + private final TransientPeersState _transientPeersState = new TransientPeersState(); void init(@Observes @Priority(350) StartupEvent event) throws IOException { } @@ -43,11 +47,11 @@ public class RemoteHostManager { var shouldTry = _transientPeersState.runReadLocked(d -> { var s = d.getStates().get(host.getUuid()); if (s == null) return true; - return !s.getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE); + return !s.getState().equals(TransientPeerState.ConnectionState.REACHABLE); }); if (shouldTry) { Log.info("Trying to connect to " + host.getUuid()); - if (reachable(host)) { + if (pingCheck(host.getUuid())) { handleConnectionSuccess(host.getUuid()); } } @@ -56,110 +60,70 @@ public class RemoteHostManager { public void handleConnectionSuccess(UUID host) { if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault( - host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN) - )).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return; + host, new TransientPeerState(TransientPeerState.ConnectionState.NOT_SEEN) + )).getState().equals(TransientPeerState.ConnectionState.REACHABLE)) return; _transientPeersState.runWriteLocked(d -> { - d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState()); + d.getStates().putIfAbsent(host, new TransientPeerState()); var curState = d.getStates().get(host); - curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE); + curState.setState(TransientPeerState.ConnectionState.REACHABLE); return null; }); Log.info("Connected to " + host); syncHandler.doInitialResync(host); + peerSyncClient.syncPeersOne(host); } public void handleConnectionError(UUID host) { Log.info("Lost connection to " + host); _transientPeersState.runWriteLocked(d -> { - d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState()); + d.getStates().putIfAbsent(host, new TransientPeerState()); var curState = d.getStates().get(host); - curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.UNREACHABLE); + curState.setState(TransientPeerState.ConnectionState.UNREACHABLE); return null; }); } - - @FunctionalInterface - public interface ClientFunction { - R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); - } - - private R withClient(String addr, int port, Optional timeout, ClientFunction fn) { - var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT) - .usePlaintext().build(); - var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) - .withMaxOutboundMessageSize(Integer.MAX_VALUE) - .withMaxInboundMessageSize(Integer.MAX_VALUE); - if (timeout.isPresent()) { - client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS); - } - try { - return fn.apply(client); - } finally { - channel.shutdownNow(); - } - } - // FIXME: - private boolean reachable(HostInfo hostInfo) { + private boolean pingCheck(UUID host) { + TransientPeerState state = _transientPeersState.runReadLocked(s -> s.getStates().get(host)); + if (state == null) return false; try { - return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(5000L /*ms*/), c -> { + return rpcClientFactory.withObjSyncClient(state.getAddr(), state.getPort(), Optional.of(5000L /*ms*/), c -> { var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build()); - if (!UUID.fromString(ret.getSelfUuid()).equals(hostInfo.getUuid())) { - throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + hostInfo.getUuid()); + if (!UUID.fromString(ret.getSelfUuid()).equals(host)) { + throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host); } return true; }); } catch (Exception ignored) { - Log.info("Host " + hostInfo.getUuid() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause()); + Log.info("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause()); return false; } } - public boolean reachable(UUID host) { - return reachable(persistentRemoteHostsService.getInfo(host)); + public boolean isReachable(UUID host) { + return _transientPeersState.runReadLocked(d -> { + var res = d.getStates().get(host); + return res.getState() == TransientPeerState.ConnectionState.REACHABLE; + }); } - public R withClientAny(Collection targets, ClientFunction fn) { - var shuffledList = new ArrayList<>(targets); - Collections.shuffle(shuffledList); - for (UUID target : shuffledList) { - var hostinfo = persistentRemoteHostsService.getInfo(target); - - boolean shouldTry = _transientPeersState.runReadLocked(d -> { - var res = d.getStates().get(hostinfo.getUuid()); - if (res == null) return true; - return res.getState() == TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE; - }); - - if (!shouldTry) continue; - - try { - return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); - } catch (StatusRuntimeException e) { - if (e.getStatus().equals(Status.UNAVAILABLE)) { - Log.info("Host " + hostinfo.getUuid() + " is unreachable: " + e.getMessage()); - handleConnectionError(hostinfo.getUuid()); - } else throw e; - } - } - throw new IllegalStateException("No reachable targets!"); - } - - public R withClient(UUID target, ClientFunction fn) { - var hostinfo = persistentRemoteHostsService.getInfo(target); - return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); + public TransientPeerState getTransientState(UUID host) { + return _transientPeersState.runReadLocked(d -> { + d.getStates().putIfAbsent(host, new TransientPeerState()); + return d.getStates().get(host); + }); } public List getAvailableHosts() { return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() - .filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) + .filter(e -> e.getValue().getState().equals(TransientPeerState.ConnectionState.REACHABLE)) .map(Map.Entry::getKey).toList()); } public List getSeenHosts() { return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() - .filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)) + .filter(e -> !e.getValue().getState().equals(TransientPeerState.ConnectionState.NOT_SEEN)) .map(Map.Entry::getKey).toList()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index b2d638b4..ff44ef2f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -20,13 +20,13 @@ public class RemoteObjectServiceClient { PersistentRemoteHostsService persistentRemoteHostsService; @Inject - RemoteHostManager remoteHostManager; + RpcClientFactory rpcClientFactory; @Inject JObjectManager jObjectManager; public Pair getSpecificObject(UUID host, String name) { - return remoteHostManager.withClient(host, client -> { + return rpcClientFactory.withObjSyncClient(host, client -> { var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build()); return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent()); }); @@ -40,7 +40,7 @@ public class RemoteObjectServiceClient { return md.getRemoteCopies().entrySet().stream().filter(entry -> entry.getValue().equals(bestVer)).map(Map.Entry::getKey).toList(); }); - return remoteHostManager.withClientAny(targets, client -> { + return rpcClientFactory.withObjSyncClient(targets, client -> { var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build()); var receivedSelfVer = reply.getObject().getHeader().getChangelog() @@ -65,7 +65,7 @@ public class RemoteObjectServiceClient { } public IndexUpdatePush getIndex(UUID host) { - return remoteHostManager.withClient(host, client -> { + return rpcClientFactory.withObjSyncClient(host, client -> { var req = GetIndexRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build(); return client.getIndex(req); }); @@ -81,6 +81,6 @@ public class RemoteObjectServiceClient { var send = builder.build(); - return remoteHostManager.withClient(host, client -> client.indexUpdate(send).getErrorsList()); + return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send).getErrorsList()); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RpcClientFactory.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RpcClientFactory.java new file mode 100644 index 00000000..0a7b99c2 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RpcClientFactory.java @@ -0,0 +1,101 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc; +import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpcGrpc; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +// TODO: Dedup this +@ApplicationScoped +public class RpcClientFactory { + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; + + @Inject + RemoteHostManager remoteHostManager; + + @FunctionalInterface + public interface ObjectSyncClientFunction { + R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); + } + + public R withObjSyncClient(Collection targets, ObjectSyncClientFunction fn) { + var shuffledList = new ArrayList<>(targets); + Collections.shuffle(shuffledList); + for (UUID target : shuffledList) { + var hostinfo = remoteHostManager.getTransientState(target); + + boolean shouldTry = remoteHostManager.isReachable(target); + + if (!shouldTry) continue; + + try { + return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); + } catch (StatusRuntimeException e) { + if (e.getStatus().equals(Status.UNAVAILABLE)) { + Log.info("Host " + target + " is unreachable: " + e.getMessage()); + remoteHostManager.handleConnectionError(target); + } else throw e; + } + } + throw new IllegalStateException("No reachable targets!"); + } + + + public R withObjSyncClient(UUID target, ObjectSyncClientFunction fn) { + var hostinfo = remoteHostManager.getTransientState(target); + return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); + } + + public R withObjSyncClient(String addr, int port, Optional timeout, ObjectSyncClientFunction fn) { + var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT) + .usePlaintext().build(); + var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(Integer.MAX_VALUE) + .withMaxInboundMessageSize(Integer.MAX_VALUE); + if (timeout.isPresent()) { + client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS); + } + try { + return fn.apply(client); + } finally { + channel.shutdownNow(); + } + } + + @FunctionalInterface + public interface PeerSyncClientFunction { + R apply(DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub client); + } + + public R withPeerSyncClient(UUID target, PeerSyncClientFunction fn) { + var hostinfo = remoteHostManager.getTransientState(target); + return withPeerSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); + } + + public R withPeerSyncClient(String addr, int port, Optional timeout, PeerSyncClientFunction fn) { + var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT) + .usePlaintext().build(); + var client = DhfsObjectPeerSyncGrpcGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(Integer.MAX_VALUE) + .withMaxInboundMessageSize(Integer.MAX_VALUE); + if (timeout.isPresent()) { + client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS); + } + try { + return fn.apply(client); + } finally { + channel.shutdownNow(); + } + } + + +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeerState.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeerState.java new file mode 100644 index 00000000..99aa1caa --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeerState.java @@ -0,0 +1,32 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@AllArgsConstructor +@NoArgsConstructor +public class TransientPeerState { + public TransientPeerState(ConnectionState connectionState) { + _state = connectionState; + } + + public enum ConnectionState { + NOT_SEEN, + REACHABLE, + UNREACHABLE + } + + @Getter + @Setter + private ConnectionState _state = ConnectionState.NOT_SEEN; + + @Getter + @Setter + private String _addr; + + @Getter + @Setter + private int _port; +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java index 5f40fce2..6b4a10f6 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java @@ -9,22 +9,9 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; + public class TransientPeersStateData { - @AllArgsConstructor - @NoArgsConstructor - public static class TransientPeerState { - public enum ConnectionState { - NOT_SEEN, - REACHABLE, - UNREACHABLE - } - - @Getter - @Setter - private ConnectionState _state = ConnectionState.NOT_SEEN; - } - @Getter private final Map _states = new LinkedHashMap<>(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncClient.java new file mode 100644 index 00000000..1cd8f4b8 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncClient.java @@ -0,0 +1,46 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync; + +import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo; +import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData; +import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo; +import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService; +import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteHostManager; +import com.usatiuk.dhfs.storage.objects.repository.distributed.RpcClientFactory; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.UUID; + +@ApplicationScoped +public class PeerSyncClient { + @Inject + RemoteHostManager remoteHostManager; + + @Inject + RpcClientFactory rpcClientFactory; + + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; + + public void syncPeersOne(UUID host) { + var ret = rpcClientFactory.withPeerSyncClient(host, client -> { + var builder = SyncPeersData.newBuilder(); + builder.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()); + for (var h : persistentRemoteHostsService.getHosts()) { + builder.addMyPeers(h.toPeerInfo()); + } + builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString()).build()); + return client.syncPeers(builder.build()); + }); + + for (var np : ret.getMyPeersList()) { + persistentRemoteHostsService.addHost(new HostInfo(np.getUuid())); + } + } + + public void syncPeersAll() { + for (var h : remoteHostManager.getSeenHosts()) { + syncPeersOne(h); + } + } +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncServer.java new file mode 100644 index 00000000..eaae1719 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peersync/PeerSyncServer.java @@ -0,0 +1,15 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync; + +import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpc; +import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData; +import io.quarkus.grpc.GrpcService; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class PeerSyncServer implements DhfsObjectPeerSyncGrpc { + + @Override + public Uni syncPeers(SyncPeersData request) { + return null; + } +} diff --git a/server/src/main/proto/dhfs_objects_peer_sync.proto b/server/src/main/proto/dhfs_objects_peer_sync.proto new file mode 100644 index 00000000..e4c0a191 --- /dev/null +++ b/server/src/main/proto/dhfs_objects_peer_sync.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.usatiuk.dhfs.objects.repository.distributed.peersync"; +option java_outer_classname = "DhfsObjectPeerSyncApi"; + +package dhfs.objects.peersync; + +service DhfsObjectPeerSyncGrpc { + rpc SyncPeers (SyncPeersData) returns (SyncPeersData) {} +} + +message PeerInfo { + string uuid = 1; +} + +message SyncPeersData { + string selfUuid = 1; + + repeated PeerInfo my_peers = 2; +}