diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java index 732bb1c6..ec0c3844 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java @@ -93,6 +93,15 @@ public class JKleppmannTreeManager { _tree.move(_storageInterface.getTrashId(), newMeta.withName(node), node); } + @Override + public boolean hasPendingOpsForHost(UUID host) { + return _persistentData.get() + .runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, + (m, d) -> d.getQueues().containsKey(host) && + !d.getQueues().get(host).isEmpty() + ); + } + @Override public List getPendingOpsForHost(UUID host, int limit) { return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java index 6fb345ac..019dbc1c 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpObject.java @@ -6,6 +6,8 @@ import java.util.UUID; public interface OpObject { String getId(); + boolean hasPendingOpsForHost(UUID host); + List getPendingOpsForHost(UUID host, int limit); void commitOpForHost(UUID host, Op op); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpSender.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpSender.java index c5c59c2a..9cd68547 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpSender.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/opsupport/OpSender.java @@ -74,9 +74,19 @@ public class OpSender { // Must be peeked before getPendingOpForHost var periodicPushOp = obj.getPeriodicPushOp(); - List collected = obj.getPendingOpsForHost(host, batchSize); + if (!obj.hasPendingOpsForHost(host)) { + if (periodicPushOp == null) return; + try { + remoteObjectServiceClient.pushOps(List.of(periodicPushOp), obj.getId(), host); + Log.debug("Sent periodic op update to " + host + "of" + obj.getId()); + } catch (Throwable e) { + Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e); + } + return; + } - if (!collected.isEmpty()) { + while (obj.hasPendingOpsForHost(host)) { + List collected = obj.getPendingOpsForHost(host, batchSize); try { // The peer should finish the call only if it had persisted everything remoteObjectServiceClient.pushOps(collected, obj.getId(), host); @@ -90,14 +100,6 @@ public class OpSender { } catch (Throwable e) { Log.warn("Error sending op to " + host, e); } - } else { - if (periodicPushOp == null) return; - try { - remoteObjectServiceClient.pushOps(List.of(periodicPushOp), obj.getId(), host); - Log.debug("Sent periodic op update to " + host + "of" + obj.getId()); - } catch (Throwable e) { - Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e); - } } } diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/ResyncIT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/ResyncIT.java index c244b6c1..9d32aa72 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/ResyncIT.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/ResyncIT.java @@ -85,4 +85,47 @@ public class ResyncIT { Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()); } + + @Test + void manyFiles() throws IOException, InterruptedException, TimeoutException { + var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test$i; done"); + Assertions.assertEquals(0, ret.getExitCode()); + var foundWc = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l"); + Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip())); + + ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test-2-$i; done"); + Assertions.assertEquals(0, ret.getExitCode()); + foundWc = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l"); + Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip())); + + c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout(); + c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout(); + + Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid)); + Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid)); + + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS); + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS); + + var c1curl = container1.execInContainer("/bin/sh", "-c", + "curl --header \"Content-Type: application/json\" " + + " --request PUT " + + " --data '{\"uuid\":\"" + c2uuid + "\"}' " + + " http://localhost:8080/objects-manage/known-peers"); + + var c2curl = container2.execInContainer("/bin/sh", "-c", + "curl --header \"Content-Type: application/json\" " + + " --request PUT " + + " --data '{\"uuid\":\"" + c1uuid + "\"}' " + + " http://localhost:8080/objects-manage/known-peers"); + + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + Thread.sleep(5000); + var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l"); + Assertions.assertEquals(400, Integer.valueOf(foundWc2.getStdout().strip())); + foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l"); + Assertions.assertEquals(400, Integer.valueOf(foundWc2.getStdout().strip())); + } + }