6 Commits

Author SHA1 Message Date
d48cc18e85 Server: only try to purge directory
won't bother getting it working with rootful docker
2025-04-13 22:03:31 +02:00
77177414eb Server: slight directory fixes 2025-04-13 20:37:12 +02:00
83e0f6eb0a Server: fix not being able to delete temp dir in tests 2025-04-13 20:05:51 +02:00
a5727c01b1 Server: push resync after crash 2025-04-13 19:57:16 +02:00
711c4f5e28 Server: simple kill test 2025-04-13 19:36:34 +02:00
45556f2b74 Objects: rename name to value in JObjectKey 2025-04-13 17:10:33 +02:00
29 changed files with 537 additions and 267 deletions

View File

@@ -30,11 +30,15 @@ public class TestDataCleaner {
purgeDirectory(Path.of(tempDirectory).toFile());
}
void purgeDirectory(File dir) {
for (File file : Objects.requireNonNull(dir.listFiles())) {
if (file.isDirectory())
purgeDirectory(file);
file.delete();
public static void purgeDirectory(File dir) {
try {
for (File file : Objects.requireNonNull(dir.listFiles())) {
if (file.isDirectory())
purgeDirectory(file);
file.delete();
}
} catch (Exception e) {
Log.error("Couldn't purge directory " + dir, e);
}
}
}

View File

@@ -53,8 +53,8 @@ public class DhfsFuseIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -85,111 +85,111 @@ public class DhfsFuseIT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
@Test
void readWriteRewriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
@Test
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_default/fuse/testf2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /dhfs_test/fuse/testf2").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() ->
"newfile\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
"newfile\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"newfile\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
"newfile\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
}
@Test
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() ->
"rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
// TODO: How this fits with the tree?
@Test
@Disabled
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void deleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
Log.info("Deleting");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
Log.info("Deleted");
// FIXME?
@@ -197,50 +197,50 @@ public class DhfsFuseIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Moving");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Reading");
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
}
@Test
void moveDirTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testdir/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testdir/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir/testf1").getStdout()));
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Moving");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testdir /root/dhfs_default/fuse/testdir2/testdirm").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testdir /dhfs_test/fuse/testdir2/testdirm").getExitCode());
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Reading");
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir2/testdirm/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir2/testdirm/testf1").getStdout()));
}
// TODO: This probably shouldn't be working right now
@Test
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
@@ -248,14 +248,14 @@ public class DhfsFuseIT {
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /root/dhfs_default/fuse/newfile1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo asvdkljm > /root/dhfs_default/fuse/newfile1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo asvdkljm > /dhfs_test/fuse/newfile1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo dfgvh > /root/dhfs_default/fuse/newfile2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo dscfg > /root/dhfs_default/fuse/newfile2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo dfgvh > /dhfs_test/fuse/newfile2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo dscfg > /dhfs_test/fuse/newfile2").getExitCode());
Log.info("Re-adding");
container2.execInContainer("/bin/sh", "-c",
@@ -266,14 +266,14 @@ public class DhfsFuseIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing removeAddHostTest");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
Log.info(cat1);
Log.info(cat2);
Log.info(ls1);
@@ -286,10 +286,10 @@ public class DhfsFuseIT {
@Test
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /dhfs_test/fuse/testf"),
Pair.of(container2, "echo test2 >> /dhfs_test/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -298,8 +298,8 @@ public class DhfsFuseIT {
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls);
Log.info(cat);
return cat.getStdout().contains("test1") && cat.getStdout().contains("test2");
@@ -308,38 +308,38 @@ public class DhfsFuseIT {
@Test
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/a").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/b").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo xqr489 >> /root/dhfs_default/fuse/a/testfa").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo ahinou >> /root/dhfs_default/fuse/b/testfb").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls -lavh /root/dhfs_default/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/b").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo xqr489 >> /dhfs_test/fuse/a/testfa").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo ahinou >> /dhfs_test/fuse/b/testfb").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls -lavh /dhfs_test/fuse").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var c2ls = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f -exec cat {} \\;");
var c2ls = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f -exec cat {} \\;");
return c2ls.getExitCode() == 0 && c2ls.getStdout().contains("xqr489") && c2ls.getStdout().contains("ahinou");
});
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/a /root/dhfs_default/fuse/b").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/a /dhfs_test/fuse/b").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/b /root/dhfs_default/fuse/a").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/b /dhfs_test/fuse/a").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing dirCycleTest");
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"));
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/a"));
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/b"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/a"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/b"));
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
var c1ls2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -maxdepth 3 -type f -exec cat {} \\;");
var c1ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
Log.info(c1ls2);
var c2ls2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -maxdepth 3 -type f -exec cat {} \\;");
var c2ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
Log.info(c2ls2);
return c1ls2.getStdout().contains("xqr489") && c1ls2.getStdout().contains("ahinou")
@@ -353,27 +353,27 @@ public class DhfsFuseIT {
void removeAndMove() throws IOException, InterruptedException, TimeoutException {
var client = DockerClientFactory.instance().client();
Log.info("Creating");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
client.pauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
Log.info("Removing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
Log.info("Moving");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
Log.info("Listing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Reading");
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
@@ -382,10 +382,10 @@ public class DhfsFuseIT {
// TODO: it always seems to be removed?
Log.info("Reading both");
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info("cat1: " + cat1);
Log.info("cat2: " + cat2);
Log.info("ls1: " + ls1);

View File

@@ -59,9 +59,9 @@ public class DhfsFusex3IT {
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid);
Log.info(container2.getContainerId() + "=" + c2uuid);
@@ -119,8 +119,8 @@ public class DhfsFusex3IT {
private boolean checkEmpty() throws IOException, InterruptedException {
for (var container : List.of(container1, container2, container3)) {
var found = container.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f");
var foundWc = container.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f | wc -l");
var found = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f");
var foundWc = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f | wc -l");
Log.info("Remaining objects in " + container.getContainerId() + ": " + found.toString() + " " + foundWc.toString());
if (!(found.getExitCode() == 0 && foundWc.getExitCode() == 0 && Integer.parseInt(foundWc.getStdout().strip()) == emptyFileCount))
return false;
@@ -135,47 +135,47 @@ public class DhfsFusex3IT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
// FIXME:
@Test
@Disabled
void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@Test
@Disabled
void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@Test
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_default/fuse/hello.c").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && gcc hello.c").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /dhfs_test/fuse/hello.c").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && gcc hello.c").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var helloOut = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
var helloOut = container1.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
Log.info(helloOut);
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
});
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var helloOut = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
var helloOut = container2.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
Log.info(helloOut);
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
});
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var helloOut = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
var helloOut = container3.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
Log.info(helloOut);
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
});
@@ -183,9 +183,9 @@ public class DhfsFusex3IT {
@Test
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
@@ -195,10 +195,10 @@ public class DhfsFusex3IT {
Thread.sleep(10000);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
@Test
@@ -208,15 +208,15 @@ public class DhfsFusex3IT {
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
@@ -225,8 +225,8 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls);
Log.info(cat);
if (!(cat.getStdout().contains("test1") && cat.getStdout().contains("test2") && cat.getStdout().contains("test3")))
@@ -236,35 +236,35 @@ public class DhfsFusex3IT {
});
await().atMost(45, TimeUnit.SECONDS).until(() -> {
return container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
return container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout());
});
}
@Test
void fileConflictTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
Log.warn("Waiting for connections");
@@ -276,9 +276,9 @@ public class DhfsFusex3IT {
// TODO: There's some issue with cache, so avoid file reads
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency 1");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
Log.info(ls1);
Log.info(ls2);
Log.info(ls3);
@@ -290,8 +290,8 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing");
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls);
Log.info(cat);
if (!(cat.getExitCode() == 0 && ls.getExitCode() == 0))
@@ -304,12 +304,12 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);

