simple peer sync

not tested
This commit is contained in:
2024-06-22 17:06:08 +02:00
parent e3dc6e2a71
commit f4be438f33
9 changed files with 267 additions and 106 deletions

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import jakarta.json.bind.annotation.JsonbCreator;
import jakarta.json.bind.annotation.JsonbProperty;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.UUID;
@@ -12,17 +12,12 @@ import java.util.UUID;
public class HostInfo implements Serializable {
private final UUID _uuid;
@Setter
private String _addr;
@Setter
private Integer _port;
@JsonbCreator
public HostInfo(@JsonbProperty("uuid") String uuid,
@JsonbProperty("addr") String addr,
@JsonbProperty("port") Integer port) {
public HostInfo(@JsonbProperty("uuid") String uuid) {
_uuid = UUID.fromString(uuid);
_addr = addr;
_port = port;
}
public PeerInfo toPeerInfo() {
return PeerInfo.newBuilder().setUuid(_uuid.toString()).build();
}
}

View File

@@ -1,11 +1,7 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.PingRequest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -17,8 +13,10 @@ import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@ApplicationScoped
public class RemoteHostManager {
@@ -28,7 +26,13 @@ public class RemoteHostManager {
@Inject
SyncHandler syncHandler;
TransientPeersState _transientPeersState = new TransientPeersState();
@Inject
RpcClientFactory rpcClientFactory;
@Inject
PeerSyncClient peerSyncClient;
private final TransientPeersState _transientPeersState = new TransientPeersState();
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
}
@@ -43,11 +47,11 @@ public class RemoteHostManager {
var shouldTry = _transientPeersState.runReadLocked(d -> {
var s = d.getStates().get(host.getUuid());
if (s == null) return true;
return !s.getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE);
return !s.getState().equals(TransientPeerState.ConnectionState.REACHABLE);
});
if (shouldTry) {
Log.info("Trying to connect to " + host.getUuid());
if (reachable(host)) {
if (pingCheck(host.getUuid())) {
handleConnectionSuccess(host.getUuid());
}
}
@@ -56,110 +60,70 @@ public class RemoteHostManager {
public void handleConnectionSuccess(UUID host) {
if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault(
host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)
)).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return;
host, new TransientPeerState(TransientPeerState.ConnectionState.NOT_SEEN)
)).getState().equals(TransientPeerState.ConnectionState.REACHABLE)) return;
_transientPeersState.runWriteLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
d.getStates().putIfAbsent(host, new TransientPeerState());
var curState = d.getStates().get(host);
curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE);
curState.setState(TransientPeerState.ConnectionState.REACHABLE);
return null;
});
Log.info("Connected to " + host);
syncHandler.doInitialResync(host);
peerSyncClient.syncPeersOne(host);
}
public void handleConnectionError(UUID host) {
Log.info("Lost connection to " + host);
_transientPeersState.runWriteLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
d.getStates().putIfAbsent(host, new TransientPeerState());
var curState = d.getStates().get(host);
curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.UNREACHABLE);
curState.setState(TransientPeerState.ConnectionState.UNREACHABLE);
return null;
});
}
@FunctionalInterface
public interface ClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
private <R> R withClient(String addr, int port, Optional<Long> timeout, ClientFunction<R> fn) {
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
.usePlaintext().build();
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
if (timeout.isPresent()) {
client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS);
}
try {
return fn.apply(client);
} finally {
channel.shutdownNow();
}
}
// FIXME:
private boolean reachable(HostInfo hostInfo) {
private boolean pingCheck(UUID host) {
TransientPeerState state = _transientPeersState.runReadLocked(s -> s.getStates().get(host));
if (state == null) return false;
try {
return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(5000L /*ms*/), c -> {
return rpcClientFactory.withObjSyncClient(state.getAddr(), state.getPort(), Optional.of(5000L /*ms*/), c -> {
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
if (!UUID.fromString(ret.getSelfUuid()).equals(hostInfo.getUuid())) {
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + hostInfo.getUuid());
if (!UUID.fromString(ret.getSelfUuid()).equals(host)) {
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host);
}
return true;
});
} catch (Exception ignored) {
Log.info("Host " + hostInfo.getUuid() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
Log.info("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
return false;
}
}
public boolean reachable(UUID host) {
return reachable(persistentRemoteHostsService.getInfo(host));
public boolean isReachable(UUID host) {
return _transientPeersState.runReadLocked(d -> {
var res = d.getStates().get(host);
return res.getState() == TransientPeerState.ConnectionState.REACHABLE;
});
}
public <R> R withClientAny(Collection<UUID> targets, ClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
for (UUID target : shuffledList) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
boolean shouldTry = _transientPeersState.runReadLocked(d -> {
var res = d.getStates().get(hostinfo.getUuid());
if (res == null) return true;
return res.getState() == TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE;
});
if (!shouldTry) continue;
try {
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
} catch (StatusRuntimeException e) {
if (e.getStatus().equals(Status.UNAVAILABLE)) {
Log.info("Host " + hostinfo.getUuid() + " is unreachable: " + e.getMessage());
handleConnectionError(hostinfo.getUuid());
} else throw e;
}
}
throw new IllegalStateException("No reachable targets!");
}
public <R> R withClient(UUID target, ClientFunction<R> fn) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
public TransientPeerState getTransientState(UUID host) {
return _transientPeersState.runReadLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeerState());
return d.getStates().get(host);
});
}
public List<UUID> getAvailableHosts() {
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
.filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE))
.filter(e -> e.getValue().getState().equals(TransientPeerState.ConnectionState.REACHABLE))
.map(Map.Entry::getKey).toList());
}
public List<UUID> getSeenHosts() {
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
.filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN))
.filter(e -> !e.getValue().getState().equals(TransientPeerState.ConnectionState.NOT_SEEN))
.map(Map.Entry::getKey).toList());
}

