diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java index 62c59030..42fb5de1 100644 --- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java +++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java @@ -220,14 +220,14 @@ public class KleppmannTree, PeerIdT ex return true; } - public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) { + public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) { _storage.rLock(); try { // TODO: Ideally no point in this separate locking? var gotExt = _storage.getPeerTimestampLog().getForPeer(from); var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId()); if ((gotExt != null && gotExt.compareTo(timestamp) >= 0) - && (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return; + && (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false; } finally { _storage.rUnlock(); } @@ -239,6 +239,8 @@ public class KleppmannTree, PeerIdT ex } finally { _storage.rwUnlock(); } + + return true; } private void applyOp(PeerIdT from, OpMove op, boolean failCreatingIfExists) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java index bebd567b..43502d20 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java @@ -161,10 +161,9 @@ public class JKleppmannTreeManager { } @Override - public void acceptExternalOp(UUID from, Op op) { + public boolean acceptExternalOp(UUID from, Op op) { if (op instanceof JKleppmannTreePeriodicPushOp pushOp) { - _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp()); - return; + return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp()); } if (!(op instanceof JKleppmannTreeOpWrapper jop)) @@ -222,6 +221,7 @@ public class JKleppmannTreeManager { } } } + return true; } @Override diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java index 019dbc1c..edc13484 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java @@ -14,7 +14,7 @@ public interface OpObject { void pushBootstrap(UUID host); - void acceptExternalOp(UUID from, Op op); + boolean acceptExternalOp(UUID from, Op op); Op getPeriodicPushOp(); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObjectRegistry.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObjectRegistry.java index a08919a2..d62de3bf 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObjectRegistry.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObjectRegistry.java @@ -3,8 +3,6 @@ package com.usatiuk.dhfs.objects.repository.opsupport; import com.usatiuk.dhfs.objects.repository.PeerManager; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import io.quarkus.scheduler.Scheduled; -import io.smallrye.common.annotation.Blocking; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -32,8 +30,11 @@ public class OpObjectRegistry { if (got == null) throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Queue with id " + objId + " not registered")); got.addToTx(); + boolean push = false; for (Op op : ops) - got.acceptExternalOp(from, op); + push |= got.acceptExternalOp(from, op); + if (push) + opSender.push(got); } public void pushBootstrapData(UUID host) { @@ -43,13 +44,4 @@ public class OpObjectRegistry { o.pushBootstrap(host); } } - - - @Scheduled(every = "${dhfs.objects.periodic-push-op-interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - @Blocking - void periodicPush() { - for (var obj : _objects.values()) { - opSender.push(obj); - } - } } diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java index 1fd97272..b9d9f92d 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFuseIT.java @@ -186,9 +186,11 @@ public class DhfsFuseIT { await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout())); + Log.info("Deleting"); await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode()); await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode()); + Log.info("Deleted"); // FIXME? waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3); @@ -295,12 +297,13 @@ public class DhfsFuseIT { } }).anyMatch(r -> r != 0); Assumptions.assumeTrue(!createFail, "Failed creating one or more files"); - var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"); - var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*"); - Log.info(ls); - Log.info(cat); - await().atMost(45, TimeUnit.SECONDS).until(() -> cat.getStdout().contains("test1") && cat.getStdout().contains("test2")); -// Assertions.assertTrue(ls.getStdout().chars().filter(c -> c == '\n').count() >= 2); + await().atMost(45, TimeUnit.SECONDS).until(() -> { + var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"); + var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*"); + Log.info(ls); + Log.info(cat); + return cat.getStdout().contains("test1") && cat.getStdout().contains("test2"); + }); } @Test diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java index 45a602ea..b401b053 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java @@ -35,7 +35,8 @@ public class DhfsFusex3IT { String c2uuid; String c3uuid; - long emptyFileCount; + // This calculation is somewhat racy, so keep it hardcoded for now + long emptyFileCount = 9; @BeforeEach void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException { @@ -114,8 +115,6 @@ public class DhfsFusex3IT { waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); - - emptyFileCount = Integer.valueOf(container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f | wc -l").getStdout().strip()); } private boolean checkEmpty() throws IOException, InterruptedException {