From 6d1e51c7455d19df29f3bc47091434e8d7e7d0e9 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 12 Oct 2024 21:22:48 +0200 Subject: [PATCH] Server: split peer initial sync info into a jobject --- .../dhfs/objects/jrepository/JObjectData.java | 9 ++- .../repository/PersistentPeerDataService.java | 59 +++++++++++++++++-- .../repository/PersistentRemoteHostsData.java | 5 -- .../peersync/PeerDirectoryLocal.java | 27 +++++++++ .../PeerDirectoryLocalSerializer.java | 26 ++++++++ .../src/main/proto/dhfs_objects_serial.proto | 6 ++ 6 files changed, 119 insertions(+), 13 deletions(-) create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocal.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocalSerializer.java diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectData.java index 6e98b775..ad5b1332 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectData.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectData.java @@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ConflictResolver; import java.io.Serial; import java.io.Serializable; import java.util.Collection; +import java.util.List; @ProtoMirror(JObjectDataP.class) public abstract class JObjectData implements Serializable { @@ -15,13 +16,17 @@ public abstract class JObjectData implements Serializable { public abstract String getName(); - public abstract Class getConflictResolver(); + public Class getConflictResolver() { + throw new UnsupportedOperationException(); + } public Class getRefType() { throw new UnsupportedOperationException("This object shouldn't have refs"); } - public abstract Collection extractRefs(); + public Collection extractRefs() { + return List.of(); + } public int estimateSize() { return 0; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java index 99792ac3..41607866 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java @@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager; import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager; import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory; +import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectoryLocal; import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo; import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager; import io.grpc.Status; @@ -85,13 +86,23 @@ public class PersistentPeerDataService { var newpd = new PeerDirectory(); jObjectManager.put(new PersistentPeerInfo(_selfUuid, getSelfCertificate()), Optional.of(PeerDirectory.PeerDirectoryObjName)); newpd.getPeers().add(_selfUuid); - var dir = jObjectManager.put(newpd, Optional.empty()); + jObjectManager.put(newpd, Optional.empty()); + jObjectManager.put(new PeerDirectoryLocal(), Optional.empty()); }); } if (!shutdownChecker.lastShutdownClean()) { _persistentData.getData().getIrregularShutdownCounter().addAndGet(1); - _persistentData.getData().getInitialObjSyncDone().clear(); + jObjectTxManager.executeTx(() -> { + getPeerDirectoryLocal().rwLock(); + getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); + try { + getPeerDirectoryLocal().getData().getInitialObjSyncDone().clear(); + getPeerDirectoryLocal().bumpVer(); + } finally { + getPeerDirectoryLocal().rwUnlock(); + } + }); } jObjectManager.registerWriteListener(PersistentPeerInfo.class, this::pushPeerUpdates); @@ -125,6 +136,17 @@ public class PersistentPeerDataService { } } + private JObject getPeerDirectoryLocal() { + var got = jObjectManager.get(PeerDirectoryLocal.PeerDirectoryLocalObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found")); + got.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { + if (d == null) throw new IllegalStateException("Could not resolve peer directory!"); + if (!(d instanceof PeerDirectoryLocal)) + throw new IllegalStateException("Peer directory local is of wrong type!"); + return null; + }); + return (JObject) got; + } + private JObject getPeerDirectory() { var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found")); got.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { @@ -245,8 +267,15 @@ public class PersistentPeerDataService { boolean removedInner = d.getPeers().remove(host); Log.info("Removing host: " + host + (removedInner ? " removed" : " did not exists")); if (removedInner) { - _persistentData.runWriteLocked(pd -> pd.getInitialObjSyncDone().remove(host)); - _persistentData.runWriteLocked(pd -> pd.getInitialOpSyncDone().remove(host)); + getPeerDirectoryLocal().rwLock(); + getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); + try { + getPeerDirectoryLocal().getData().getInitialObjSyncDone().remove(host); + getPeerDirectoryLocal().getData().getInitialOpSyncDone().remove(host); + getPeerDirectoryLocal().bumpVer(); + } finally { + getPeerDirectoryLocal().rwUnlock(); + } getPeer(host).runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (mp, dp, bp, vp) -> { mp.removeRef(m.getName()); return null; @@ -294,11 +323,29 @@ public class PersistentPeerDataService { // Returns true if host's initial sync wasn't done before, and marks it as done public boolean markInitialOpSyncDone(UUID connectedHost) { - return _persistentData.runWriteLocked(d -> d.getInitialOpSyncDone().add(connectedHost)); + return jObjectTxManager.executeTx(() -> { + getPeerDirectoryLocal().rwLock(); + getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); + try { + getPeerDirectoryLocal().bumpVer(); + return getPeerDirectoryLocal().getData().getInitialOpSyncDone().add(connectedHost); + } finally { + getPeerDirectoryLocal().rwUnlock(); + } + }); } public boolean markInitialObjSyncDone(UUID connectedHost) { - return _persistentData.runWriteLocked(d -> d.getInitialObjSyncDone().add(connectedHost)); + return jObjectTxManager.executeTx(() -> { + getPeerDirectoryLocal().rwLock(); + getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); + try { + getPeerDirectoryLocal().bumpVer(); + return getPeerDirectoryLocal().getData().getInitialObjSyncDone().add(connectedHost); + } finally { + getPeerDirectoryLocal().rwUnlock(); + } + }); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java index e119c041..6f5b64b5 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java @@ -7,7 +7,6 @@ import java.io.Serial; import java.io.Serializable; import java.security.KeyPair; import java.security.cert.X509Certificate; -import java.util.HashSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -22,10 +21,6 @@ public class PersistentRemoteHostsData implements Serializable { @Getter private final AtomicLong _irregularShutdownCounter = new AtomicLong(); @Getter - private final HashSet _initialOpSyncDone = new HashSet<>(); - @Getter - private final HashSet _initialObjSyncDone = new HashSet<>(); - @Getter @Setter private X509Certificate _selfCertificate = null; @Getter diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocal.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocal.java new file mode 100644 index 00000000..e3e54994 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocal.java @@ -0,0 +1,27 @@ +package com.usatiuk.dhfs.objects.repository.peersync; + +import com.usatiuk.dhfs.objects.jrepository.JObjectData; +import com.usatiuk.dhfs.objects.jrepository.OnlyLocal; +import lombok.Getter; + +import java.util.HashSet; +import java.util.UUID; + +@OnlyLocal +public class PeerDirectoryLocal extends JObjectData { + public static final String PeerDirectoryLocalObjName = "peer_directory_local"; + @Getter + private final HashSet _initialOpSyncDone = new HashSet<>(); + @Getter + private final HashSet _initialObjSyncDone = new HashSet<>(); + + @Override + public String getName() { + return PeerDirectoryLocalObjName; + } + + @Override + public int estimateSize() { + return 1024; //FIXME: + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocalSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocalSerializer.java new file mode 100644 index 00000000..ba2650e4 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerDirectoryLocalSerializer.java @@ -0,0 +1,26 @@ +package com.usatiuk.dhfs.objects.repository.peersync; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.persistence.PeerDirectoryLocalP; +import jakarta.inject.Singleton; + +import java.util.Objects; +import java.util.UUID; + +@Singleton +public class PeerDirectoryLocalSerializer implements ProtoSerializer { + @Override + public PeerDirectoryLocal deserialize(PeerDirectoryLocalP message) { + var ret = new PeerDirectoryLocal(); + ret.getInitialOpSyncDone().addAll(message.getInitialOpSyncDonePeersList().stream().map(UUID::fromString).toList()); + ret.getInitialObjSyncDone().addAll(message.getInitialObjSyncDonePeersList().stream().map(UUID::fromString).toList()); + return ret; + } + + @Override + public PeerDirectoryLocalP serialize(PeerDirectoryLocal object) { + return PeerDirectoryLocalP.newBuilder() + .addAllInitialObjSyncDonePeers(() -> object.getInitialObjSyncDone().stream().map(Objects::toString).iterator()) + .addAllInitialOpSyncDonePeers(() -> object.getInitialOpSyncDone().stream().map(Objects::toString).iterator()).build(); + } +} diff --git a/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto b/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto index 76597906..665e8d23 100644 --- a/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto +++ b/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto @@ -136,6 +136,11 @@ message JKleppmannTreePersistentDataP { repeated JKleppmannTreeOpLogPEntry opLog = 5; } +message PeerDirectoryLocalP { + repeated string initialOpSyncDonePeers = 1; + repeated string initialObjSyncDonePeers = 2; +} + message JObjectDataP { oneof obj { FileP file = 2; @@ -145,5 +150,6 @@ message JObjectDataP { PersistentPeerInfoP persistentPeerInfo = 7; JKleppmannTreeNodeP jKleppmannTreeNode = 8; JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9; + PeerDirectoryLocalP peerDirectoryLocal = 10; } } \ No newline at end of file