Not 100% completely broken host management

This commit is contained in:
2024-06-18 20:19:31 +02:00
parent 7b994520bd
commit 6762a89f8f
15 changed files with 462 additions and 47 deletions

View File

@@ -39,6 +39,18 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>

View File

@@ -40,18 +40,7 @@ public class DistributedObjectRepository implements ObjectRepository {
SyncHandler syncHandler;
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
try {
Log.info("Starting sync");
var got = remoteObjectServiceClient.getIndex();
for (var h : got.getObjectsList()) {
syncHandler.handleRemoteUpdate(IndexUpdatePush.newBuilder()
.setSelfname(got.getSelfname()).setHeader(h).build());
}
Log.info("Sync complete");
} catch (Exception e) {
Log.error("Error when fetching remote index:");
Log.error(e);
}
}
void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException {
@@ -105,7 +94,7 @@ public class DistributedObjectRepository implements ObjectRepository {
});
// FIXME: Race?
try {
remoteObjectServiceClient.notifyUpdate(name);
syncHandler.notifyUpdateAll(name);
} catch (Exception e) {
Log.error("Error when notifying remote update:");
Log.error(e);

View File

@@ -0,0 +1,27 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import jakarta.json.bind.annotation.JsonbCreator;
import jakarta.json.bind.annotation.JsonbProperty;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
@Getter
public class HostInfo implements Serializable {
private final String _name;
@Setter
private String _addr;
@Setter
private Integer _port;
@JsonbCreator
public HostInfo(@JsonbProperty("_name") String name,
@JsonbProperty("_addr") String addr,
@JsonbProperty("_port") Integer port) {
_name = name;
_addr = addr;
_port = port;
}
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class InvalidationQueue {
}

View File

@@ -0,0 +1,29 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import java.util.List;
@Path("/objects-manage")
public class ManagmentApi {
@Inject
RemoteHostManager remoteHostManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Path("known-peers")
@GET
public List<HostInfo> knownPeers() {
return persistentRemoteHostsService.getHosts();
}
@Path("known-peers")
@PUT
public void addPeer(HostInfo hostInfo) {
persistentRemoteHostsService.addHost(hostInfo);
}
}

View File

@@ -0,0 +1,33 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import java.io.Serializable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PersistentRemoteHosts implements Serializable {
private final PersistentRemoteHostsData _data = new PersistentRemoteHostsData();
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@FunctionalInterface
public interface PersistentRemoteHostsFn<R> {
R apply(PersistentRemoteHostsData hostsData);
}
public <R> R runReadLocked(PersistentRemoteHostsFn<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_data);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(PersistentRemoteHostsFn<R> fn) {
_lock.writeLock().lock();
try {
return fn.apply(_data);
} finally {
_lock.writeLock().unlock();
}
}
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import java.io.Serializable;
import java.util.HashMap;
public class PersistentRemoteHostsData implements Serializable {
@Getter
private final HashMap<String, HostInfo> _remoteHosts = new HashMap<>();
}

View File

@@ -0,0 +1,63 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
@ApplicationScoped
public class PersistentRemoteHostsService {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@ConfigProperty(name = "dhfs.objects.distributed.root")
String dataRoot;
final String dataFileName = "hosts";
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading hosts");
_persistentData = DeserializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving hosts");
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
Log.info("Shutdown");
}
public HostInfo getInfo(String name) {
return _persistentData.runReadLocked(data -> {
return data.getRemoteHosts().get(name);
});
}
public List<HostInfo> getHosts() {
return _persistentData.runReadLocked(data -> {
return data.getRemoteHosts().values().stream().toList();
});
}
public void addHost(HostInfo hostInfo) {
_persistentData.runWriteLocked(d -> {
d.getRemoteHosts().put(hostInfo.getName(), hostInfo);
return null;
});
}
}

View File

@@ -1,63 +1,172 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.PingRequest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import lombok.AllArgsConstructor;
import lombok.Getter;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.HashMap;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ApplicationScoped
public class RemoteHostManager {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@ConfigProperty(name = "dhfs.objects.distributed.friend")
String remoteHostName;
@ConfigProperty(name = "dhfs.objects.distributed.friendAddr")
String remoteHostAddr;
@ConfigProperty(name = "dhfs.objects.distributed.friendPort")
String remoteHostPort;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Getter
@AllArgsConstructor
private static class HostInfo {
String _addr;
Integer _port;
}
final HashMap<String, HostInfo> _remoteHosts = new HashMap<>();
TransientPeersState _transientPeersState = new TransientPeersState();
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
_remoteHosts.put(remoteHostName, new HostInfo(remoteHostAddr, Integer.valueOf(remoteHostPort)));
tryConnectAll();
}
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
}
@Scheduled(every = "10s")
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {
var shouldTry = _transientPeersState.runReadLocked(d -> {
var s = d.getStates().get(host.getName());
if (s == null) return true;
if (s.getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE))
return false;
return true;
});
if (shouldTry) {
Log.info("Trying to connect to " + host);
if (reachable(host)) {
Log.info("Connected to " + host);
handleConnectionSuccess(host.getName());
}
}
}
}
private final ArrayList<Function<String, Void>> _connectionSuccessHandlers = new ArrayList<>();
private final ArrayList<Function<String, Void>> _connectionErrorHandlers = new ArrayList<>();
private void handleConnectionSuccess(String host) {
_transientPeersState.runWriteLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
var curState = d.getStates().get(host);
curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE);
return null;
});
for (var h : _connectionSuccessHandlers) {
h.apply(host);
}
}
private void handleConnectionError(String host) {
_transientPeersState.runWriteLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
var curState = d.getStates().get(host);
curState.setState(TransientPeersStateData.TransientPeerState.ConnectionState.UNREACHABLE);
return null;
});
for (var h : _connectionErrorHandlers) {
h.apply(host);
}
}
public void addConnectionSuccessHandler(Function<String, Void> handler) {
_connectionSuccessHandlers.add(handler);
}
public void addConnectionErrorHandler(Function<String, Void> handler) {
_connectionErrorHandlers.add(handler);
}
@FunctionalInterface
public interface ClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
public <R> R withClient(ClientFunction<R> fn) {
var hostInfo = _remoteHosts.get(remoteHostName);
var channel = NettyChannelBuilder.forAddress(hostInfo.getAddr(), hostInfo.getPort())
private <R> R withClient(String addr, int port, Optional<Long> timeout, ClientFunction<R> fn) {
var channel = NettyChannelBuilder.forAddress(addr, port)
.usePlaintext().build();
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
if (timeout.isPresent()) {
client = client.withDeadlineAfter(timeout.get(), TimeUnit.MILLISECONDS);
}
try {
return fn.apply(client);
} finally {
channel.shutdownNow();
}
}
// FIXME:
private boolean reachable(HostInfo hostInfo) {
try {
return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(4000L /*ms*/), c -> {
var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build());
if (!ret.getSelfname().equals(hostInfo.getName())) {
throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName());
}
return true;
});
} catch (Exception ignored) {
Log.info("Host " + hostInfo.getName() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
return false;
}
}
public boolean reachable(String host) {
return reachable(persistentRemoteHostsService.getInfo(host));
}
public <R> R withClientAny(Collection<String> targets, ClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
for (String target : shuffledList) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
boolean shouldTry = _transientPeersState.runReadLocked(d -> {
var res = d.getStates().get(hostinfo.getName());
if (res == null) return true;
return res.getState() == TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE;
});
if (!shouldTry) continue;
try {
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
} catch (StatusRuntimeException e) {
if (e.getStatus().equals(Status.UNAVAILABLE)) {
Log.info("Host " + hostinfo.getName() + " is unreachable: " + e.getMessage());
handleConnectionError(hostinfo.getName());
} else throw e;
}
}
throw new IllegalStateException("No reachable targets!");
}
public <R> R withClient(String target, ClientFunction<R> fn) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
}
public List<String> getAvailableHosts() {
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
.filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE))
.map(Map.Entry::getKey).toList());
}
}

