mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
some initial resync infrastructure
This commit is contained in:
@@ -49,6 +49,8 @@ public class PeerManager {
|
||||
long pingTimeout;
|
||||
@Inject
|
||||
PeerDiscoveryDirectory peerDiscoveryDirectory;
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
private ExecutorService _heartbeatExecutor;
|
||||
|
||||
// Note: keep priority updated with below
|
||||
@@ -101,13 +103,10 @@ public class PeerManager {
|
||||
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
|
||||
boolean wasReachable = isReachable(host);
|
||||
|
||||
// boolean shouldSyncObj = persistentPeerDataService.markInitialObjSyncDone(host);
|
||||
// boolean shouldSyncOp = persistentPeerDataService.markInitialOpSyncDone(host);
|
||||
//
|
||||
// if (shouldSyncObj)
|
||||
// syncHandler.pushInitialResyncObj(host);
|
||||
// if (shouldSyncOp)
|
||||
// syncHandler.pushInitialResyncOp(host);
|
||||
boolean shouldSync = persistentPeerDataService.markInitialSyncDone(host.id());
|
||||
|
||||
if (shouldSync)
|
||||
syncHandler.doInitialSync(host.id());
|
||||
|
||||
_states.put(host.id(), address);
|
||||
|
||||
@@ -179,14 +178,12 @@ public class PeerManager {
|
||||
});
|
||||
}
|
||||
|
||||
// public void removeRemoteHost(UUID host) {
|
||||
// persistentPeerDataService.removeHost(host);
|
||||
// // Race?
|
||||
// _transientPeersState.runWriteLocked(d -> {
|
||||
// d.getStates().remove(host);
|
||||
// return null;
|
||||
// });
|
||||
// }
|
||||
public void removeRemoteHost(PeerId peerId) {
|
||||
transactionManager.run(() -> {
|
||||
peerInfoService.removePeer(peerId);
|
||||
persistentPeerDataService.resetInitialSyncDone(peerId);
|
||||
});
|
||||
}
|
||||
|
||||
private PeerAddress selectBestAddress(PeerId host) {
|
||||
return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow();
|
||||
@@ -207,9 +204,7 @@ public class PeerManager {
|
||||
peerInfoService.putPeer(host, cert);
|
||||
});
|
||||
|
||||
peerTrustManager.reloadTrustManagerHosts(
|
||||
transactionManager.run(() -> peerInfoService.getPeers().stream().toList())
|
||||
); //FIXME:
|
||||
peerTrustManager.reloadTrustManagerHosts(transactionManager.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME:
|
||||
}
|
||||
|
||||
public Collection<AvailablePeerInfo> getSeenButNotAddedHosts() {
|
||||
|
||||
@@ -13,6 +13,7 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import org.pcollections.HashTreePSet;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.KeyPair;
|
||||
@@ -38,6 +39,8 @@ public class PersistentPeerDataService {
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
PeerInfoService peerInfoService;
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
|
||||
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
|
||||
Optional<String> presetUuid;
|
||||
@@ -61,7 +64,7 @@ public class PersistentPeerDataService {
|
||||
_selfKeyPair = CertificateTools.generateKeyPair();
|
||||
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
|
||||
|
||||
curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair));
|
||||
curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair, HashTreePSet.empty()));
|
||||
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -122,59 +125,28 @@ public class PersistentPeerDataService {
|
||||
return _selfCertificate;
|
||||
}
|
||||
|
||||
// // Returns true if host's initial sync wasn't done before, and marks it as done
|
||||
// public boolean markInitialOpSyncDone(UUID connectedHost) {
|
||||
// return jObjectTxManager.executeTx(() -> {
|
||||
// peerDirectoryLocal.get().rwLock();
|
||||
// try {
|
||||
// peerDirectoryLocal.get().local();
|
||||
// boolean contained = peerDirectoryLocal.get().getData().getInitialOpSyncDone().contains(connectedHost);
|
||||
//
|
||||
// if (!contained)
|
||||
// peerDirectoryLocal.get().local().mutate(new JMutator<PeerDirectoryLocal>() {
|
||||
// @Override
|
||||
// public boolean mutate(PeerDirectoryLocal object) {
|
||||
// object.getInitialOpSyncDone().add(connectedHost);
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void revert(PeerDirectoryLocal object) {
|
||||
// object.getInitialOpSyncDone().remove(connectedHost);
|
||||
// }
|
||||
// });
|
||||
// return !contained;
|
||||
// } finally {
|
||||
// peerDirectoryLocal.get().rwUnlock();
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// public boolean markInitialObjSyncDone(UUID connectedHost) {
|
||||
// return jObjectTxManager.executeTx(() -> {
|
||||
// peerDirectoryLocal.get().rwLock();
|
||||
// try {
|
||||
// peerDirectoryLocal.get().local();
|
||||
// boolean contained = peerDirectoryLocal.get().getData().getInitialObjSyncDone().contains(connectedHost);
|
||||
//
|
||||
// if (!contained)
|
||||
// peerDirectoryLocal.get().local().mutate(new JMutator<PeerDirectoryLocal>() {
|
||||
// @Override
|
||||
// public boolean mutate(PeerDirectoryLocal object) {
|
||||
// object.getInitialObjSyncDone().add(connectedHost);
|
||||
// return true;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void revert(PeerDirectoryLocal object) {
|
||||
// object.getInitialObjSyncDone().remove(connectedHost);
|
||||
// }
|
||||
// });
|
||||
// return !contained;
|
||||
// } finally {
|
||||
// peerDirectoryLocal.get().rwUnlock();
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// Returns true if host's initial sync wasn't done before, and marks it as done
|
||||
public boolean markInitialSyncDone(PeerId peerId) {
|
||||
return txm.run(() -> {
|
||||
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
|
||||
if (data == null) throw new IllegalStateException("Self data not found");
|
||||
boolean exists = data.initialSyncDone().contains(peerId);
|
||||
if (exists) return false;
|
||||
curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId)));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
// Returns true if it was marked as done before, and resets it
|
||||
public boolean resetInitialSyncDone(PeerId peerId) {
|
||||
return txm.run(() -> {
|
||||
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
|
||||
if (data == null) throw new IllegalStateException("Self data not found");
|
||||
boolean exists = data.initialSyncDone().contains(peerId);
|
||||
if (!exists) return false;
|
||||
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ package com.usatiuk.dhfs.objects.repository;
|
||||
import com.usatiuk.dhfs.objects.JData;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import org.pcollections.HashTreePSet;
|
||||
import org.pcollections.PSet;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.security.KeyPair;
|
||||
@@ -11,7 +13,8 @@ import java.security.cert.X509Certificate;
|
||||
public record PersistentRemoteHostsData(PeerId selfUuid,
|
||||
long selfCounter,
|
||||
X509Certificate selfCertificate,
|
||||
KeyPair selfKeyPair) implements JData, Serializable {
|
||||
KeyPair selfKeyPair,
|
||||
PSet<PeerId> initialSyncDone) implements JData, Serializable {
|
||||
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");
|
||||
|
||||
@Override
|
||||
@@ -20,6 +23,10 @@ public record PersistentRemoteHostsData(PeerId selfUuid,
|
||||
}
|
||||
|
||||
public PersistentRemoteHostsData withSelfCounter(long selfCounter) {
|
||||
return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair);
|
||||
return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, HashTreePSet.empty());
|
||||
}
|
||||
|
||||
public PersistentRemoteHostsData withInitialSyncDone(PSet<PeerId> initialSyncDone) {
|
||||
return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, initialSyncDone);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,4 +248,8 @@ public class SyncHandler {
|
||||
curTx.put(current);
|
||||
}
|
||||
}
|
||||
|
||||
public void doInitialSync(PeerId peer) {
|
||||
//TODO:
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -76,4 +77,19 @@ public class PeerInfoService {
|
||||
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId());
|
||||
});
|
||||
}
|
||||
|
||||
public void removePeer(PeerId id) {
|
||||
jObjectTxManager.run(() -> {
|
||||
var gotKey = getTree().traverse(List.of(id.toString()));
|
||||
if (gotKey == null) {
|
||||
return;
|
||||
}
|
||||
var meta = curTx.get(JKleppmannTreeNode.class, gotKey).map(node -> (JKleppmannTreeNodeMetaPeer) node.meta()).orElse(null);
|
||||
if (meta == null) {
|
||||
Log.warn("Peer " + id + " not found in the tree");
|
||||
return;
|
||||
}
|
||||
getTree().trash(meta, id.toJObjectKey());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ public class ManagementApi {
|
||||
@Path("known-peers")
|
||||
@DELETE
|
||||
public void deletePeer(KnownPeerDelete knownPeerDelete) {
|
||||
// peerManager.removeRemoteHost(PeerId.of(knownPeerPut.uuid()));
|
||||
peerManager.removeRemoteHost(PeerId.of(knownPeerDelete.uuid()));
|
||||
}
|
||||
|
||||
@Path("available-peers")
|
||||
|
||||
Reference in New Issue
Block a user