From e0b4f973495e69144f29cef9664de3ed4fc4c6ba Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 23 Mar 2025 23:18:49 +0100 Subject: [PATCH] Server: tree op push ack (to motivate garbage collection) --- .../usatiuk/kleppmanntree/KleppmannTree.java | 12 ++++---- .../jkleppmanntree/JKleppmannTreeManager.java | 21 +++++++++----- .../JKleppmannTreePeriodicPushOp.java | 29 +++++++------------ .../repository/invalidation/OpHandler.java | 7 +++++ .../repository/invalidation/OpPusher.java | 2 +- .../usatiuk/dhfs/integration/DhfsFuseIT.java | 6 ---- 6 files changed, 36 insertions(+), 41 deletions(-) diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java index ffae8d88..f1f238b1 100644 --- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java +++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java @@ -187,16 +187,14 @@ public class KleppmannTree, PeerIdT ex return true; } - public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) { - // TODO: Ideally no point in this separate locking? + public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) { var gotExt = _storage.getPeerTimestampLog().getForPeer(from); var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId()); - if ((gotExt != null && gotExt.compareTo(timestamp) >= 0) - && (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false; - updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack? - updateTimestampImpl(from, timestamp); + if (!(gotExt != null && gotExt.compareTo(timestamp) >= 0)) + updateTimestampImpl(from, timestamp); + if (!(gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) + updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack? tryTrimLog(); - return true; } private void applyOp(PeerIdT from, OpMove op, boolean failCreatingIfExists) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java index 0305e0b8..d9af1633 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java @@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; import com.usatiuk.dhfs.objects.repository.invalidation.Op; import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; import com.usatiuk.dhfs.objects.transaction.LockingStrategy; @@ -34,6 +35,8 @@ public class JKleppmannTreeManager { JKleppmannTreePeerInterface peerInterface; @Inject PeerInfoService peerInfoService; + @Inject + PersistentPeerDataService persistentPeerDataService; public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) { return txManager.executeTx(() -> { @@ -113,6 +116,9 @@ public class JKleppmannTreeManager { // @Override public void commitOpForHost(PeerId host, Op op) { + if (op instanceof JKleppmannTreePeriodicPushOp) + return; + if (!(op instanceof JKleppmannTreeOpWrapper jop)) throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass()); @@ -133,9 +139,10 @@ public class JKleppmannTreeManager { } // @Override - public boolean acceptExternalOp(PeerId from, Op op) { - if (op instanceof JKleppmannTreePeriodicPushOp pushOp) { - return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp()); + public void acceptExternalOp(PeerId from, Op op) { + if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) { + _tree.updateExternalTimestamp(from1, timestamp); + return; } if (!(op instanceof JKleppmannTreeOpWrapper jop)) @@ -192,13 +199,11 @@ public class JKleppmannTreeManager { // } // } } - return true; } -// @Override -// public Op getPeriodicPushOp() { -// return new JKleppmannTreePeriodicPushOp(persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp()); -// } + public Op getPeriodicPushOp() { + return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp()); + } // @Override // public void addToTx() { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java index 530bf28e..6ddf10ae 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java @@ -1,26 +1,17 @@ package com.usatiuk.dhfs.objects.jkleppmanntree; +import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; -public class JKleppmannTreePeriodicPushOp { - private final PeerId _from; - private final long _timestamp; +import java.io.Serializable; +import java.util.Collection; +import java.util.List; - public JKleppmannTreePeriodicPushOp(PeerId from, long timestamp) { - _from = from; - _timestamp = timestamp; +public record JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from, + long timestamp) implements Op, Serializable { + @Override + public Collection getEscapedRefs() { + return List.of(); } - - public PeerId getFrom() { - return _from; - } - - public long getTimestamp() { - return _timestamp; - } - -// @Override -// public Collection getEscapedRefs() { -// return List.of(); -// } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java index 6b78661a..7121f308 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository.invalidation; import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePeriodicPushOp; import com.usatiuk.dhfs.objects.transaction.Transaction; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -15,6 +16,8 @@ public class OpHandler { Transaction curTx; @Inject JKleppmannTreeManager jKleppmannTreeManager; + @Inject + InvalidationQueueService invalidationQueueService; public void handleOp(PeerId from, Op op) { if (op instanceof IndexUpdateOp iu) { @@ -22,6 +25,10 @@ public class OpHandler { } else if (op instanceof JKleppmannTreeOpWrapper jk) { var tree = jKleppmannTreeManager.getTree(jk.treeName()); tree.acceptExternalOp(from, jk); + curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName())); + } else if (op instanceof JKleppmannTreePeriodicPushOp pop) { + var tree = jKleppmannTreeManager.getTree(pop.treeName()); + tree.acceptExternalOp(from, pop); } } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java index 65f82e65..de603c32 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java @@ -54,7 +54,7 @@ public class OpPusher { var tree = jKleppmannTreeManager.getTree(pd.key()); if (!tree.hasPendingOpsForHost(entry.peer())) - return null; + return List.of(tree.getPeriodicPushOp()); var ops = tree.getPendingOpsForHost(entry.peer(), 1); diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java index d247a52c..4cd40344 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java @@ -192,12 +192,6 @@ public class DhfsFuseIT { 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode()); Log.info("Deleted"); - // TODO: Fix this - Log.info("Dummy write"); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd1").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd2").getExitCode()); - Log.info("Dummy written"); - // FIXME? waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);