View File

@@ -7,6 +7,8 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Map;
@ApplicationScoped
public class RemoteObjectServiceClient {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
@@ -19,13 +21,18 @@ public class RemoteObjectServiceClient {
RemoteHostManager remoteHostManager;
public byte[] getObject(String name) {
return remoteHostManager.withClient(client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build());
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to fetch");
return new NotImplementedException();
});
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to fetch");
return new NotImplementedException();
});
var targets = meta.runReadLocked(d -> {
var bestVer = d.getBestVersion();
return d.getRemoteCopies().entrySet().stream().filter(entry -> entry.getValue().equals(bestVer)).map(Map.Entry::getKey).toList();
});
return remoteHostManager.withClientAny(targets, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build());
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
@@ -51,16 +58,16 @@ public class RemoteObjectServiceClient {
});
}
public GetIndexReply getIndex() {
return remoteHostManager.withClient(client -> {
public GetIndexReply getIndex(String host) {
return remoteHostManager.withClient(host, client -> {
var req = GetIndexRequest.newBuilder().build();
var reply = client.getIndex(req);
return reply;
});
}
public Boolean notifyUpdate(String name) {
return remoteHostManager.withClient(client -> {
public void notifyUpdate(String host, String name) {
remoteHostManager.withClient(host, client -> {
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to notify update");
return new NotImplementedException();
@@ -71,7 +78,7 @@ public class RemoteObjectServiceClient {
client.indexUpdate(builder.setHeader(
meta.runReadLocked(ObjectMetaData::toRpcHeader)
).build());
return true;
return null;
});
}
}

View File

@@ -11,12 +11,16 @@ import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Optional;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
ObjectPersistentStore objectPersistentStore;
@@ -51,7 +55,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Blocking
public Uni<GetIndexReply> getIndex(GetIndexRequest request) {
Log.info("<-- getIndex: ");
var builder = GetIndexReply.newBuilder();
var builder = GetIndexReply.newBuilder().setSelfname(selfname);
objectIndexService.forAllRead((name, meta) -> {
builder.addObjects(meta.runReadLocked(ObjectMetaData::toRpcHeader));
});
@@ -64,4 +68,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
Log.info("<-- indexUpdate: " + request.getHeader().getName());
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
}
@Override
public Uni<PingReply> ping(PingRequest request) {
return Uni.createFrom().item(PingReply.newBuilder().setSelfname(selfname).build());
}
}

View File

@@ -8,11 +8,17 @@ import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentS
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
@ApplicationScoped
public class SyncHandler {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
@@ -27,6 +33,30 @@ public class SyncHandler {
@Inject
JObjectManager jObjectManager;
@Inject
RemoteHostManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
void init(@Observes @Priority(340) StartupEvent event) throws IOException {
remoteHostManager.addConnectionSuccessHandler((host) -> {
doInitialResync(host);
return null;
});
}
void shutdown(@Observes @Priority(240) ShutdownEvent event) throws IOException {
}
private void doInitialResync(String host) {
var got = remoteObjectServiceClient.getIndex(host);
for (var h : got.getObjectsList()) {
handleRemoteUpdate(IndexUpdatePush.newBuilder()
.setSelfname(got.getSelfname()).setHeader(h).build());
}
}
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
var meta = objectIndexService.getOrCreateMeta(request.getHeader().getName(), request.getHeader().getAssumeUnique());
@@ -81,4 +111,21 @@ public class SyncHandler {
return IndexUpdateReply.newBuilder().build();
}
public void notifyUpdateAll(String name) {
for (var host : remoteHostManager.getAvailableHosts())
remoteHostManager.withClient(host, client -> {
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to notify update");
return new NotImplementedException();
});
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
client.indexUpdate(builder.setHeader(
meta.runReadLocked(ObjectMetaData::toRpcHeader)
).build());
return null;
});
}
}

View File

@@ -0,0 +1,32 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TransientPeersState {
private final TransientPeersStateData _data = new TransientPeersStateData();
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@FunctionalInterface
public interface TransientPeersStaten<R> {
R apply(TransientPeersStateData hostsData);
}
public <R> R runReadLocked(TransientPeersStaten<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_data);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(TransientPeersStaten<R> fn) {
_lock.writeLock().lock();
try {
return fn.apply(_data);
} finally {
_lock.writeLock().unlock();
}
}
}

View File

@@ -0,0 +1,29 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.LinkedHashMap;
import java.util.Map;
public class TransientPeersStateData {
@AllArgsConstructor
@NoArgsConstructor
public static class TransientPeerState {
public enum ConnectionState {
NOT_SEEN,
REACHABLE,
UNREACHABLE
}
@Getter
@Setter
private ConnectionState _state = ConnectionState.NOT_SEEN;
}
@Getter
private final Map<String, TransientPeerState> _states = new LinkedHashMap<>();
}

View File

@@ -10,6 +10,16 @@ service DhfsObjectSyncGrpc {
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {}
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
rpc Ping (PingRequest) returns (PingReply) {}
}
message PingRequest {
string selfname = 1;
}
message PingReply {
string selfname = 1;
}
message ObjectChangelogEntry {
@@ -43,7 +53,7 @@ message GetObjectReply {
}
message GetIndexRequest {
bool dontaskme = 1;
}
message GetIndexReply {