View File

@@ -83,7 +83,11 @@ public class DhfsImage implements Future<String> {
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-Ddhfs.objects.periodic-push-op-interval=5s",
"-Ddhfs.fuse.root=/dhfs_test/fuse",
"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
"-Ddhfs.objects.persistence.stuff.root=/dhfs_test/data/stuff",
"-jar", "/app/quarkus-run.jar")
.run("mkdir /dhfs_test && chmod 777 /dhfs_test")
.build())
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"))
.withFileFromPath("/libs", Paths.get(nativeLibsDirectory));

View File

@@ -0,0 +1,182 @@
package com.usatiuk.dhfs.integration;
import com.github.dockerjava.api.model.Device;
import com.usatiuk.dhfs.TestDataCleaner;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
public class KillIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
WaitingConsumer waitingConsumer1;
WaitingConsumer waitingConsumer2;
String c1uuid;
String c2uuid;
File data1;
File data2;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
data1 = Files.createTempDirectory("").toFile();
data2 = Files.createTempDirectory("").toFile();
Network network = Network.newNetwork();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
}
@AfterEach
void stop() {
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
TestDataCleaner.purgeDirectory(data1);
TestDataCleaner.purgeDirectory(data2);
}
@Test
void killTest(TestInfo testInfo) throws Exception {
var executor = Executors.newFixedThreadPool(2);
var barrier = new CyclicBarrier(2);
var ret1 = executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.await();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(10000);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
@Test
void killTestDirs(TestInfo testInfo) throws Exception {
var executor = Executors.newFixedThreadPool(2);
var barrier = new CyclicBarrier(2);
var ret1 = executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.await();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(10000);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
}

View File

@@ -59,9 +59,9 @@ public class ResyncIT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -84,24 +84,24 @@ public class ResyncIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
@Test
void manyFiles() throws IOException, InterruptedException, TimeoutException {
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test$i; done");
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test$i; done");
Assertions.assertEquals(0, ret.getExitCode());
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test-2-$i; done");
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test-2-$i; done");
Assertions.assertEquals(0, ret.getExitCode());
foundWc = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
foundWc = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -124,24 +124,24 @@ public class ResyncIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(120, TimeUnit.SECONDS).until(() -> {
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
});
await().atMost(120, TimeUnit.SECONDS).until(() -> {
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
});
}
@Test
void folderAfterMove() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testd1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /root/dhfs_default/fuse/testd1/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testd1 /root/dhfs_default/fuse/testd2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /root/dhfs_default/fuse/testd2/testf2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testd1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /dhfs_test/fuse/testd1/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testd1 /dhfs_test/fuse/testd2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testd2/testf2").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -164,8 +164,8 @@ public class ResyncIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf2").getStdout()));
}
}

