diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java index eabd1b8c..52741a53 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java @@ -49,6 +49,8 @@ public class PeerManager { long pingTimeout; @Inject PeerDiscoveryDirectory peerDiscoveryDirectory; + @Inject + SyncHandler syncHandler; private ExecutorService _heartbeatExecutor; // Note: keep priority updated with below @@ -101,13 +103,10 @@ public class PeerManager { private void handleConnectionSuccess(PeerInfo host, PeerAddress address) { boolean wasReachable = isReachable(host); -// boolean shouldSyncObj = persistentPeerDataService.markInitialObjSyncDone(host); -// boolean shouldSyncOp = persistentPeerDataService.markInitialOpSyncDone(host); -// -// if (shouldSyncObj) -// syncHandler.pushInitialResyncObj(host); -// if (shouldSyncOp) -// syncHandler.pushInitialResyncOp(host); + boolean shouldSync = persistentPeerDataService.markInitialSyncDone(host.id()); + + if (shouldSync) + syncHandler.doInitialSync(host.id()); _states.put(host.id(), address); @@ -179,14 +178,12 @@ public class PeerManager { }); } -// public void removeRemoteHost(UUID host) { -// persistentPeerDataService.removeHost(host); -// // Race? -// _transientPeersState.runWriteLocked(d -> { -// d.getStates().remove(host); -// return null; -// }); -// } + public void removeRemoteHost(PeerId peerId) { + transactionManager.run(() -> { + peerInfoService.removePeer(peerId); + persistentPeerDataService.resetInitialSyncDone(peerId); + }); + } private PeerAddress selectBestAddress(PeerId host) { return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow(); @@ -207,9 +204,7 @@ public class PeerManager { peerInfoService.putPeer(host, cert); }); - peerTrustManager.reloadTrustManagerHosts( - transactionManager.run(() -> peerInfoService.getPeers().stream().toList()) - ); //FIXME: + peerTrustManager.reloadTrustManagerHosts(transactionManager.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME: } public Collection getSeenButNotAddedHosts() { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java index 3f3f0eae..8d21c6ef 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java @@ -13,6 +13,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.pcollections.HashTreePSet; import java.io.IOException; import java.security.KeyPair; @@ -38,6 +39,8 @@ public class PersistentPeerDataService { Transaction curTx; @Inject PeerInfoService peerInfoService; + @Inject + TransactionManager txm; @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid") Optional presetUuid; @@ -61,7 +64,7 @@ public class PersistentPeerDataService { _selfKeyPair = CertificateTools.generateKeyPair(); _selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString()); - curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair)); + curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair, HashTreePSet.empty())); peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded()); } catch (CertificateEncodingException e) { throw new RuntimeException(e); @@ -122,59 +125,28 @@ public class PersistentPeerDataService { return _selfCertificate; } -// // Returns true if host's initial sync wasn't done before, and marks it as done -// public boolean markInitialOpSyncDone(UUID connectedHost) { -// return jObjectTxManager.executeTx(() -> { -// peerDirectoryLocal.get().rwLock(); -// try { -// peerDirectoryLocal.get().local(); -// boolean contained = peerDirectoryLocal.get().getData().getInitialOpSyncDone().contains(connectedHost); -// -// if (!contained) -// peerDirectoryLocal.get().local().mutate(new JMutator() { -// @Override -// public boolean mutate(PeerDirectoryLocal object) { -// object.getInitialOpSyncDone().add(connectedHost); -// return true; -// } -// -// @Override -// public void revert(PeerDirectoryLocal object) { -// object.getInitialOpSyncDone().remove(connectedHost); -// } -// }); -// return !contained; -// } finally { -// peerDirectoryLocal.get().rwUnlock(); -// } -// }); -// } -// -// public boolean markInitialObjSyncDone(UUID connectedHost) { -// return jObjectTxManager.executeTx(() -> { -// peerDirectoryLocal.get().rwLock(); -// try { -// peerDirectoryLocal.get().local(); -// boolean contained = peerDirectoryLocal.get().getData().getInitialObjSyncDone().contains(connectedHost); -// -// if (!contained) -// peerDirectoryLocal.get().local().mutate(new JMutator() { -// @Override -// public boolean mutate(PeerDirectoryLocal object) { -// object.getInitialObjSyncDone().add(connectedHost); -// return true; -// } -// -// @Override -// public void revert(PeerDirectoryLocal object) { -// object.getInitialObjSyncDone().remove(connectedHost); -// } -// }); -// return !contained; -// } finally { -// peerDirectoryLocal.get().rwUnlock(); -// } -// }); -// } + // Returns true if host's initial sync wasn't done before, and marks it as done + public boolean markInitialSyncDone(PeerId peerId) { + return txm.run(() -> { + var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); + if (data == null) throw new IllegalStateException("Self data not found"); + boolean exists = data.initialSyncDone().contains(peerId); + if (exists) return false; + curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId))); + return true; + }); + } + + // Returns true if it was marked as done before, and resets it + public boolean resetInitialSyncDone(PeerId peerId) { + return txm.run(() -> { + var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); + if (data == null) throw new IllegalStateException("Self data not found"); + boolean exists = data.initialSyncDone().contains(peerId); + if (!exists) return false; + curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId))); + return true; + }); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java index 23a99e0e..56f3411f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java @@ -3,6 +3,8 @@ package com.usatiuk.dhfs.objects.repository; import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; +import org.pcollections.HashTreePSet; +import org.pcollections.PSet; import java.io.Serializable; import java.security.KeyPair; @@ -11,7 +13,8 @@ import java.security.cert.X509Certificate; public record PersistentRemoteHostsData(PeerId selfUuid, long selfCounter, X509Certificate selfCertificate, - KeyPair selfKeyPair) implements JData, Serializable { + KeyPair selfKeyPair, + PSet initialSyncDone) implements JData, Serializable { public static final JObjectKey KEY = JObjectKey.of("self_peer_data"); @Override @@ -20,6 +23,10 @@ public record PersistentRemoteHostsData(PeerId selfUuid, } public PersistentRemoteHostsData withSelfCounter(long selfCounter) { - return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair); + return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, HashTreePSet.empty()); + } + + public PersistentRemoteHostsData withInitialSyncDone(PSet initialSyncDone) { + return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, initialSyncDone); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java index aab38104..ae91198f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java @@ -248,4 +248,8 @@ public class SyncHandler { curTx.put(current); } } + + public void doInitialSync(PeerId peer) { + //TODO: + } } \ No newline at end of file diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java index 5398302f..783d391a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java @@ -9,6 +9,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer; import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -76,4 +77,19 @@ public class PeerInfoService { getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId()); }); } + + public void removePeer(PeerId id) { + jObjectTxManager.run(() -> { + var gotKey = getTree().traverse(List.of(id.toString())); + if (gotKey == null) { + return; + } + var meta = curTx.get(JKleppmannTreeNode.class, gotKey).map(node -> (JKleppmannTreeNodeMetaPeer) node.meta()).orElse(null); + if (meta == null) { + Log.warn("Peer " + id + " not found in the tree"); + return; + } + getTree().trash(meta, id.toJObjectKey()); + }); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java index 344ef33f..43c1aa88 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java @@ -34,7 +34,7 @@ public class ManagementApi { @Path("known-peers") @DELETE public void deletePeer(KnownPeerDelete knownPeerDelete) { -// peerManager.removeRemoteHost(PeerId.of(knownPeerPut.uuid())); + peerManager.removeRemoteHost(PeerId.of(knownPeerDelete.uuid())); } @Path("available-peers")