mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: slightly less broken peer removal
This commit is contained in:
@@ -111,7 +111,8 @@ public class PeerManager {
|
||||
|
||||
_states.put(host.id(), address);
|
||||
|
||||
if (wasReachable) return;
|
||||
if (wasReachable)
|
||||
return;
|
||||
|
||||
Log.infov("Connected to {0}", host);
|
||||
|
||||
|
||||
@@ -120,8 +120,12 @@ public class PersistentPeerDataService {
|
||||
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
|
||||
if (data == null) throw new IllegalStateException("Self data not found");
|
||||
boolean exists = data.initialSyncDone().contains(peerId);
|
||||
if (exists) return false;
|
||||
if (exists) {
|
||||
Log.tracev("Already marked sync state for {0}", peerId);
|
||||
return false;
|
||||
}
|
||||
curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId)));
|
||||
Log.infov("Did mark sync state for {0}", peerId);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@@ -132,8 +136,12 @@ public class PersistentPeerDataService {
|
||||
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
|
||||
if (data == null) throw new IllegalStateException("Self data not found");
|
||||
boolean exists = data.initialSyncDone().contains(peerId);
|
||||
if (!exists) return false;
|
||||
if (!exists) {
|
||||
Log.infov("Already reset sync state for {0}", peerId);
|
||||
return false;
|
||||
}
|
||||
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
|
||||
Log.infov("Did reset sync state for {0}", peerId);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -75,6 +76,7 @@ public class SyncHandler {
|
||||
|
||||
public void doInitialSync(PeerId peer) {
|
||||
txm.run(() -> {
|
||||
Log.tracev("Will do initial sync for {0}", peer);
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
|
||||
@@ -23,6 +23,13 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
|
||||
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
|
||||
}
|
||||
|
||||
public static PeerId nodeIdToPeerId(JObjectKey id) {
|
||||
if (!id.name().endsWith("_tree_node")) {
|
||||
throw new IllegalArgumentException("Not a tree node key: " + id);
|
||||
}
|
||||
return PeerId.of(id.name().substring(0, id.name().length() - "_tree_node".length()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNodeMeta withName(String name) {
|
||||
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -23,9 +24,19 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
|
||||
public void onChange(JObjectKey key, JData old, JData cur) {
|
||||
if (cur instanceof JKleppmannTreeNode n) {
|
||||
if (n.key().name().equals("peers_jt_root")) {
|
||||
// TODO: This is kinda sucky
|
||||
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
|
||||
|
||||
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
|
||||
if (!(old instanceof JKleppmannTreeNode oldNode))
|
||||
throw new IllegalStateException("Old node is not a tree node");
|
||||
|
||||
for (var curRef : oldNode.children().entrySet()) {
|
||||
if (!n.children().containsKey(curRef.getKey())) {
|
||||
Log.infov("Will reset sync state for {0}", curRef.getValue());
|
||||
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user