View File

@@ -19,7 +19,7 @@ public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, Chu
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setKey(JObjectKeyP.newBuilder().setName(object.key().name()).build())
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
.setData(object.data())
.build();
}

View File

@@ -74,6 +74,10 @@ public class DhfsFuse extends FuseStubFS {
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
if (!Paths.get(root).toFile().isDirectory())
throw new IllegalStateException("Could not create directory " + root);
Log.info("Mounting with root " + root);
var uid = new UnixSystem().getUid();

View File

@@ -2,11 +2,6 @@ package com.usatiuk.objects;
import java.io.Serializable;
// TODO: This could be maybe moved to a separate module?
// The base class for JObject data
// Only one instance of this "exists" per key, the instance in the manager is canonical
// When committing a transaction, the instance is checked against it, if it isn't the same, a race occurred.
// It is immutable, its version is filled in by the allocator from the AllocVersionProvider
public interface JData extends Serializable {
JObjectKey key();

View File

@@ -9,8 +9,8 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
JObjectKeyMin MIN = new JObjectKeyMin();
JObjectKeyMax MAX = new JObjectKeyMax();
static JObjectKey of(String name) {
return new JObjectKeyImpl(name);
static JObjectKey of(String value) {
return new JObjectKeyImpl(value);
}
static JObjectKey random() {
@@ -43,5 +43,5 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
ByteBuffer toByteBuffer();
String name();
String value();
}

View File

@@ -5,12 +5,12 @@ import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
public record JObjectKeyImpl(String name) implements JObjectKey {
public record JObjectKeyImpl(String value) implements JObjectKey {
@Override
public int compareTo(JObjectKey o) {
switch (o) {
case JObjectKeyImpl jObjectKeyImpl -> {
return name.compareTo(jObjectKeyImpl.name());
return value.compareTo(jObjectKeyImpl.value());
}
case JObjectKeyMax jObjectKeyMax -> {
return -1;
@@ -23,17 +23,17 @@ public record JObjectKeyImpl(String name) implements JObjectKey {
@Override
public String toString() {
return name;
return value;
}
@Override
public byte[] bytes() {
return name.getBytes(StandardCharsets.UTF_8);
return value.getBytes(StandardCharsets.UTF_8);
}
@Override
public ByteBuffer toByteBuffer() {
var heapBb = StandardCharsets.UTF_8.encode(name);
var heapBb = StandardCharsets.UTF_8.encode(value);
if (heapBb.isDirect()) return heapBb;
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
directBb.put(heapBb);

View File

@@ -29,7 +29,7 @@ public record JObjectKeyMax() implements JObjectKey {
}
@Override
public String name() {
public String value() {
throw new UnsupportedOperationException();
}
}

View File

@@ -29,7 +29,7 @@ public record JObjectKeyMin() implements JObjectKey {
}
@Override
public String name() {
public String value() {
throw new UnsupportedOperationException();
}
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
import com.usatiuk.objects.iterators.CloseableKvIterator;
@@ -121,7 +120,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Nonnull

View File

@@ -595,13 +595,13 @@ public abstract class ObjectsTestImpl {
txm.run(() -> {
var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key));
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
Assertions.assertEquals(key3, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
iter.close();
});
}
@@ -623,25 +623,25 @@ public abstract class ObjectsTestImpl {
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
Assertions.assertEquals(key3, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
});
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.LT, JObjectKey.of(key + "_5"))) {
var got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
Assertions.assertTrue(iter.hasNext());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
});
txm.run(() -> {
@@ -653,7 +653,7 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
}
});
}
@@ -695,9 +695,9 @@ public abstract class ObjectsTestImpl {
barrier2.await();
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -710,13 +710,13 @@ public abstract class ObjectsTestImpl {
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
Assertions.assertEquals(key3, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
});
txm.run(() -> {
@@ -728,7 +728,7 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
}
});
}
@@ -771,12 +771,12 @@ public abstract class ObjectsTestImpl {
barrier2.await();
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -789,14 +789,14 @@ public abstract class ObjectsTestImpl {
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
Assertions.assertEquals("John5", ((Parent) got.getValue()).name());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
Assertions.assertEquals(key3, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
});
txm.run(() -> {
@@ -808,7 +808,7 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
}
});
}
@@ -851,38 +851,38 @@ public abstract class ObjectsTestImpl {
barrier2.await();
try (var iter = curTx.getIterator(IteratorStart.LE, JObjectKey.of(key3))) {
var got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
Assertions.assertTrue(iter.hasNext());
Assertions.assertTrue(iter.hasPrev());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
Assertions.assertTrue(iter.hasNext());
got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals(key2, got.getKey().value());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -895,11 +895,11 @@ public abstract class ObjectsTestImpl {
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertEquals(key1, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
Assertions.assertEquals(key3, got.getKey().value());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertEquals(key4, got.getKey().value());
}
});
txm.run(() -> {
@@ -910,7 +910,7 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
}
});
}

View File

@@ -13,6 +13,6 @@ public class JObjectKeyProtoSerializer implements ProtoSerializer<JObjectKeyP, J
@Override
public JObjectKeyP serialize(JObjectKey object) {
return JObjectKeyP.newBuilder().setName(object.name()).build();
return JObjectKeyP.newBuilder().setName(object.value()).build();
}
}

View File

@@ -0,0 +1,36 @@
package com.usatiuk.dhfs;
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class RemoteObjectInitialSyncProcessor implements InitialSyncProcessor<RemoteObjectMeta> {
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
Transaction curTx;
@Inject
PeerInfoService peerInfoService;
@Override
public void prepareForInitialSync(PeerId from, JObjectKey key) {
}
@Override
public void handleCrash(JObjectKey key) {
var data = curTx.get(RemoteObjectMeta.class, key).orElseThrow();
var versionSum = data.versionSum();
for (var p : peerInfoService.getPeersNoSelf()) {
if (data.knownRemoteVersions().getOrDefault(p.id(), 0L) != versionSum) {
invalidationQueueService.pushInvalidationToOne(p.id(), key);
Log.infov("Pushing after crash invalidation to {0} for {1}", p.id(), key);
}
}
}
}

View File

@@ -41,7 +41,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
}
public static JObjectKey ofDataKey(JObjectKey key) {
return JObjectKey.of("data_" + key.name());
return JObjectKey.of("data_" + key.value());
}
@Override

