use uuids for peers

This commit is contained in:
2024-06-22 16:16:27 +02:00
parent 386bd14458
commit e3dc6e2a71
21 changed files with 134 additions and 120 deletions

View File

@@ -4,12 +4,12 @@ import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedHashMap;
import java.util.Objects;
@@ -17,14 +17,14 @@ import java.util.UUID;
@ApplicationScoped
public class DirectoryConflictResolver implements ConflictResolver {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Override
public ConflictResolutionResult resolve(String conflictHost, JObject<?> ours) {
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
if (!ours.isOf(Directory.class))
@@ -52,7 +52,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
Directory first;
Directory second;
String otherHostname;
UUID otherHostname;
if (oursDir.getMtime() >= theirsDir.getMtime()) {
first = oursDir;
second = theirsDir;
@@ -60,7 +60,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
} else {
second = oursDir;
first = theirsDir;
otherHostname = selfname;
otherHostname = persistentRemoteHostsService.getSelfUuid();
}
mergedChildren.putAll(first.getChildren());
@@ -85,15 +85,15 @@ public class DirectoryConflictResolver implements ConflictResolver {
newMetadata = new ObjectMetadata(ours.getName(), oursHeader.getConflictResolver(), m.getType());
for (var entry : oursHeader.getChangelog().getEntriesList()) {
newMetadata.getChangelog().put(entry.getHost(), entry.getVersion());
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
}
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
newMetadata.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max);
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
}
boolean wasChanged = mergedChildren.size() != first.getChildren().size();
if (wasChanged) {
newMetadata.getChangelog().merge(selfname, 1L, Long::sum);
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
}
newMtime = first.getMtime();

View File

@@ -4,10 +4,12 @@ import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.UUID;
@ApplicationScoped
public class NoOpConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(String conflictHost, JObject<?> conflictSource) {
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> conflictSource) {
// Maybe check types?
return ConflictResolutionResult.RESOLVED;
}

View File

@@ -5,10 +5,12 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.NotImplementedException;
import java.util.UUID;
@ApplicationScoped
public class NotImplementedConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(String conflictHost, JObject<?> conflictSource) {
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> conflictSource) {
throw new NotImplementedException();
}
}

View File

