diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/DirectoryConflictResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/DirectoryConflictResolver.java index 5b0300d5..7fb4e7c5 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/DirectoryConflictResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/DirectoryConflictResolver.java @@ -4,12 +4,12 @@ import com.usatiuk.dhfs.storage.SerializationHelper; import com.usatiuk.dhfs.storage.objects.jrepository.JObject; import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver; import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata; +import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService; import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.LinkedHashMap; import java.util.Objects; @@ -17,14 +17,14 @@ import java.util.UUID; @ApplicationScoped public class DirectoryConflictResolver implements ConflictResolver { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; @Inject RemoteObjectServiceClient remoteObjectServiceClient; @Override - public ConflictResolutionResult resolve(String conflictHost, JObject ours) { + public ConflictResolutionResult resolve(UUID conflictHost, JObject ours) { var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName()); if (!ours.isOf(Directory.class)) @@ -52,7 +52,7 @@ public class DirectoryConflictResolver implements ConflictResolver { Directory first; Directory second; - String otherHostname; + UUID otherHostname; if (oursDir.getMtime() >= theirsDir.getMtime()) { first = oursDir; second = theirsDir; @@ -60,7 +60,7 @@ public class DirectoryConflictResolver implements ConflictResolver { } else { second = oursDir; first = theirsDir; - otherHostname = selfname; + otherHostname = persistentRemoteHostsService.getSelfUuid(); } mergedChildren.putAll(first.getChildren()); @@ -85,15 +85,15 @@ public class DirectoryConflictResolver implements ConflictResolver { newMetadata = new ObjectMetadata(ours.getName(), oursHeader.getConflictResolver(), m.getType()); for (var entry : oursHeader.getChangelog().getEntriesList()) { - newMetadata.getChangelog().put(entry.getHost(), entry.getVersion()); + newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion()); } for (var entry : theirsHeader.getChangelog().getEntriesList()) { - newMetadata.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max); + newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max); } boolean wasChanged = mergedChildren.size() != first.getChildren().size(); if (wasChanged) { - newMetadata.getChangelog().merge(selfname, 1L, Long::sum); + newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum); } newMtime = first.getMtime(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NoOpConflictResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NoOpConflictResolver.java index 6526f128..92a12758 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NoOpConflictResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NoOpConflictResolver.java @@ -4,10 +4,12 @@ import com.usatiuk.dhfs.storage.objects.jrepository.JObject; import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver; import jakarta.enterprise.context.ApplicationScoped; +import java.util.UUID; + @ApplicationScoped public class NoOpConflictResolver implements ConflictResolver { @Override - public ConflictResolutionResult resolve(String conflictHost, JObject conflictSource) { + public ConflictResolutionResult resolve(UUID conflictHost, JObject conflictSource) { // Maybe check types? return ConflictResolutionResult.RESOLVED; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NotImplementedConflictResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NotImplementedConflictResolver.java index 8352f668..08a18887 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NotImplementedConflictResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/NotImplementedConflictResolver.java @@ -5,10 +5,12 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver; import jakarta.enterprise.context.ApplicationScoped; import org.apache.commons.lang3.NotImplementedException; +import java.util.UUID; + @ApplicationScoped public class NotImplementedConflictResolver implements ConflictResolver { @Override - public ConflictResolutionResult resolve(String conflictHost, JObject conflictSource) { + public ConflictResolutionResult resolve(UUID conflictHost, JObject conflictSource) { throw new NotImplementedException(); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java index bcb7b468..c49b0eb4 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java @@ -8,18 +8,19 @@ import org.apache.commons.lang3.NotImplementedException; import java.io.Serializable; import java.util.Objects; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; public class JObject implements Serializable { // Create a new object - protected JObject(JObjectResolver resolver, String name, String conflictResolver, String selfname, T obj) { + protected JObject(JObjectResolver resolver, String name, String conflictResolver, UUID selfUuid, T obj) { _resolver = resolver; _metaPart = new ObjectMetadata(name, conflictResolver, obj.getClass()); _dataPart.set(obj); // FIXME:? if (!obj.assumeUnique()) - _metaPart.bumpVersion(selfname); + _metaPart.bumpVersion(selfUuid); } // Create an object from existing metadata diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index 52969be8..558ffd45 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository; import com.google.protobuf.ByteString; import com.usatiuk.dhfs.storage.SerializationHelper; import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata; +import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -12,7 +13,6 @@ import io.quarkus.runtime.Startup; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import lombok.Getter; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; @@ -24,15 +24,15 @@ public class JObjectManagerImpl implements JObjectManager { @Inject ObjectPersistentStore objectPersistentStore; - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; - @Inject JObjectResolver jObjectResolver; @Inject JObjectWriteback jObjectWriteback; + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; + private static class NamedSoftReference extends SoftReference> { public NamedSoftReference(JObject target, ReferenceQueue> q) { super(target, q); @@ -176,7 +176,7 @@ public class JObjectManagerImpl implements JObjectManager { return (JObject) inMap; } else { var created = new JObject(jObjectResolver, object.getName(), - object.getConflictResolver().getName(), selfname, object); + object.getConflictResolver().getName(), persistentRemoteHostsService.getSelfUuid(), object); _map.put(object.getName(), new NamedSoftReference(created, _refQueue)); created.runWriteLockedMeta((m, d, b) -> { jObjectResolver.notifyWrite(created); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java index 07008012..bad65ebd 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository; import com.usatiuk.dhfs.storage.SerializationHelper; import com.usatiuk.dhfs.storage.objects.repository.distributed.InvalidationQueueService; +import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService; import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import io.grpc.Status; @@ -9,7 +10,6 @@ import io.grpc.StatusRuntimeException; import io.quarkus.logging.Log; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.Optional; @@ -27,8 +27,8 @@ public class JObjectResolver { @Inject JObjectWriteback jObjectWriteback; - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; public Optional resolveDataLocal(JObject jObject) { jObject.assertRWLock(); @@ -73,7 +73,7 @@ public class JObjectResolver { public void bumpVersionSelf(JObject self) { self.assertRWLock(); self.runWriteLockedMeta((m, bump, invalidate) -> { - m.bumpVersion(selfname); + m.bumpVersion(persistentRemoteHostsService.getSelfUuid()); return null; }); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ConflictResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ConflictResolver.java index b0555cf3..1c439c13 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ConflictResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ConflictResolver.java @@ -1,8 +1,9 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; -import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; import com.usatiuk.dhfs.storage.objects.jrepository.JObject; +import java.util.UUID; + public interface ConflictResolver { enum ConflictResolutionResult { RESOLVED, @@ -10,5 +11,5 @@ public interface ConflictResolver { } ConflictResolutionResult - resolve(String conflictHost, JObject conflictSource); + resolve(UUID conflictHost, JObject conflictSource); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java index 5b048b61..b0a7efda 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java @@ -6,10 +6,11 @@ import lombok.Getter; import lombok.Setter; import java.io.Serializable; +import java.util.UUID; @Getter public class HostInfo implements Serializable { - private final String _name; + private final UUID _uuid; @Setter private String _addr; @@ -17,10 +18,10 @@ public class HostInfo implements Serializable { private Integer _port; @JsonbCreator - public HostInfo(@JsonbProperty("_name") String name, - @JsonbProperty("_addr") String addr, - @JsonbProperty("_port") Integer port) { - _name = name; + public HostInfo(@JsonbProperty("uuid") String uuid, + @JsonbProperty("addr") String addr, + @JsonbProperty("port") Integer port) { + _uuid = UUID.fromString(uuid); _addr = addr; _port = port; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java index 80c069c2..fad80eaa 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java @@ -18,7 +18,7 @@ public class InvalidationQueueService { @Inject RemoteObjectServiceClient remoteObjectServiceClient; - private Map> _hostToInvObj = new LinkedHashMap<>(); + private Map> _hostToInvObj = new LinkedHashMap<>(); private Thread _senderThread; @@ -34,13 +34,13 @@ public class InvalidationQueueService { _senderThread.join(); } - private SequencedSet getSetForHost(String host) { + private SequencedSet getSetForHost(UUID host) { synchronized (this) { return _hostToInvObj.computeIfAbsent(host, k -> new LinkedHashSet<>()); } } - public Map> pullAll() throws InterruptedException { + public Map> pullAll() throws InterruptedException { synchronized (this) { while (_hostToInvObj.isEmpty()) this.wait(); @@ -103,7 +103,7 @@ public class InvalidationQueueService { } } - public void pushInvalidationToOne(String host, String name) { + public void pushInvalidationToOne(UUID host, String name) { synchronized (this) { getSetForHost(host).add(name); this.notifyAll(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetadata.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetadata.java index a05310ae..04649e2b 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetadata.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetadata.java @@ -5,11 +5,11 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry; import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData; import lombok.Getter; -import org.eclipse.microprofile.config.ConfigProvider; import java.io.Serializable; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; public class ObjectMetadata implements Serializable { public ObjectMetadata(String name, String conflictResolver, Class type) { @@ -28,10 +28,10 @@ public class ObjectMetadata implements Serializable { private final Class _type; @Getter - private final Map _remoteCopies = new LinkedHashMap<>(); + private final Map _remoteCopies = new LinkedHashMap<>(); @Getter - private final Map _changelog = new LinkedHashMap<>(); + private final Map _changelog = new LinkedHashMap<>(); public Long getOurVersion() { return _changelog.values().stream().reduce(0L, Long::sum); @@ -42,14 +42,15 @@ public class ObjectMetadata implements Serializable { return Math.max(getOurVersion(), _remoteCopies.values().stream().max(Long::compareTo).get()); } - public void bumpVersion(String selfname) { - _changelog.merge(selfname, 1L, Long::sum); + public void bumpVersion(UUID selfUuid) { + _changelog.merge(selfUuid, 1L, Long::sum); } public ObjectChangelog toRpcChangelog() { var changelogBuilder = ObjectChangelog.newBuilder(); for (var m : getChangelog().entrySet()) { - changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder().setHost(m.getKey()).setVersion(m.getValue()).build()); + changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder() + .setHost(m.getKey().toString()).setVersion(m.getValue()).build()); } return changelogBuilder.build(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsData.java index 4d3d5559..9b1d7117 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsData.java @@ -4,8 +4,12 @@ import lombok.Getter; import java.io.Serializable; import java.util.HashMap; +import java.util.UUID; public class PersistentRemoteHostsData implements Serializable { @Getter - private final HashMap _remoteHosts = new HashMap<>(); + private final HashMap _remoteHosts = new HashMap<>(); + + @Getter + private final UUID _selfUuid = UUID.randomUUID(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsService.java index 4f171f9b..d4152bb8 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/PersistentRemoteHostsService.java @@ -14,12 +14,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.UUID; @ApplicationScoped public class PersistentRemoteHostsService { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; - @ConfigProperty(name = "dhfs.objects.distributed.root") String dataRoot; @@ -27,6 +25,8 @@ public class PersistentRemoteHostsService { private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts(); + private UUID _selfUuid; + void init(@Observes @Priority(300) StartupEvent event) throws IOException { Paths.get(dataRoot).toFile().mkdirs(); Log.info("Initializing with root " + dataRoot); @@ -34,6 +34,8 @@ public class PersistentRemoteHostsService { Log.info("Reading hosts"); _persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName))); } + _selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid); + Log.info("Self uuid is: " + _selfUuid.toString()); } void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException { @@ -42,7 +44,13 @@ public class PersistentRemoteHostsService { Log.info("Shutdown"); } - public HostInfo getInfo(String name) { + public UUID getSelfUuid() { + if (_selfUuid == null) + throw new IllegalStateException(); + else return _selfUuid; + } + + public HostInfo getInfo(UUID name) { return _persistentData.runReadLocked(data -> { return data.getRemoteHosts().get(name); }); @@ -56,7 +64,7 @@ public class PersistentRemoteHostsService { public void addHost(HostInfo hostInfo) { _persistentData.runWriteLocked(d -> { - d.getRemoteHosts().put(hostInfo.getName(), hostInfo); + d.getRemoteHosts().put(hostInfo.getUuid(), hostInfo); return null; }); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index 2d630166..70b14023 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -15,7 +15,6 @@ import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.io.IOException; import java.util.*; @@ -23,9 +22,6 @@ import java.util.concurrent.TimeUnit; @ApplicationScoped public class RemoteHostManager { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; - @Inject PersistentRemoteHostsService persistentRemoteHostsService; @@ -45,20 +41,20 @@ public class RemoteHostManager { public void tryConnectAll() { for (var host : persistentRemoteHostsService.getHosts()) { var shouldTry = _transientPeersState.runReadLocked(d -> { - var s = d.getStates().get(host.getName()); + var s = d.getStates().get(host.getUuid()); if (s == null) return true; return !s.getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE); }); if (shouldTry) { - Log.info("Trying to connect to " + host.getName()); + Log.info("Trying to connect to " + host.getUuid()); if (reachable(host)) { - handleConnectionSuccess(host.getName()); + handleConnectionSuccess(host.getUuid()); } } } } - public void handleConnectionSuccess(String host) { + public void handleConnectionSuccess(UUID host) { if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault( host, new TransientPeersStateData.TransientPeerState(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN) )).getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) return; @@ -72,7 +68,7 @@ public class RemoteHostManager { syncHandler.doInitialResync(host); } - public void handleConnectionError(String host) { + public void handleConnectionError(UUID host) { Log.info("Lost connection to " + host); _transientPeersState.runWriteLocked(d -> { d.getStates().putIfAbsent(host, new TransientPeersStateData.TransientPeerState()); @@ -108,30 +104,30 @@ public class RemoteHostManager { private boolean reachable(HostInfo hostInfo) { try { return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(5000L /*ms*/), c -> { - var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build()); - if (!ret.getSelfname().equals(hostInfo.getName())) { - throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName()); + var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build()); + if (!UUID.fromString(ret.getSelfUuid()).equals(hostInfo.getUuid())) { + throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + hostInfo.getUuid()); } return true; }); } catch (Exception ignored) { - Log.info("Host " + hostInfo.getName() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause()); + Log.info("Host " + hostInfo.getUuid() + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause()); return false; } } - public boolean reachable(String host) { + public boolean reachable(UUID host) { return reachable(persistentRemoteHostsService.getInfo(host)); } - public R withClientAny(Collection targets, ClientFunction fn) { + public R withClientAny(Collection targets, ClientFunction fn) { var shuffledList = new ArrayList<>(targets); Collections.shuffle(shuffledList); - for (String target : shuffledList) { + for (UUID target : shuffledList) { var hostinfo = persistentRemoteHostsService.getInfo(target); boolean shouldTry = _transientPeersState.runReadLocked(d -> { - var res = d.getStates().get(hostinfo.getName()); + var res = d.getStates().get(hostinfo.getUuid()); if (res == null) return true; return res.getState() == TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE; }); @@ -142,26 +138,26 @@ public class RemoteHostManager { return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); } catch (StatusRuntimeException e) { if (e.getStatus().equals(Status.UNAVAILABLE)) { - Log.info("Host " + hostinfo.getName() + " is unreachable: " + e.getMessage()); - handleConnectionError(hostinfo.getName()); + Log.info("Host " + hostinfo.getUuid() + " is unreachable: " + e.getMessage()); + handleConnectionError(hostinfo.getUuid()); } else throw e; } } throw new IllegalStateException("No reachable targets!"); } - public R withClient(String target, ClientFunction fn) { + public R withClient(UUID target, ClientFunction fn) { var hostinfo = persistentRemoteHostsService.getInfo(target); return withClient(hostinfo.getAddr(), hostinfo.getPort(), Optional.empty(), fn); } - public List getAvailableHosts() { + public List getAvailableHosts() { return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() .filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) .map(Map.Entry::getKey).toList()); } - public List getSeenHosts() { + public List getSeenHosts() { return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() .filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)) .map(Map.Entry::getKey).toList()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index adb0ba34..b2d638b4 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -9,15 +9,15 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.List; import java.util.Map; +import java.util.UUID; @ApplicationScoped public class RemoteObjectServiceClient { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; @Inject RemoteHostManager remoteHostManager; @@ -25,9 +25,9 @@ public class RemoteObjectServiceClient { @Inject JObjectManager jObjectManager; - public Pair getSpecificObject(String host, String name) { + public Pair getSpecificObject(UUID host, String name) { return remoteHostManager.withClient(host, client -> { - var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build()); + var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build()); return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent()); }); } @@ -41,10 +41,10 @@ public class RemoteObjectServiceClient { }); return remoteHostManager.withClientAny(targets, client -> { - var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(jObject.getName()).build()); + var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build()); var receivedSelfVer = reply.getObject().getHeader().getChangelog() - .getEntriesList().stream().filter(p -> p.getHost().equals(selfname)) + .getEntriesList().stream().filter(p -> p.getHost().equals(persistentRemoteHostsService.getSelfUuid().toString())) .findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L); var receivedTotalVer = reply.getObject().getHeader().getChangelog().getEntriesList() @@ -53,7 +53,7 @@ public class RemoteObjectServiceClient { return jObject.runWriteLockedMeta((md, b, v) -> { var outdated = (md.getOurVersion() > receivedTotalVer) - || (md.getChangelog().get(selfname) > receivedSelfVer); + || (md.getChangelog().get(persistentRemoteHostsService.getSelfUuid()) > receivedSelfVer); if (outdated) { Log.error("Race when trying to fetch"); @@ -64,15 +64,15 @@ public class RemoteObjectServiceClient { }); } - public IndexUpdatePush getIndex(String host) { + public IndexUpdatePush getIndex(UUID host) { return remoteHostManager.withClient(host, client -> { - var req = GetIndexRequest.newBuilder().setSelfname(selfname).build(); + var req = GetIndexRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build(); return client.getIndex(req); }); } - public List notifyUpdate(String host, List names) { - var builder = IndexUpdatePush.newBuilder().setSelfname(selfname); + public List notifyUpdate(UUID host, List names) { + var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()); for (var v : names) { var obj = jObjectManager.get(v); if (obj.isEmpty()) continue; diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index 7b185b79..fbff121d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -12,14 +12,12 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; -import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.UUID; // Note: RunOnVirtualThread hangs somehow @GrpcService public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; - @Inject SyncHandler syncHandler; @@ -29,12 +27,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Inject RemoteHostManager remoteHostManager; + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; + @Override @Blocking public Uni getObject(GetObjectRequest request) { - if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - remoteHostManager.handleConnectionSuccess(request.getSelfname()); + remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid())); Log.info("<-- getObject: " + request.getName()); @@ -48,11 +49,11 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni getIndex(GetIndexRequest request) { - if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - remoteHostManager.handleConnectionSuccess(request.getSelfname()); + if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid())); Log.info("<-- getIndex: "); - var builder = IndexUpdatePush.newBuilder().setSelfname(selfname); + var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()); var objs = jObjectManager.find(""); @@ -68,8 +69,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni indexUpdate(IndexUpdatePush request) { - if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - remoteHostManager.handleConnectionSuccess(request.getSelfname()); + if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid())); // Log.info("<-- indexUpdate: " + request.getHeader().getName()); return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); @@ -78,9 +79,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni ping(PingRequest request) { - if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); - remoteHostManager.handleConnectionSuccess(request.getSelfname()); + if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + remoteHostManager.handleConnectionSuccess(UUID.fromString(request.getSelfUuid())); - return Uni.createFrom().item(PingReply.newBuilder().setSelfname(selfname).build()); + return Uni.createFrom().item(PingReply.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build()); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index fad1d6ff..8c32b92b 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -11,15 +11,12 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; -import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.Objects; +import java.util.UUID; @ApplicationScoped public class SyncHandler { - @ConfigProperty(name = "dhfs.objects.distributed.selfname") - String selfname; - @Inject JObjectManager jObjectManager; @@ -35,7 +32,10 @@ public class SyncHandler { @Inject Instance conflictResolvers; - public void doInitialResync(String host) { + @Inject + PersistentRemoteHostsService persistentRemoteHostsService; + + public void doInitialResync(UUID host) { var got = remoteObjectServiceClient.getIndex(host); handleRemoteUpdate(got); // Push our index to the other peer too, as they might not request it if @@ -50,7 +50,7 @@ public class SyncHandler { } } - private void handleOneUpdate(String from, ObjectHeader header) { + private void handleOneUpdate(UUID from, ObjectHeader header) { JObject found; try { found = jObjectManager.getOrPut(header.getName(), new ObjectMetadata( @@ -63,7 +63,7 @@ public class SyncHandler { } var receivedSelfVer = header.getChangelog() - .getEntriesList().stream().filter(p -> p.getHost().equals(selfname)) + .getEntriesList().stream().filter(p -> p.getHost().equals(persistentRemoteHostsService.getSelfUuid().toString())) .findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L); var receivedTotalVer = header.getChangelog().getEntriesList() @@ -76,13 +76,14 @@ public class SyncHandler { return false; } - if (md.getChangelog().getOrDefault(selfname, 0L) > receivedSelfVer) return true; + if (md.getChangelog().getOrDefault(persistentRemoteHostsService.getSelfUuid(), 0L) > receivedSelfVer) + return true; md.getRemoteCopies().put(from, receivedTotalVer); if (Objects.equals(md.getOurVersion(), receivedTotalVer)) { for (var e : header.getChangelog().getEntriesList()) { - if (!Objects.equals(md.getChangelog().getOrDefault(e.getHost(), 0L), + if (!Objects.equals(md.getChangelog().getOrDefault(UUID.fromString(e.getHost()), 0L), e.getVersion())) return true; } } @@ -101,9 +102,9 @@ public class SyncHandler { md.getChangelog().clear(); for (var entry : header.getChangelog().getEntriesList()) { - md.getChangelog().put(entry.getHost(), entry.getVersion()); + md.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion()); } - md.getChangelog().putIfAbsent(selfname, 0L); + md.getChangelog().putIfAbsent(persistentRemoteHostsService.getSelfUuid(), 0L); return false; }); @@ -121,11 +122,11 @@ public class SyncHandler { } public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { - var builder = IndexUpdateReply.newBuilder().setSelfname(selfname); + var builder = IndexUpdateReply.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()); for (var u : request.getHeaderList()) { try { - handleOneUpdate(request.getSelfname(), u); + handleOneUpdate(UUID.fromString(request.getSelfUuid()), u); } catch (Exception ex) { builder.addErrors(IndexUpdateError.newBuilder().setObjectName(u.getName()).setError(ex.toString()).build()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java index 0df62697..5f40fce2 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/TransientPeersStateData.java @@ -7,6 +7,7 @@ import lombok.Setter; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; public class TransientPeersStateData { @@ -25,5 +26,5 @@ public class TransientPeersStateData { } @Getter - private final Map _states = new LinkedHashMap<>(); + private final Map _states = new LinkedHashMap<>(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java index 4e075fcc..7b9fc937 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java @@ -78,7 +78,6 @@ public class FileObjectPersistentStore implements ObjectPersistentStore { } } - @Nonnull @Override public void writeObject(String name, ByteString data) { var file = Path.of(root, name); @@ -99,7 +98,6 @@ public class FileObjectPersistentStore implements ObjectPersistentStore { } } - @Nonnull @Override public void deleteObject(String name) { var file = Path.of(root, name); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java index d63b7291..a6d00837 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java @@ -13,8 +13,6 @@ public interface ObjectPersistentStore { @Nonnull ByteString readObject(String name); - @Nonnull void writeObject(String name, ByteString data); - @Nonnull void deleteObject(String name); } diff --git a/server/src/main/proto/dhfs_objects_sync.proto b/server/src/main/proto/dhfs_objects_sync.proto index 0e9e0180..baacb86b 100644 --- a/server/src/main/proto/dhfs_objects_sync.proto +++ b/server/src/main/proto/dhfs_objects_sync.proto @@ -15,11 +15,11 @@ service DhfsObjectSyncGrpc { } message PingRequest { - string selfname = 1; + string selfUuid = 1; } message PingReply { - string selfname = 1; + string selfUuid = 1; } message ObjectChangelogEntry { @@ -44,23 +44,23 @@ message ApiObject { } message GetObjectRequest { - string selfname = 10; + string selfUuid = 10; string name = 2; } message GetObjectReply { - string selfname = 10; + string selfUuid = 10; ApiObject object = 1; } message GetIndexRequest { - string selfname = 10; + string selfUuid = 10; } message IndexUpdatePush { - string selfname = 10; + string selfUuid = 10; repeated ObjectHeader header = 1; } @@ -71,7 +71,7 @@ message IndexUpdateError { } message IndexUpdateReply { - string selfname = 10; + string selfUuid = 10; repeated IndexUpdateError errors = 1; } \ No newline at end of file diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 194baa80..1b99d73f 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -2,5 +2,4 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root -dhfs.storage.files.target_chunk_size=1048576 -dhfs.objects.distributed.selfname=thing1 +dhfs.storage.files.target_chunk_size=1048576 \ No newline at end of file