View File

@@ -20,13 +20,13 @@ public class RemoteObjectServiceClient {
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
RemoteHostManager remoteHostManager;
RpcClientFactory rpcClientFactory;
@Inject
JObjectManager jObjectManager;
public Pair<ObjectHeader, ByteString> getSpecificObject(UUID host, String name) {
return remoteHostManager.withClient(host, client -> {
return rpcClientFactory.withObjSyncClient(host, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
});
@@ -40,7 +40,7 @@ public class RemoteObjectServiceClient {
return md.getRemoteCopies().entrySet().stream().filter(entry -> entry.getValue().equals(bestVer)).map(Map.Entry::getKey).toList();
});
return remoteHostManager.withClientAny(targets, client -> {
return rpcClientFactory.withObjSyncClient(targets, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build());
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
@@ -65,7 +65,7 @@ public class RemoteObjectServiceClient {
}
public IndexUpdatePush getIndex(UUID host) {
return remoteHostManager.withClient(host, client -> {
return rpcClientFactory.withObjSyncClient(host, client -> {
var req = GetIndexRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build();
return client.getIndex(req);
});
@@ -81,6 +81,6 @@ public class RemoteObjectServiceClient {
var send = builder.build();
return remoteHostManager.withClient(host, client -> client.indexUpdate(send).getErrorsList());
return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send).getErrorsList());
}
}

View File

@@ -0,0 +1,101 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpcGrpc;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.*;
import java.util.concurrent.TimeUnit;
// TODO: Dedup this
@ApplicationScoped
public class RpcClientFactory {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
RemoteHostManager remoteHostManager;
@FunctionalInterface
public interface ObjectSyncClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
public <R> R withObjSyncClient(Collection<UUID> targets, ObjectSyncClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
for (UUID target : shuffledList) {
var hostinfo = remoteHostManager.getTransientState(target);
boolean shouldTry = remoteHostManager.isReachable(target);
if (!shouldTry) continue;
try {
return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
} catch (StatusRuntimeException e) {
if (e.getStatus().equals(Status.UNAVAILABLE)) {
Log.info("Host " + target + " is unreachable: " + e.getMessage());
remoteHostManager.handleConnectionError(target);
} else throw e;
}
}
throw new IllegalStateException("No reachable targets!");
}
public <R> R withObjSyncClient(UUID target, ObjectSyncClientFunction<R> fn) {
var hostinfo = remoteHostManager.getTransientState(target);
return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
}
public <R> R withObjSyncClient(String addr, int port, Optional<Long> timeout, ObjectSyncClientFunction<R> fn) {
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
.usePlaintext().build();
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
if (timeout.isPresent()) {
client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS);
}
try {
return fn.apply(client);
} finally {
channel.shutdownNow();
}
}
@FunctionalInterface
public interface PeerSyncClientFunction<R> {
R apply(DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub client);
}
public <R> R withPeerSyncClient(UUID target, PeerSyncClientFunction<R> fn) {
var hostinfo = remoteHostManager.getTransientState(target);
return withPeerSyncClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
}
public <R> R withPeerSyncClient(String addr, int port, Optional<Long> timeout, PeerSyncClientFunction<R> fn) {
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
.usePlaintext().build();
var client = DhfsObjectPeerSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
if (timeout.isPresent()) {
client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS);
}
try {
return fn.apply(client);
} finally {
channel.shutdownNow();
}
}
}

View File

@@ -0,0 +1,32 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@AllArgsConstructor
@NoArgsConstructor
public class TransientPeerState {
public TransientPeerState(ConnectionState connectionState) {
_state = connectionState;
}
public enum ConnectionState {
NOT_SEEN,
REACHABLE,
UNREACHABLE
}
@Getter
@Setter
private ConnectionState _state = ConnectionState.NOT_SEEN;
@Getter
@Setter
private String _addr;
@Getter
@Setter
private int _port;
}

View File

@@ -9,22 +9,9 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
public class TransientPeersStateData {
@AllArgsConstructor
@NoArgsConstructor
public static class TransientPeerState {
public enum ConnectionState {
NOT_SEEN,
REACHABLE,
UNREACHABLE
}
@Getter
@Setter
private ConnectionState _state = ConnectionState.NOT_SEEN;
}
@Getter
private final Map<UUID, TransientPeerState> _states = new LinkedHashMap<>();
}

View File

@@ -0,0 +1,46 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteHostManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RpcClientFactory;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.UUID;
@ApplicationScoped
public class PeerSyncClient {
@Inject
RemoteHostManager remoteHostManager;
@Inject
RpcClientFactory rpcClientFactory;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
public void syncPeersOne(UUID host) {
var ret = rpcClientFactory.withPeerSyncClient(host, client -> {
var builder = SyncPeersData.newBuilder();
builder.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var h : persistentRemoteHostsService.getHosts()) {
builder.addMyPeers(h.toPeerInfo());
}
builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
return client.syncPeers(builder.build());
});
for (var np : ret.getMyPeersList()) {
persistentRemoteHostsService.addHost(new HostInfo(np.getUuid()));
}
}
public void syncPeersAll() {
for (var h : remoteHostManager.getSeenHosts()) {
syncPeersOne(h);
}
}
}

View File

@@ -0,0 +1,15 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
import io.quarkus.grpc.GrpcService;
import io.smallrye.mutiny.Uni;
@GrpcService
public class PeerSyncServer implements DhfsObjectPeerSyncGrpc {
@Override
public Uni<SyncPeersData> syncPeers(SyncPeersData request) {
return null;
}
}

View File

@@ -0,0 +1,21 @@
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.objects.repository.distributed.peersync";
option java_outer_classname = "DhfsObjectPeerSyncApi";
package dhfs.objects.peersync;
service DhfsObjectPeerSyncGrpc {
rpc SyncPeers (SyncPeersData) returns (SyncPeersData) {}
}
message PeerInfo {
string uuid = 1;
}
message SyncPeersData {
string selfUuid = 1;
repeated PeerInfo my_peers = 2;
}