@@ -8,18 +8,19 @@ import org.apache.commons.lang3.NotImplementedException;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JObject<T extends JObjectData> implements Serializable {
// Create a new object
protected JObject(JObjectResolver resolver, String name, String conflictResolver, String selfname, T obj) {
protected JObject(JObjectResolver resolver, String name, String conflictResolver, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, conflictResolver, obj.getClass());
_dataPart.set(obj);
// FIXME:?
if (!obj.assumeUnique())
_metaPart.bumpVersion(selfname);
_metaPart.bumpVersion(selfUuid);
}
// Create an object from existing metadata

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -12,7 +13,6 @@ import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.Getter;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -24,15 +24,15 @@ public class JObjectManagerImpl implements JObjectManager {
@Inject
ObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
JObjectResolver jObjectResolver;
@Inject
JObjectWriteback jObjectWriteback;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
private static class NamedSoftReference extends SoftReference<JObject<?>> {
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
super(target, q);
@@ -176,7 +176,7 @@ public class JObjectManagerImpl implements JObjectManager {
return (JObject<D>) inMap;
} else {
var created = new JObject<D>(jObjectResolver, object.getName(),
object.getConflictResolver().getName(), selfname, object);
object.getConflictResolver().getName(), persistentRemoteHostsService.getSelfUuid(), object);
_map.put(object.getName(), new NamedSoftReference(created, _refQueue));
created.runWriteLockedMeta((m, d, b) -> {
jObjectResolver.notifyWrite(created);

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.InvalidationQueueService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -9,7 +10,6 @@ import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Optional;
@@ -27,8 +27,8 @@ public class JObjectResolver {
@Inject
JObjectWriteback jObjectWriteback;
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
jObject.assertRWLock();
@@ -73,7 +73,7 @@ public class JObjectResolver {
public void bumpVersionSelf(JObject<?> self) {
self.assertRWLock();
self.runWriteLockedMeta((m, bump, invalidate) -> {
m.bumpVersion(selfname);
m.bumpVersion(persistentRemoteHostsService.getSelfUuid());
return null;
});
}

View File

@@ -1,8 +1,9 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import java.util.UUID;
public interface ConflictResolver {
enum ConflictResolutionResult {
RESOLVED,
@@ -10,5 +11,5 @@ public interface ConflictResolver {
}
ConflictResolutionResult
resolve(String conflictHost, JObject<?> conflictSource);
resolve(UUID conflictHost, JObject<?> conflictSource);
}

View File

@@ -6,10 +6,11 @@ import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.UUID;
@Getter
public class HostInfo implements Serializable {
private final String _name;
private final UUID _uuid;
@Setter
private String _addr;
@@ -17,10 +18,10 @@ public class HostInfo implements Serializable {
private Integer _port;
@JsonbCreator
public HostInfo(@JsonbProperty("_name") String name,
@JsonbProperty("_addr") String addr,
@JsonbProperty("_port") Integer port) {
_name = name;
public HostInfo(@JsonbProperty("uuid") String uuid,
@JsonbProperty("addr") String addr,
@JsonbProperty("port") Integer port) {
_uuid = UUID.fromString(uuid);
_addr = addr;
_port = port;
}

View File

@@ -18,7 +18,7 @@ public class InvalidationQueueService {
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
private Map<String, SequencedSet<String>> _hostToInvObj = new LinkedHashMap<>();
private Map<UUID, SequencedSet<String>> _hostToInvObj = new LinkedHashMap<>();
private Thread _senderThread;
@@ -34,13 +34,13 @@ public class InvalidationQueueService {
_senderThread.join();
}
private SequencedSet<String> getSetForHost(String host) {
private SequencedSet<String> getSetForHost(UUID host) {
synchronized (this) {
return _hostToInvObj.computeIfAbsent(host, k -> new LinkedHashSet<>());
}
}
public Map<String, SequencedSet<String>> pullAll() throws InterruptedException {
public Map<UUID, SequencedSet<String>> pullAll() throws InterruptedException {
synchronized (this) {
while (_hostToInvObj.isEmpty())
this.wait();
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
}
}
public void pushInvalidationToOne(String host, String name) {
public void pushInvalidationToOne(UUID host, String name) {
synchronized (this) {
getSetForHost(host).add(name);
this.notifyAll();

View File

@@ -5,11 +5,11 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import lombok.Getter;
import org.eclipse.microprofile.config.ConfigProvider;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
public class ObjectMetadata implements Serializable {
public ObjectMetadata(String name, String conflictResolver, Class<? extends JObjectData> type) {
@@ -28,10 +28,10 @@ public class ObjectMetadata implements Serializable {
private final Class<? extends JObjectData> _type;
@Getter
private final Map<String, Long> _remoteCopies = new LinkedHashMap<>();
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
@Getter
private final Map<String, Long> _changelog = new LinkedHashMap<>();
private final Map<UUID, Long> _changelog = new LinkedHashMap<>();
public Long getOurVersion() {
return _changelog.values().stream().reduce(0L, Long::sum);
@@ -42,14 +42,15 @@ public class ObjectMetadata implements Serializable {
return Math.max(getOurVersion(), _remoteCopies.values().stream().max(Long::compareTo).get());
}
public void bumpVersion(String selfname) {
_changelog.merge(selfname, 1L, Long::sum);
public void bumpVersion(UUID selfUuid) {
_changelog.merge(selfUuid, 1L, Long::sum);
}
public ObjectChangelog toRpcChangelog() {
var changelogBuilder = ObjectChangelog.newBuilder();
for (var m : getChangelog().entrySet()) {
changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder().setHost(m.getKey()).setVersion(m.getValue()).build());
changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder()
.setHost(m.getKey().toString()).setVersion(m.getValue()).build());
}
return changelogBuilder.build();
}

View File

@@ -4,8 +4,12 @@ import lombok.Getter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.UUID;
public class PersistentRemoteHostsData implements Serializable {
@Getter
private final HashMap<String, HostInfo> _remoteHosts = new HashMap<>();
private final HashMap<UUID, HostInfo> _remoteHosts = new HashMap<>();
@Getter
private final UUID _selfUuid = UUID.randomUUID();
}

View File

@@ -14,12 +14,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;
@ApplicationScoped
public class PersistentRemoteHostsService {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@ConfigProperty(name = "dhfs.objects.distributed.root")
String dataRoot;
@@ -27,6 +25,8 @@ public class PersistentRemoteHostsService {
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
private UUID _selfUuid;
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
@@ -34,6 +34,8 @@ public class PersistentRemoteHostsService {
Log.info("Reading hosts");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
Log.info("Self uuid is: " + _selfUuid.toString());
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
@@ -42,7 +44,13 @@ public class PersistentRemoteHostsService {
Log.info("Shutdown");
}
public HostInfo getInfo(String name) {
public UUID getSelfUuid() {
if (_selfUuid == null)
throw new IllegalStateException();
else return _selfUuid;
}
public HostInfo getInfo(UUID name) {
return _persistentData.runReadLocked(data -> {
return data.getRemoteHosts().get(name);
});
@@ -56,7 +64,7 @@ public class PersistentRemoteHostsService {
public void addHost(HostInfo hostInfo) {
_persistentData.runWriteLocked(d -> {
d.getRemoteHosts().put(hostInfo.getName(), hostInfo);
d.getRemoteHosts().put(hostInfo.getUuid(), hostInfo);
return null;
});
}

View File

@@ -15,7 +15,6 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.*;
@@ -23,9 +22,6 @@ import java.util.concurrent.TimeUnit;
@ApplicationScoped
public class RemoteHostManager {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@@ -45,20 +41,20 @@ public class RemoteHostManager {
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {
var shouldTry = _transientPeersState.runReadLocked(d -> {
var s = d.getStates().get(host.getName());
var s = d.getStates().get(host.getUuid());
if (s == null) return true;
return !s.getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE);
});
if (shouldTry) {
Log.info("Trying to connect to " + host.getName());
Log.info("Trying to connect to " + host.getUuid());
if (reachable(host)) {
handleConnectionSuccess(host.getName());
handleConnectionSuccess(host.getUuid());
}
}
}
}
public void handleConnectionSuccess(String host) {
public void handleConnectionSuccess(UUID host) {
if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault(
host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)
)).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return;
@@ -72,7 +68,7 @@ public class RemoteHostManager {
syncHandler.doInitialResync(host);
}
public void handleConnectionError(String host) {
public void handleConnectionError(UUID host) {
Log.info("Lost connection to " + host);
_transientPeersState.runWriteLocked(d -> {
d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState());
@@ -108,30 +104,30 @@ public class RemoteHostManager {
private boolean reachable(HostInfo hostInfo) {
try {
return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(5000L /*ms*/), c -> {
var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build());
if (!ret.getSelfname().equals(hostInfo.getName())) {
throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName());
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
if (!UUID.fromString(ret.getSelfUuid()).equals(hostInfo.getUuid())) {
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + hostInfo.getUuid());
}
return true;
});
} catch (Exception ignored) {
Log.info("Host " + hostInfo.getName() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
Log.info("Host " + hostInfo.getUuid() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
return false;
}
}
public boolean reachable(String host) {
public boolean reachable(UUID host) {
return reachable(persistentRemoteHostsService.getInfo(host));
}
public <R> R withClientAny(Collection<String> targets, ClientFunction<R> fn) {
public <R> R withClientAny(Collection<UUID> targets, ClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
for (String target : shuffledList) {
for (UUID target : shuffledList) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
boolean shouldTry = _transientPeersState.runReadLocked(d -> {
var res = d.getStates().get(hostinfo.getName());
var res = d.getStates().get(hostinfo.getUuid());
if (res == null) return true;
return res.getState() == TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE;
});
@@ -142,26 +138,26 @@ public class RemoteHostManager {
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
} catch (StatusRuntimeException e) {
if (e.getStatus().equals(Status.UNAVAILABLE)) {
Log.info("Host " + hostinfo.getName() + " is unreachable: " + e.getMessage());
handleConnectionError(hostinfo.getName());
Log.info("Host " + hostinfo.getUuid() + " is unreachable: " + e.getMessage());
handleConnectionError(hostinfo.getUuid());
} else throw e;
}
}
throw new IllegalStateException("No reachable targets!");
}
public <R> R withClient(String target, ClientFunction<R> fn) {
public <R> R withClient(UUID target, ClientFunction<R> fn) {
var hostinfo = persistentRemoteHostsService.getInfo(target);
return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn);
}
public List<String> getAvailableHosts() {
public List<UUID> getAvailableHosts() {
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
.filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE))
.map(Map.Entry::getKey).toList());
}
public List<String> getSeenHosts() {
public List<UUID> getSeenHosts() {
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
.filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN))
.map(Map.Entry::getKey).toList());

View File

@@ -9,15 +9,15 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ApplicationScoped
public class RemoteObjectServiceClient {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
RemoteHostManager remoteHostManager;
@@ -25,9 +25,9 @@ public class RemoteObjectServiceClient {
@Inject
JObjectManager jObjectManager;
public Pair<ObjectHeader, ByteString> getSpecificObject(String host, String name) {
public Pair<ObjectHeader, ByteString> getSpecificObject(UUID host, String name) {
return remoteHostManager.withClient(host, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build());
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
});
}
@@ -41,10 +41,10 @@ public class RemoteObjectServiceClient {
});
return remoteHostManager.withClientAny(targets, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(jObject.getName()).build());
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build());
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
.getEntriesList().stream().filter(p -> p.getHost().equals(persistentRemoteHostsService.getSelfUuid().toString()))
.findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L);
var receivedTotalVer = reply.getObject().getHeader().getChangelog().getEntriesList()
@@ -53,7 +53,7 @@ public class RemoteObjectServiceClient {
return jObject.runWriteLockedMeta((md, b, v) -> {
var outdated =
(md.getOurVersion() > receivedTotalVer)
|| (md.getChangelog().get(selfname) > receivedSelfVer);
|| (md.getChangelog().get(persistentRemoteHostsService.getSelfUuid()) > receivedSelfVer);
if (outdated) {
Log.error("Race when trying to fetch");
@@ -64,15 +64,15 @@ public class RemoteObjectServiceClient {
});
}
public IndexUpdatePush getIndex(String host) {
public IndexUpdatePush getIndex(UUID host) {
return remoteHostManager.withClient(host, client -> {
var req = GetIndexRequest.newBuilder().setSelfname(selfname).build();
var req = GetIndexRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build();
return client.getIndex(req);
});
}
public List<IndexUpdateError> notifyUpdate(String host, List<String> names) {
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
public List<IndexUpdateError> notifyUpdate(UUID host, List<String> names) {
var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var v : names) {
var obj = jObjectManager.get(v);
if (obj.isEmpty()) continue;

View File

@@ -12,14 +12,12 @@ import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.UUID;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
SyncHandler syncHandler;
@@ -29,12 +27,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
RemoteHostManager remoteHostManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid()));
Log.info("<-- getObject: " + request.getName());
@@ -48,11 +49,11 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Override
@Blocking
public Uni<IndexUpdatePush> getIndex(GetIndexRequest request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid()));
Log.info("<-- getIndex: ");
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
var objs = jObjectManager.find("");
@@ -68,8 +69,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Override
@Blocking
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid()));
// Log.info("<-- indexUpdate: " + request.getHeader().getName());
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
@@ -78,9 +79,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Override
@Blocking
public Uni<PingReply> ping(PingRequest request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid()));
return Uni.createFrom().item(PingReply.newBuilder().setSelfname(selfname).build());
return Uni.createFrom().item(PingReply.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
}
}

View File

@@ -11,15 +11,12 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Objects;
import java.util.UUID;
@ApplicationScoped
public class SyncHandler {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
JObjectManager jObjectManager;
@@ -35,7 +32,10 @@ public class SyncHandler {
@Inject
Instance<ConflictResolver> conflictResolvers;
public void doInitialResync(String host) {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
public void doInitialResync(UUID host) {
var got = remoteObjectServiceClient.getIndex(host);
handleRemoteUpdate(got);
// Push our index to the other peer too, as they might not request it if
@@ -50,7 +50,7 @@ public class SyncHandler {
}
}
private void handleOneUpdate(String from, ObjectHeader header) {
private void handleOneUpdate(UUID from, ObjectHeader header) {
JObject<?> found;
try {
found = jObjectManager.getOrPut(header.getName(), new ObjectMetadata(
@@ -63,7 +63,7 @@ public class SyncHandler {
}
var receivedSelfVer = header.getChangelog()
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
.getEntriesList().stream().filter(p -> p.getHost().equals(persistentRemoteHostsService.getSelfUuid().toString()))
.findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L);
var receivedTotalVer = header.getChangelog().getEntriesList()
@@ -76,13 +76,14 @@ public class SyncHandler {
return false;
}
if (md.getChangelog().getOrDefault(selfname, 0L) > receivedSelfVer) return true;
if (md.getChangelog().getOrDefault(persistentRemoteHostsService.getSelfUuid(), 0L) > receivedSelfVer)
return true;
md.getRemoteCopies().put(from, receivedTotalVer);
if (Objects.equals(md.getOurVersion(), receivedTotalVer)) {
for (var e : header.getChangelog().getEntriesList()) {
if (!Objects.equals(md.getChangelog().getOrDefault(e.getHost(), 0L),
if (!Objects.equals(md.getChangelog().getOrDefault(UUID.fromString(e.getHost()), 0L),
e.getVersion())) return true;
}
}
@@ -101,9 +102,9 @@ public class SyncHandler {
md.getChangelog().clear();
for (var entry : header.getChangelog().getEntriesList()) {
md.getChangelog().put(entry.getHost(), entry.getVersion());
md.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
}
md.getChangelog().putIfAbsent(selfname, 0L);
md.getChangelog().putIfAbsent(persistentRemoteHostsService.getSelfUuid(), 0L);
return false;
});
@@ -121,11 +122,11 @@ public class SyncHandler {
}
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
var builder = IndexUpdateReply.newBuilder().setSelfname(selfname);
var builder = IndexUpdateReply.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var u : request.getHeaderList()) {
try {
handleOneUpdate(request.getSelfname(), u);
handleOneUpdate(UUID.fromString(request.getSelfUuid()), u);
} catch (Exception ex) {
builder.addErrors(IndexUpdateError.newBuilder().setObjectName(u.getName()).setError(ex.toString()).build());
}

View File

@@ -7,6 +7,7 @@ import lombok.Setter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
public class TransientPeersStateData {
@@ -25,5 +26,5 @@ public class TransientPeersStateData {
}
@Getter
private final Map<String, TransientPeerState> _states = new LinkedHashMap<>();
private final Map<UUID, TransientPeerState> _states = new LinkedHashMap<>();
}

View File

@@ -78,7 +78,6 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
}
@Nonnull
@Override
public void writeObject(String name, ByteString data) {
var file = Path.of(root, name);
@@ -99,7 +98,6 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
}
@Nonnull
@Override
public void deleteObject(String name) {
var file = Path.of(root, name);

View File

@@ -13,8 +13,6 @@ public interface ObjectPersistentStore {
@Nonnull
ByteString readObject(String name);
@Nonnull
void writeObject(String name, ByteString data);
@Nonnull
void deleteObject(String name);
}

View File

@@ -15,11 +15,11 @@ service DhfsObjectSyncGrpc {
}
message PingRequest {
string selfname = 1;
string selfUuid = 1;
}
message PingReply {
string selfname = 1;
string selfUuid = 1;
}
message ObjectChangelogEntry {
@@ -44,23 +44,23 @@ message ApiObject {
}
message GetObjectRequest {
string selfname = 10;
string selfUuid = 10;
string name = 2;
}
message GetObjectReply {
string selfname = 10;
string selfUuid = 10;
ApiObject object = 1;
}
message GetIndexRequest {
string selfname = 10;
string selfUuid = 10;
}
message IndexUpdatePush {
string selfname = 10;
string selfUuid = 10;
repeated ObjectHeader header = 1;
}
@@ -71,7 +71,7 @@ message IndexUpdateError {
}
message IndexUpdateReply {
string selfname = 10;
string selfUuid = 10;
repeated IndexUpdateError errors = 1;
}

View File

@@ -2,5 +2,4 @@ quarkus.grpc.server.use-separate-server=false
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.storage.files.target_chunk_size=1048576
dhfs.objects.distributed.selfname=thing1
dhfs.storage.files.target_chunk_size=1048576