Server: split peer initial sync info into a jobject

This commit is contained in:
2024-10-12 21:22:48 +02:00
parent feb4b085d6
commit 6d1e51c745
6 changed files with 119 additions and 13 deletions

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import java.io.Serial;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@ProtoMirror(JObjectDataP.class)
public abstract class JObjectData implements Serializable {
@@ -15,13 +16,17 @@ public abstract class JObjectData implements Serializable {
public abstract String getName();
public abstract Class<? extends ConflictResolver> getConflictResolver();
public Class<? extends ConflictResolver> getConflictResolver() {
throw new UnsupportedOperationException();
}
public Class<? extends JObjectData> getRefType() {
throw new UnsupportedOperationException("This object shouldn't have refs");
}
public abstract Collection<String> extractRefs();
public Collection<String> extractRefs() {
return List.of();
}
public int estimateSize() {
return 0;

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory;
import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectoryLocal;
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import io.grpc.Status;
@@ -85,13 +86,23 @@ public class PersistentPeerDataService {
var newpd = new PeerDirectory();
jObjectManager.put(new PersistentPeerInfo(_selfUuid, getSelfCertificate()), Optional.of(PeerDirectory.PeerDirectoryObjName));
newpd.getPeers().add(_selfUuid);
var dir = jObjectManager.put(newpd, Optional.empty());
jObjectManager.put(newpd, Optional.empty());
jObjectManager.put(new PeerDirectoryLocal(), Optional.empty());
});
}
if (!shutdownChecker.lastShutdownClean()) {
_persistentData.getData().getIrregularShutdownCounter().addAndGet(1);
_persistentData.getData().getInitialObjSyncDone().clear();
jObjectTxManager.executeTx(() -> {
getPeerDirectoryLocal().rwLock();
getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
try {
getPeerDirectoryLocal().getData().getInitialObjSyncDone().clear();
getPeerDirectoryLocal().bumpVer();
} finally {
getPeerDirectoryLocal().rwUnlock();
}
});
}
jObjectManager.registerWriteListener(PersistentPeerInfo.class, this::pushPeerUpdates);
@@ -125,6 +136,17 @@ public class PersistentPeerDataService {
}
}
private JObject<PeerDirectoryLocal> getPeerDirectoryLocal() {
var got = jObjectManager.get(PeerDirectoryLocal.PeerDirectoryLocalObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
got.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d == null) throw new IllegalStateException("Could not resolve peer directory!");
if (!(d instanceof PeerDirectoryLocal))
throw new IllegalStateException("Peer directory local is of wrong type!");
return null;
});
return (JObject<PeerDirectoryLocal>) got;
}
private JObject<PeerDirectory> getPeerDirectory() {
var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
got.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
@@ -245,8 +267,15 @@ public class PersistentPeerDataService {
boolean removedInner = d.getPeers().remove(host);
Log.info("Removing host: " + host + (removedInner ? " removed" : " did not exists"));
if (removedInner) {
_persistentData.runWriteLocked(pd -> pd.getInitialObjSyncDone().remove(host));
_persistentData.runWriteLocked(pd -> pd.getInitialOpSyncDone().remove(host));
getPeerDirectoryLocal().rwLock();
getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
try {
getPeerDirectoryLocal().getData().getInitialObjSyncDone().remove(host);
getPeerDirectoryLocal().getData().getInitialOpSyncDone().remove(host);
getPeerDirectoryLocal().bumpVer();
} finally {
getPeerDirectoryLocal().rwUnlock();
}
getPeer(host).runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (mp, dp, bp, vp) -> {
mp.removeRef(m.getName());
return null;
@@ -294,11 +323,29 @@ public class PersistentPeerDataService {
// Returns true if host's initial sync wasn't done before, and marks it as done
public boolean markInitialOpSyncDone(UUID connectedHost) {
return _persistentData.runWriteLocked(d -> d.getInitialOpSyncDone().add(connectedHost));
return jObjectTxManager.executeTx(() -> {
getPeerDirectoryLocal().rwLock();
getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
try {
getPeerDirectoryLocal().bumpVer();
return getPeerDirectoryLocal().getData().getInitialOpSyncDone().add(connectedHost);
} finally {
getPeerDirectoryLocal().rwUnlock();
}
});
}
public boolean markInitialObjSyncDone(UUID connectedHost) {
return _persistentData.runWriteLocked(d -> d.getInitialObjSyncDone().add(connectedHost));
return jObjectTxManager.executeTx(() -> {
getPeerDirectoryLocal().rwLock();
getPeerDirectoryLocal().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
try {
getPeerDirectoryLocal().bumpVer();
return getPeerDirectoryLocal().getData().getInitialObjSyncDone().add(connectedHost);
} finally {
getPeerDirectoryLocal().rwUnlock();
}
});
}
}

View File

@@ -7,7 +7,6 @@ import java.io.Serial;
import java.io.Serializable;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -22,10 +21,6 @@ public class PersistentRemoteHostsData implements Serializable {
@Getter
private final AtomicLong _irregularShutdownCounter = new AtomicLong();
@Getter
private final HashSet<UUID> _initialOpSyncDone = new HashSet<>();
@Getter
private final HashSet<UUID> _initialObjSyncDone = new HashSet<>();
@Getter
@Setter
private X509Certificate _selfCertificate = null;
@Getter

View File

@@ -0,0 +1,27 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.OnlyLocal;
import lombok.Getter;
import java.util.HashSet;
import java.util.UUID;
@OnlyLocal
public class PeerDirectoryLocal extends JObjectData {
public static final String PeerDirectoryLocalObjName = "peer_directory_local";
@Getter
private final HashSet<UUID> _initialOpSyncDone = new HashSet<>();
@Getter
private final HashSet<UUID> _initialObjSyncDone = new HashSet<>();
@Override
public String getName() {
return PeerDirectoryLocalObjName;
}
@Override
public int estimateSize() {
return 1024; //FIXME:
}
}

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.PeerDirectoryLocalP;
import jakarta.inject.Singleton;
import java.util.Objects;
import java.util.UUID;
@Singleton
public class PeerDirectoryLocalSerializer implements ProtoSerializer<PeerDirectoryLocalP, PeerDirectoryLocal> {
@Override
public PeerDirectoryLocal deserialize(PeerDirectoryLocalP message) {
var ret = new PeerDirectoryLocal();
ret.getInitialOpSyncDone().addAll(message.getInitialOpSyncDonePeersList().stream().map(UUID::fromString).toList());
ret.getInitialObjSyncDone().addAll(message.getInitialObjSyncDonePeersList().stream().map(UUID::fromString).toList());
return ret;
}
@Override
public PeerDirectoryLocalP serialize(PeerDirectoryLocal object) {
return PeerDirectoryLocalP.newBuilder()
.addAllInitialObjSyncDonePeers(() -> object.getInitialObjSyncDone().stream().map(Objects::toString).iterator())
.addAllInitialOpSyncDonePeers(() -> object.getInitialOpSyncDone().stream().map(Objects::toString).iterator()).build();
}
}

View File

@@ -136,6 +136,11 @@ message JKleppmannTreePersistentDataP {
repeated JKleppmannTreeOpLogPEntry opLog = 5;
}
message PeerDirectoryLocalP {
repeated string initialOpSyncDonePeers = 1;
repeated string initialObjSyncDonePeers = 2;
}
message JObjectDataP {
oneof obj {
FileP file = 2;
@@ -145,5 +150,6 @@ message JObjectDataP {
PersistentPeerInfoP persistentPeerInfo = 7;
JKleppmannTreeNodeP jKleppmannTreeNode = 8;
JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
PeerDirectoryLocalP peerDirectoryLocal = 10;
}
}