Sync-base: "kick out" inactive peers

2025-05-03 13:14:06 +02:00
parent db51d7280c
commit 4e7b13227b
13 changed files with 172 additions and 71 deletions
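
In short: each PeerInfo now carries a last-seen timestamp and a per-peer "kick" counter. A scheduled updater refreshes the timestamp for peers it can reach and, once an unreachable peer has been silent for longer than dhfs.objects.last-seen.timeout, increments its own entry in that peer's kick counter. A changed counter sum resets that peer's initial-sync state, and waiting logic such as remote object deletion now only considers peers that have completed initial sync. Below is a rough, self-contained sketch of that rule; the class name, the plain Map types and the String ids are placeholders, not the actual DHFS types used in the diffs that follow.

import java.util.HashMap;
import java.util.Map;

class PeerKickSketch {
    // timeout in milliseconds; dhfs.objects.last-seen.timeout is configured in seconds
    final long lastSeenTimeoutMs;
    // when each peer was last reachable from this node
    final Map<String, Long> lastSeen = new HashMap<>();
    // kicked peer -> (kicking peer -> count); replicas merge these maps by per-entry max
    final Map<String, Map<String, Long>> kickCounters = new HashMap<>();

    PeerKickSketch(long lastSeenTimeoutSeconds) {
        this.lastSeenTimeoutMs = lastSeenTimeoutSeconds * 1000;
    }

    // Called periodically for every known peer (cf. PeerLastSeenUpdater below).
    void check(String selfId, String peerId, boolean reachable, long nowMs) {
        if (reachable) {
            lastSeen.put(peerId, nowMs); // peer is alive: refresh its timestamp
            return;
        }
        long seen = lastSeen.getOrDefault(peerId, nowMs);
        if (nowMs - seen > lastSeenTimeoutMs) {
            // peer looks dead: record a "kick"; a change of the kick counter sum
            // later triggers a reset of that peer's initial-sync state
            kickCounters.computeIfAbsent(peerId, k -> new HashMap<>())
                    .merge(selfId, 1L, Long::sum);
        }
    }
}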

View File

@@ -168,35 +168,6 @@ public class DhfsFuseIT {
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
// TODO: How does this fit with the tree?
@Test
@Disabled
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void deleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
@@ -221,6 +192,28 @@ public class DhfsFuseIT {
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void deleteTestKickedOut() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
container2.stop();
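// container2 is stopped for good; with the test image's 30-second last-seen timeout,
// container1 should "kick out" the dead peer and then complete the deletion without waiting for it.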
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
Log.info("Deleting");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
Log.info("Deleted");
// FIXME?
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");

View File

@@ -79,6 +79,8 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.sync.timeout=30",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",
"-Ddhfs.objects.last-seen.timeout=30",
"-Ddhfs.objects.last-seen.update=10",
"-Ddhfs.sync.cert-check=false",
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",

View File

@@ -1,10 +1,12 @@
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=67108864
dhfs.objects.writeback.limit=16777216
dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.transaction.never-lock=true
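# Peer liveness (see PeerLastSeenUpdater): the updater runs on the last-seen.update interval;
# a peer unseen for last-seen.timeout seconds (43200 s = 12 h) gets its kick counter bumped.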
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200
quarkus.log.category."com.usatiuk.objects.iterators".level=INFO
quarkus.log.category."com.usatiuk.objects.iterators".min-level=INFO

View File

@@ -24,6 +24,6 @@ public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {
@Override
public Collection<PeerId> getAllPeers() {
return peerInfoService.getPeers().stream().map(PeerInfo::id).toList();
return peerInfoService.getSynchronizedPeers().stream().map(PeerInfo::id).toList();
}
}

View File

@@ -6,24 +6,46 @@ import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import java.security.cert.X509Certificate;
@JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
PMap<PeerId, Long> kickCounter,
long lastSeenTimestamp) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
this(id.toJObjectKey(), id, ByteString.copyFrom(cert), HashTreePMap.empty(), System.currentTimeMillis());
}
public X509Certificate parsedCert() {
return CertificateTools.certFromBytes(cert.toByteArray());
}
public PeerInfo withKickCounter(PMap<PeerId, Long> kickCounter) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public PeerInfo withIncrementedKickCounter(PeerId peerId) {
return new PeerInfo(key, id, cert, kickCounter.plus(peerId, kickCounter.getOrDefault(peerId, 0L) + 1), lastSeenTimestamp);
}
public PeerInfo withLastSeenTimestamp(long lastSeenTimestamp) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public long kickCounterSum() {
return kickCounter.values().stream().mapToLong(Long::longValue).sum();
}
@Override
public String toString() {
return "PeerInfo{" +
"key=" + key +
", id=" + id +
", kickCounter=" + kickCounter +
", lastSeenTimestamp=" + lastSeenTimestamp +
'}';
}
}

View File

@@ -87,17 +87,27 @@ public class PeerInfoService {
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
if (o.isEmpty())
Log.warnv("Could not get peer info for peer {0}", o);
return o.isPresent();
}).map(Optional::get).filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
.orElseThrow();
return getPeers().stream().filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList();
});
}
public List<PeerInfo> getSynchronizedPeers() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(pi -> {
if (pi.id().equals(persistentPeerDataService.getSelfUuid())) {
return true;
}
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
});
}
public List<PeerInfo> getSynchronizedPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeersNoSelf().stream().filter(pi -> {
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
});
}

View File

@@ -89,10 +89,11 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
if (oursCurData == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!receivedData.equals(oursCurData))
throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo data conflict"));
if (!receivedData.cert().equals(oursCurData.cert()))
throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo certificate conflict for " + key));
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(current.changelog());
HashPMap<PeerId, Long> newKickCounter = HashTreePMap.from(oursCurData.kickCounter());
for (var entry : receivedChangelog.entrySet()) {
newChangelog = newChangelog.plus(entry.getKey(),
@@ -100,6 +101,20 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
);
}
for (var entry : receivedData.kickCounter().entrySet()) {
newKickCounter = newKickCounter.plus(entry.getKey(),
Long.max(newKickCounter.getOrDefault(entry.getKey(), 0L), entry.getValue())
);
}
var newData = oursCurData.withKickCounter(newKickCounter)
.withLastSeenTimestamp(Math.max(oursCurData.lastSeenTimestamp(), receivedData.lastSeenTimestamp()));
if (!newData.equals(oursCurData))
newChangelog = newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.getOrDefault(persistentPeerDataService.getSelfUuid(), 0L) + 1L);
remoteTx.putDataRaw(newData);
current = current.withChangelog(newChangelog);
}
}
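
The loop above makes the kick counter a grow-only, conflict-free value: during sync each side keeps, per kicking peer, the larger of the local and received counts, so no increment is ever lost. A minimal standalone illustration (hypothetical helper class, assuming PeerId and org.pcollections are on the classpath):

import org.pcollections.PMap;

final class KickCounterMerge {
    // Mirrors the merge in PeerInfoSyncHandler: take the per-entry maximum of both maps.
    static PMap<PeerId, Long> merge(PMap<PeerId, Long> ours, PMap<PeerId, Long> received) {
        PMap<PeerId, Long> merged = ours;
        for (var e : received.entrySet())
            merged = merged.plus(e.getKey(), Long.max(merged.getOrDefault(e.getKey(), 0L), e.getValue()));
        return merged;
    }
}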

View File

@@ -0,0 +1,60 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject
PeerManager peerManager;
@Inject
PeerInfoService peerInfoService;
@Inject
Transaction curTx;
@Inject
TransactionManager txm;
@Inject
RemoteTransaction remoteTransaction;
@ConfigProperty(name = "dhfs.objects.last-seen.timeout")
long lastSeenTimeout;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void update() {
var snapshot = peerManager.getHostStateSnapshot();
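// Peers we can currently reach: refresh the last-seen timestamp stored on their PeerInfo.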
for (var a : snapshot.available()) {
txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);
if (curInfo == null) return;
var newInfo = curInfo.withLastSeenTimestamp(System.currentTimeMillis());
remoteTransaction.putData(newInfo);
});
}
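// Peers we cannot reach: once initial sync is done and the peer has been silent for longer than
// the timeout, bump our entry in its kick counter (picked up by PeerInfoCertUpdateTxHook).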
for (var u : snapshot.unavailable()) {
txm.run(() -> {
if (!persistentPeerDataService.isInitialSyncDone(u))
return;
var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
if (curInfo == null) return;
var lastSeen = curInfo.lastSeenTimestamp();
if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
remoteTransaction.putData(kicked);
}
});
}
}
}

View File

@@ -125,19 +125,23 @@ public class PeerManager {
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
public void handleConnectionError(PeerId host) {
boolean wasReachable = isReachable(host);
if (wasReachable)
Log.infov("Lost connection to {0}", host);
_states.remove(host.id());
_states.remove(host);
for (var l : _disconnectedListeners) {
l.handlePeerDisconnected(host.id());
l.handlePeerDisconnected(host);
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
handleConnectionError(host.id());
}
// FIXME:
private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
try {

View File

@@ -51,6 +51,8 @@ public class PersistentPeerDataService {
PeerInfoService peerInfoService;
@Inject
TransactionManager txm;
@Inject
PeerManager peerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@@ -89,21 +91,6 @@ public class PersistentPeerDataService {
Files.write(Path.of(stuffRoot, "self_uuid"), _selfUuid.id().toString().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
}
// private void pushPeerUpdates() {
// pushPeerUpdates(null);
// }
// private void pushPeerUpdates(@Nullable JObject<?> obj) {
// if (obj != null)
// Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated");
// executorService.submit(() -> {
// updateCerts();
// invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
// for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList()))
// invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p));
// });
// }
public PeerId getSelfUuid() {
return _selfUuid;
}
@@ -148,6 +135,7 @@ public class PersistentPeerDataService {
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> peerManager.handleConnectionError(peerId));
return true;
});
}
@@ -160,7 +148,6 @@ public class PersistentPeerDataService {
});
}
public List<IpPeerAddress> getPersistentPeerAddresses() {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.peertrust;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.PeerInfo;
import com.usatiuk.dhfs.peersync.PeerInfoService;
@@ -43,7 +42,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
for (var curRef : oldNode.node().children().entrySet()) {
if (!n.node().children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue()));
}
}
return;
@@ -54,9 +53,16 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
return;
}
if (!(remote.data() instanceof PeerInfo))
if (!(remote.data() instanceof PeerInfo curPi))
return;
var oldPi = (PeerInfo) ((RemoteObjectDataWrapper) old).data();
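// A raised kick counter sum means some peer has declared this one inactive; reset its
// initial-sync state so it does a full resync when it comes back.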
if (oldPi.kickCounterSum() != curPi.kickCounterSum()) {
Log.warnv("Peer kicked out: {0} to {1}", key, cur);
persistentPeerDataService.resetInitialSyncDone(curPi.id());
}
Log.infov("Changed peer info: {0} to {1}", key, cur);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(key));

View File

@@ -20,7 +20,7 @@ public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
@Override
public void handleOp(PeerId from, IndexUpdateOp op) {
txm.run(() -> {
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), op.data());
});
}
}

View File

@@ -132,7 +132,7 @@ public class RemoteObjectDeleter {
return true;
}
var knownHosts = peerInfoService.getPeersNoSelf();
var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
RemoteObjectMeta finalTarget = target;
List<PeerId> missing = knownHosts.stream()
.map(PeerInfo::id)
@@ -187,7 +187,7 @@ public class RemoteObjectDeleter {
if (!obj.seen())
return true;
var knownHosts = peerInfoService.getPeersNoSelf();
var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
boolean missing = false;
for (var x : knownHosts) {
if (!obj.confirmedDeletes().contains(x.id())) {