View File

@@ -21,6 +21,10 @@ public class ShutdownChecker {
void init(@Observes @Priority(2) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
if (!Paths.get(dataRoot).toFile().isDirectory())
throw new IllegalStateException("Could not create directory " + dataRoot);
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
_cleanShutdown = false;

View File

@@ -1,9 +1,11 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.objects.JObjectKey;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -11,10 +13,18 @@ import jakarta.inject.Inject;
public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<JKleppmannTreePersistentData> {
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
@Override
public void prepareForInitialSync(PeerId from, JObjectKey key) {
var tree = jKleppmannTreeManager.getTree(key);
tree.recordBootstrap(from);
}
@Override
public void handleCrash(JObjectKey key) {
invalidationQueueService.pushInvalidationToAll(key);
Log.infov("Pushing after crash invalidation to all for {0}", key);
}
}

View File

@@ -6,7 +6,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.*;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.invalidation.Op;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
@@ -50,11 +49,11 @@ public class JKleppmannTreeManager {
TreePMap.empty()
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(rootNode);
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(trashNode);
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(lf_node);
}
return new JKleppmannTree(data);
@@ -259,17 +258,17 @@ public class JKleppmannTreeManager {
@Override
public JObjectKey getRootId() {
return JObjectKey.of(_treeName.name() + "_jt_root");
return JObjectKey.of(_treeName.value() + "_jt_root");
}
@Override
public JObjectKey getTrashId() {
return JObjectKey.of(_treeName.name() + "_jt_trash");
return JObjectKey.of(_treeName.value() + "_jt_trash");
}
@Override
public JObjectKey getLostFoundId() {
return JObjectKey.of(_treeName.name() + "_jt_lf");
return JObjectKey.of(_treeName.value() + "_jt_lf");
}
@Override

View File

@@ -7,7 +7,6 @@ import com.usatiuk.dhfs.PeerId;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.objects.JObjectKeyImpl;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.PSortedMap;
@@ -50,6 +49,6 @@ public record JKleppmannTreePersistentData(
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(JObjectKey.of(key().name() + "_jt_trash"), JObjectKey.of(key().name() + "_jt_root"), JObjectKey.of(key().name() + "_jt_lf"));
return List.of(JObjectKey.of(key().value() + "_jt_trash"), JObjectKey.of(key().value() + "_jt_root"), JObjectKey.of(key().value() + "_jt_lf"));
}
}

View File

@@ -17,19 +17,19 @@ public class JMapHelper {
Transaction curTx;
static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
return JObjectKey.of(holder.name() + "=");
return JObjectKey.of(holder.value() + "=");
}
static <K extends JMapKey> JObjectKey makeKeyFirst(JObjectKey holder) {
return JObjectKey.of(holder.name() + "<");
return JObjectKey.of(holder.value() + "<");
}
static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
return JObjectKey.of(makePrefix(holder).name() + key.toString());
return JObjectKey.of(makePrefix(holder).value() + key.toString());
}
static <K extends JMapKey> JObjectKey makeKeyLast(JObjectKey holder) {
return JObjectKey.of(holder.name() + ">");
return JObjectKey.of(holder.value() + ">");
}
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {

View File

@@ -23,16 +23,16 @@ public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, J
_hasNext = false;
return;
}
if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
if (!_backing.peekNextKey().value().startsWith(_prefix.value())) {
_backing.skip();
if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
if (!_backing.peekNextKey().value().startsWith(_prefix.value())) {
_hasNext = false;
}
}
}
public K keyToKey(JObjectKey key) {
var keyPart = key.name().substring(_prefix.name().length());
var keyPart = key.value().substring(_prefix.value().length());
return (K) JMapLongKey.of(Long.parseLong(keyPart));
}
@@ -90,7 +90,7 @@ public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, J
throw new IllegalStateException("No next element");
}
var next = _backing.next();
assert next.getKey().name().startsWith(_prefix.name());
assert next.getKey().value().startsWith(_prefix.value());
advance();
return Pair.of(keyToKey(next.getKey()), (JMapEntry<K>) next.getValue());
}

