mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: don't forget to really send all the ops
This commit is contained in:
@@ -93,6 +93,15 @@ public class JKleppmannTreeManager {
|
||||
_tree.move(_storageInterface.getTrashId(), newMeta.withName(node), node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPendingOpsForHost(UUID host) {
|
||||
return _persistentData.get()
|
||||
.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
|
||||
(m, d) -> d.getQueues().containsKey(host) &&
|
||||
!d.getQueues().get(host).isEmpty()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Op> getPendingOpsForHost(UUID host, int limit) {
|
||||
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
|
||||
@@ -6,6 +6,8 @@ import java.util.UUID;
|
||||
public interface OpObject {
|
||||
String getId();
|
||||
|
||||
boolean hasPendingOpsForHost(UUID host);
|
||||
|
||||
List<Op> getPendingOpsForHost(UUID host, int limit);
|
||||
|
||||
void commitOpForHost(UUID host, Op op);
|
||||
|
||||
@@ -74,9 +74,19 @@ public class OpSender {
|
||||
// Must be peeked before getPendingOpForHost
|
||||
var periodicPushOp = obj.getPeriodicPushOp();
|
||||
|
||||
List<Op> collected = obj.getPendingOpsForHost(host, batchSize);
|
||||
if (!obj.hasPendingOpsForHost(host)) {
|
||||
if (periodicPushOp == null) return;
|
||||
try {
|
||||
remoteObjectServiceClient.pushOps(List.of(periodicPushOp), obj.getId(), host);
|
||||
Log.debug("Sent periodic op update to " + host + "of" + obj.getId());
|
||||
} catch (Throwable e) {
|
||||
Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!collected.isEmpty()) {
|
||||
while (obj.hasPendingOpsForHost(host)) {
|
||||
List<Op> collected = obj.getPendingOpsForHost(host, batchSize);
|
||||
try {
|
||||
// The peer should finish the call only if it had persisted everything
|
||||
remoteObjectServiceClient.pushOps(collected, obj.getId(), host);
|
||||
@@ -90,14 +100,6 @@ public class OpSender {
|
||||
} catch (Throwable e) {
|
||||
Log.warn("Error sending op to " + host, e);
|
||||
}
|
||||
} else {
|
||||
if (periodicPushOp == null) return;
|
||||
try {
|
||||
remoteObjectServiceClient.pushOps(List.of(periodicPushOp), obj.getId(), host);
|
||||
Log.debug("Sent periodic op update to " + host + "of" + obj.getId());
|
||||
} catch (Throwable e) {
|
||||
Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -85,4 +85,47 @@ public class ResyncIT {
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void manyFiles() throws IOException, InterruptedException, TimeoutException {
|
||||
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test-2-$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
foundWc = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
Thread.sleep(5000);
|
||||
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(400, Integer.valueOf(foundWc2.getStdout().strip()));
|
||||
foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(400, Integer.valueOf(foundWc2.getStdout().strip()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user