View File

@@ -1,9 +1,10 @@
package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
public interface InitialSyncProcessor<T extends JData> {
void prepareForInitialSync(PeerId from, JObjectKey key);
void handleCrash(JObjectKey key);
}

View File

@@ -3,14 +3,18 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.pcollections.HashTreePSet;
@@ -40,6 +44,8 @@ public class SyncHandler {
RemoteTransaction remoteTx;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
ShutdownChecker shutdownChecker;
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers, Instance<InitialSyncProcessor<?>> initialSyncProcessors) {
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
@@ -113,6 +119,33 @@ public class SyncHandler {
}
}
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
if (shutdownChecker.lastShutdownClean())
return;
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> {
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();
objs.add(key);
// TODO: Nested transactions
it.skip();
}
}
});
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.handleCrash(obj);
}
Log.infov("Handled crash of {0}", obj);
});
}
}
public void doInitialSync(PeerId peer) {
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> {

View File

@@ -51,7 +51,7 @@ public class PeerInfoService {
public boolean existsPeer(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return false;
}
@@ -61,7 +61,7 @@ public class PeerInfoService {
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return Optional.empty();
}
@@ -114,7 +114,7 @@ public class PeerInfoService {
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name()));
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
if (gotKey == null) {
return;
}

View File

@@ -11,19 +11,19 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
private final JObjectKey _peerId;
public JKleppmannTreeNodeMetaPeer(PeerId id) {
super(peerIdToNodeId(id).name());
super(peerIdToNodeId(id).value());
_peerId = id.toJObjectKey();
}
public static JObjectKey peerIdToNodeId(PeerId id) {
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
return JObjectKey.of(id.toJObjectKey().value() + "_tree_node");
}
public static PeerId nodeIdToPeerId(JObjectKey id) {
if (!id.name().endsWith("_tree_node")) {
if (!id.value().endsWith("_tree_node")) {
throw new IllegalArgumentException("Not a tree node key: " + id);
}
return PeerId.of(id.name().substring(0, id.name().length() - "_tree_node".length()));
return PeerId.of(id.value().substring(0, id.value().length() - "_tree_node".length()));
}
public JObjectKey getPeerId() {
@@ -32,7 +32,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
@Override
public JKleppmannTreeNodeMeta withName(String name) {
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().value())).toString());
assert getName().equals(name);
return this;
}

View File

@@ -30,7 +30,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
// connecting two other nodes
// TODO: Can there be a prettier way to do this? (e.g. more general proxying of ops?)
if (cur instanceof JKleppmannTreeNode n) {
if (n.key().name().equals("peers_jt_root")) {
if (n.key().value().equals("peers_jt_root")) {
// TODO: This is kinda sucky
Log.infov("Changed peer tree root: {0} to {1}", key, cur);