mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
6 Commits
146870c281
...
d48cc18e85
| Author | SHA1 | Date | |
|---|---|---|---|
| d48cc18e85 | |||
| 77177414eb | |||
| 83e0f6eb0a | |||
| a5727c01b1 | |||
| 711c4f5e28 | |||
| 45556f2b74 |
@@ -30,11 +30,15 @@ public class TestDataCleaner {
|
||||
purgeDirectory(Path.of(tempDirectory).toFile());
|
||||
}
|
||||
|
||||
void purgeDirectory(File dir) {
|
||||
for (File file : Objects.requireNonNull(dir.listFiles())) {
|
||||
if (file.isDirectory())
|
||||
purgeDirectory(file);
|
||||
file.delete();
|
||||
public static void purgeDirectory(File dir) {
|
||||
try {
|
||||
for (File file : Objects.requireNonNull(dir.listFiles())) {
|
||||
if (file.isDirectory())
|
||||
purgeDirectory(file);
|
||||
file.delete();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.error("Couldn't purge directory " + dir, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,8 +53,8 @@ public class DhfsFuseIT {
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -85,111 +85,111 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteRewriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /dhfs_test/fuse/testf2").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"newfile\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
|
||||
"newfile\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"newfile\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
|
||||
"newfile\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
// TODO: How this fits with the tree?
|
||||
@Test
|
||||
@Disabled
|
||||
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
Log.info("Deleting");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
Log.info("Deleted");
|
||||
|
||||
// FIXME?
|
||||
@@ -197,50 +197,50 @@ public class DhfsFuseIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
1 == container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
1 == container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveDirTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testdir/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testdir/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testdir /root/dhfs_default/fuse/testdir2/testdirm").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testdir /dhfs_test/fuse/testdir2/testdirm").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir2/testdirm/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir2/testdirm/testf1").getStdout()));
|
||||
}
|
||||
|
||||
|
||||
// TODO: This probably shouldn't be working right now
|
||||
@Test
|
||||
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
@@ -248,14 +248,14 @@ public class DhfsFuseIT {
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers");
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /root/dhfs_default/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo asvdkljm > /root/dhfs_default/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo asvdkljm > /dhfs_test/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo dfgvh > /root/dhfs_default/fuse/newfile2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo dscfg > /root/dhfs_default/fuse/newfile2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo dfgvh > /dhfs_test/fuse/newfile2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo dscfg > /dhfs_test/fuse/newfile2").getExitCode());
|
||||
|
||||
Log.info("Re-adding");
|
||||
container2.execInContainer("/bin/sh", "-c",
|
||||
@@ -266,14 +266,14 @@ public class DhfsFuseIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing removeAddHostTest");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
Log.info(cat1);
|
||||
Log.info(cat2);
|
||||
Log.info(ls1);
|
||||
@@ -286,10 +286,10 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /dhfs_test/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /dhfs_test/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -298,8 +298,8 @@ public class DhfsFuseIT {
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
return cat.getStdout().contains("test1") && cat.getStdout().contains("test2");
|
||||
@@ -308,38 +308,38 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/a").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/b").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo xqr489 >> /root/dhfs_default/fuse/a/testfa").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo ahinou >> /root/dhfs_default/fuse/b/testfb").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls -lavh /root/dhfs_default/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/b").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo xqr489 >> /dhfs_test/fuse/a/testfa").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo ahinou >> /dhfs_test/fuse/b/testfb").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls -lavh /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var c2ls = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f -exec cat {} \\;");
|
||||
var c2ls = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f -exec cat {} \\;");
|
||||
return c2ls.getExitCode() == 0 && c2ls.getStdout().contains("xqr489") && c2ls.getStdout().contains("ahinou");
|
||||
});
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/a /root/dhfs_default/fuse/b").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/a /dhfs_test/fuse/b").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/b /root/dhfs_default/fuse/a").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/b /dhfs_test/fuse/a").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing dirCycleTest");
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/a"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/b"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/a"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/b"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
|
||||
|
||||
var c1ls2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
var c1ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
Log.info(c1ls2);
|
||||
var c2ls2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
var c2ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
Log.info(c2ls2);
|
||||
|
||||
return c1ls2.getStdout().contains("xqr489") && c1ls2.getStdout().contains("ahinou")
|
||||
@@ -353,27 +353,27 @@ public class DhfsFuseIT {
|
||||
void removeAndMove() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
Log.info("Removing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
@@ -382,10 +382,10 @@ public class DhfsFuseIT {
|
||||
// TODO: it always seems to be removed?
|
||||
Log.info("Reading both");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info("cat1: " + cat1);
|
||||
Log.info("cat2: " + cat2);
|
||||
Log.info("ls1: " + ls1);
|
||||
|
||||
@@ -59,9 +59,9 @@ public class DhfsFusex3IT {
|
||||
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Log.info(container1.getContainerId() + "=" + c1uuid);
|
||||
Log.info(container2.getContainerId() + "=" + c2uuid);
|
||||
@@ -119,8 +119,8 @@ public class DhfsFusex3IT {
|
||||
|
||||
private boolean checkEmpty() throws IOException, InterruptedException {
|
||||
for (var container : List.of(container1, container2, container3)) {
|
||||
var found = container.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f");
|
||||
var foundWc = container.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f | wc -l");
|
||||
var found = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f");
|
||||
var foundWc = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f | wc -l");
|
||||
Log.info("Remaining objects in " + container.getContainerId() + ": " + found.toString() + " " + foundWc.toString());
|
||||
if (!(found.getExitCode() == 0 && foundWc.getExitCode() == 0 && Integer.parseInt(foundWc.getStdout().strip()) == emptyFileCount))
|
||||
return false;
|
||||
@@ -135,47 +135,47 @@ public class DhfsFusex3IT {
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
@Test
|
||||
@Disabled
|
||||
void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_default/fuse/hello.c").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && gcc hello.c").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /dhfs_test/fuse/hello.c").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && gcc hello.c").getExitCode());
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
var helloOut = container1.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
var helloOut = container2.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
var helloOut = container3.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
@@ -183,9 +183,9 @@ public class DhfsFusex3IT {
|
||||
|
||||
@Test
|
||||
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
@@ -195,10 +195,10 @@ public class DhfsFusex3IT {
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -208,15 +208,15 @@ public class DhfsFusex3IT {
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
// Pauses needed as otherwise docker buffers some incoming packets
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container3.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container3.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
@@ -225,8 +225,8 @@ public class DhfsFusex3IT {
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
if (!(cat.getStdout().contains("test1") && cat.getStdout().contains("test2") && cat.getStdout().contains("test3")))
|
||||
@@ -236,35 +236,35 @@ public class DhfsFusex3IT {
|
||||
});
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
return container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
return container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void fileConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
// Pauses needed as otherwise docker buffers some incoming packets
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container3.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container3.getContainerId()).exec();
|
||||
Log.warn("Waiting for connections");
|
||||
@@ -276,9 +276,9 @@ public class DhfsFusex3IT {
|
||||
// TODO: There's some issue with cache, so avoid file reads
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency 1");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
Log.info(ls1);
|
||||
Log.info(ls2);
|
||||
Log.info(ls3);
|
||||
@@ -290,8 +290,8 @@ public class DhfsFusex3IT {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing");
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
if (!(cat.getExitCode() == 0 && ls.getExitCode() == 0))
|
||||
@@ -304,12 +304,12 @@ public class DhfsFusex3IT {
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
|
||||
@@ -83,7 +83,11 @@ public class DhfsImage implements Future<String> {
|
||||
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
|
||||
"-Ddhfs.objects.periodic-push-op-interval=5s",
|
||||
"-Ddhfs.fuse.root=/dhfs_test/fuse",
|
||||
"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
|
||||
"-Ddhfs.objects.persistence.stuff.root=/dhfs_test/data/stuff",
|
||||
"-jar", "/app/quarkus-run.jar")
|
||||
.run("mkdir /dhfs_test && chmod 777 /dhfs_test")
|
||||
.build())
|
||||
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"))
|
||||
.withFileFromPath("/libs", Paths.get(nativeLibsDirectory));
|
||||
|
||||
@@ -0,0 +1,182 @@
|
||||
package com.usatiuk.dhfs.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import com.usatiuk.dhfs.TestDataCleaner;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class KillIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
File data1;
|
||||
File data2;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
data1 = Files.createTempDirectory("").toFile();
|
||||
data2 = Files.createTempDirectory("").toFile();
|
||||
|
||||
Network network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers");
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers");
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
TestDataCleaner.purgeDirectory(data1);
|
||||
TestDataCleaner.purgeDirectory(data2);
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTest(TestInfo testInfo) throws Exception {
|
||||
var executor = Executors.newFixedThreadPool(2);
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.start();
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTestDirs(TestInfo testInfo) throws Exception {
|
||||
var executor = Executors.newFixedThreadPool(2);
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.start();
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -59,9 +59,9 @@ public class ResyncIT {
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -84,24 +84,24 @@ public class ResyncIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void manyFiles() throws IOException, InterruptedException, TimeoutException {
|
||||
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test$i; done");
|
||||
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /root/dhfs_default/fuse/test-2-$i; done");
|
||||
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test-2-$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
foundWc = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
foundWc = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -124,24 +124,24 @@ public class ResyncIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void folderAfterMove() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testd1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /root/dhfs_default/fuse/testd1/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testd1 /root/dhfs_default/fuse/testd2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /root/dhfs_default/fuse/testd2/testf2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testd1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /dhfs_test/fuse/testd1/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testd1 /dhfs_test/fuse/testd2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testd2/testf2").getExitCode());
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -164,8 +164,8 @@ public class ResyncIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf2").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf2").getStdout()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, Chu
|
||||
@Override
|
||||
public ChunkDataP serialize(ChunkData object) {
|
||||
return ChunkDataP.newBuilder()
|
||||
.setKey(JObjectKeyP.newBuilder().setName(object.key().name()).build())
|
||||
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
|
||||
.setData(object.data())
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -74,6 +74,10 @@ public class DhfsFuse extends FuseStubFS {
|
||||
void init(@Observes @Priority(100000) StartupEvent event) {
|
||||
if (!enabled) return;
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory())
|
||||
throw new IllegalStateException("Could not create directory " + root);
|
||||
|
||||
Log.info("Mounting with root " + root);
|
||||
|
||||
var uid = new UnixSystem().getUid();
|
||||
|
||||
@@ -2,11 +2,6 @@ package com.usatiuk.objects;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
// TODO: This could be maybe moved to a separate module?
|
||||
// The base class for JObject data
|
||||
// Only one instance of this "exists" per key, the instance in the manager is canonical
|
||||
// When committing a transaction, the instance is checked against it, if it isn't the same, a race occurred.
|
||||
// It is immutable, its version is filled in by the allocator from the AllocVersionProvider
|
||||
public interface JData extends Serializable {
|
||||
JObjectKey key();
|
||||
|
||||
|
||||
@@ -9,8 +9,8 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
JObjectKeyMin MIN = new JObjectKeyMin();
|
||||
JObjectKeyMax MAX = new JObjectKeyMax();
|
||||
|
||||
static JObjectKey of(String name) {
|
||||
return new JObjectKeyImpl(name);
|
||||
static JObjectKey of(String value) {
|
||||
return new JObjectKeyImpl(value);
|
||||
}
|
||||
|
||||
static JObjectKey random() {
|
||||
@@ -43,5 +43,5 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
|
||||
ByteBuffer toByteBuffer();
|
||||
|
||||
String name();
|
||||
String value();
|
||||
}
|
||||
|
||||
@@ -5,12 +5,12 @@ import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public record JObjectKeyImpl(String name) implements JObjectKey {
|
||||
public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
@Override
|
||||
public int compareTo(JObjectKey o) {
|
||||
switch (o) {
|
||||
case JObjectKeyImpl jObjectKeyImpl -> {
|
||||
return name.compareTo(jObjectKeyImpl.name());
|
||||
return value.compareTo(jObjectKeyImpl.value());
|
||||
}
|
||||
case JObjectKeyMax jObjectKeyMax -> {
|
||||
return -1;
|
||||
@@ -23,17 +23,17 @@ public record JObjectKeyImpl(String name) implements JObjectKey {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
return name.getBytes(StandardCharsets.UTF_8);
|
||||
return value.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
var heapBb = StandardCharsets.UTF_8.encode(name);
|
||||
var heapBb = StandardCharsets.UTF_8.encode(value);
|
||||
if (heapBb.isDirect()) return heapBb;
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
|
||||
directBb.put(heapBb);
|
||||
|
||||
@@ -29,7 +29,7 @@ public record JObjectKeyMax() implements JObjectKey {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
public String value() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ public record JObjectKeyMin() implements JObjectKey {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
public String value() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
import com.usatiuk.objects.JObjectKeyMin;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
@@ -121,7 +120,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
||||
assert !_closed;
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
|
||||
@@ -595,13 +595,13 @@ public abstract class ObjectsTestImpl {
|
||||
txm.run(() -> {
|
||||
var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key));
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key3, got.getKey().name());
|
||||
Assertions.assertEquals(key3, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
iter.close();
|
||||
});
|
||||
}
|
||||
@@ -623,25 +623,25 @@ public abstract class ObjectsTestImpl {
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key3, got.getKey().name());
|
||||
Assertions.assertEquals(key3, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
});
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.LT, JObjectKey.of(key + "_5"))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
Assertions.assertTrue(iter.hasPrev());
|
||||
got = iter.prev();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
Assertions.assertTrue(iter.hasNext());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
});
|
||||
txm.run(() -> {
|
||||
@@ -653,7 +653,7 @@ public abstract class ObjectsTestImpl {
|
||||
});
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -695,9 +695,9 @@ public abstract class ObjectsTestImpl {
|
||||
barrier2.await();
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -710,13 +710,13 @@ public abstract class ObjectsTestImpl {
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key3, got.getKey().name());
|
||||
Assertions.assertEquals(key3, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
});
|
||||
txm.run(() -> {
|
||||
@@ -728,7 +728,7 @@ public abstract class ObjectsTestImpl {
|
||||
});
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -771,12 +771,12 @@ public abstract class ObjectsTestImpl {
|
||||
barrier2.await();
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -789,14 +789,14 @@ public abstract class ObjectsTestImpl {
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
Assertions.assertEquals("John5", ((Parent) got.getValue()).name());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key3, got.getKey().name());
|
||||
Assertions.assertEquals(key3, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
});
|
||||
txm.run(() -> {
|
||||
@@ -808,7 +808,7 @@ public abstract class ObjectsTestImpl {
|
||||
});
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -851,38 +851,38 @@ public abstract class ObjectsTestImpl {
|
||||
barrier2.await();
|
||||
try (var iter = curTx.getIterator(IteratorStart.LE, JObjectKey.of(key3))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
|
||||
Assertions.assertTrue(iter.hasNext());
|
||||
Assertions.assertTrue(iter.hasPrev());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
Assertions.assertTrue(iter.hasPrev());
|
||||
got = iter.prev();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
Assertions.assertTrue(iter.hasPrev());
|
||||
got = iter.prev();
|
||||
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
|
||||
Assertions.assertTrue(iter.hasPrev());
|
||||
got = iter.prev();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
Assertions.assertTrue(iter.hasNext());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key2, got.getKey().name());
|
||||
Assertions.assertEquals(key2, got.getKey().value());
|
||||
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -895,11 +895,11 @@ public abstract class ObjectsTestImpl {
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
var got = iter.next();
|
||||
Assertions.assertEquals(key1, got.getKey().name());
|
||||
Assertions.assertEquals(key1, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key3, got.getKey().name());
|
||||
Assertions.assertEquals(key3, got.getKey().value());
|
||||
got = iter.next();
|
||||
Assertions.assertEquals(key4, got.getKey().name());
|
||||
Assertions.assertEquals(key4, got.getKey().value());
|
||||
}
|
||||
});
|
||||
txm.run(() -> {
|
||||
@@ -910,7 +910,7 @@ public abstract class ObjectsTestImpl {
|
||||
});
|
||||
txm.run(() -> {
|
||||
try (var iter = curTx.getIterator(IteratorStart.GT, JObjectKey.of(key))) {
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
|
||||
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().value().startsWith(key));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -13,6 +13,6 @@ public class JObjectKeyProtoSerializer implements ProtoSerializer<JObjectKeyP, J
|
||||
|
||||
@Override
|
||||
public JObjectKeyP serialize(JObjectKey object) {
|
||||
return JObjectKeyP.newBuilder().setName(object.name()).build();
|
||||
return JObjectKeyP.newBuilder().setName(object.value()).build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.usatiuk.dhfs;
|
||||
|
||||
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectInitialSyncProcessor implements InitialSyncProcessor<RemoteObjectMeta> {
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
PeerInfoService peerInfoService;
|
||||
|
||||
@Override
|
||||
public void prepareForInitialSync(PeerId from, JObjectKey key) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCrash(JObjectKey key) {
|
||||
var data = curTx.get(RemoteObjectMeta.class, key).orElseThrow();
|
||||
var versionSum = data.versionSum();
|
||||
for (var p : peerInfoService.getPeersNoSelf()) {
|
||||
if (data.knownRemoteVersions().getOrDefault(p.id(), 0L) != versionSum) {
|
||||
invalidationQueueService.pushInvalidationToOne(p.id(), key);
|
||||
Log.infov("Pushing after crash invalidation to {0} for {1}", p.id(), key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -41,7 +41,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
}
|
||||
|
||||
public static JObjectKey ofDataKey(JObjectKey key) {
|
||||
return JObjectKey.of("data_" + key.name());
|
||||
return JObjectKey.of("data_" + key.value());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -21,6 +21,10 @@ public class ShutdownChecker {
|
||||
|
||||
void init(@Observes @Priority(2) StartupEvent event) throws IOException {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
|
||||
if (!Paths.get(dataRoot).toFile().isDirectory())
|
||||
throw new IllegalStateException("Could not create directory " + dataRoot);
|
||||
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
_cleanShutdown = false;
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
|
||||
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -11,10 +13,18 @@ import jakarta.inject.Inject;
|
||||
public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<JKleppmannTreePersistentData> {
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
@Override
|
||||
public void prepareForInitialSync(PeerId from, JObjectKey key) {
|
||||
var tree = jKleppmannTreeManager.getTree(key);
|
||||
tree.recordBootstrap(from);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCrash(JObjectKey key) {
|
||||
invalidationQueueService.pushInvalidationToAll(key);
|
||||
Log.infov("Pushing after crash invalidation to all for {0}", key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.*;
|
||||
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.repository.invalidation.Op;
|
||||
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
@@ -50,11 +49,11 @@ public class JKleppmannTreeManager {
|
||||
TreePMap.empty()
|
||||
);
|
||||
curTx.put(data);
|
||||
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
curTx.put(rootNode);
|
||||
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
curTx.put(trashNode);
|
||||
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
|
||||
curTx.put(lf_node);
|
||||
}
|
||||
return new JKleppmannTree(data);
|
||||
@@ -259,17 +258,17 @@ public class JKleppmannTreeManager {
|
||||
|
||||
@Override
|
||||
public JObjectKey getRootId() {
|
||||
return JObjectKey.of(_treeName.name() + "_jt_root");
|
||||
return JObjectKey.of(_treeName.value() + "_jt_root");
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey getTrashId() {
|
||||
return JObjectKey.of(_treeName.name() + "_jt_trash");
|
||||
return JObjectKey.of(_treeName.value() + "_jt_trash");
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey getLostFoundId() {
|
||||
return JObjectKey.of(_treeName.name() + "_jt_lf");
|
||||
return JObjectKey.of(_treeName.value() + "_jt_lf");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -7,7 +7,6 @@ import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.kleppmanntree.CombinedTimestamp;
|
||||
import com.usatiuk.kleppmanntree.LogRecord;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import org.pcollections.PCollection;
|
||||
import org.pcollections.PMap;
|
||||
import org.pcollections.PSortedMap;
|
||||
@@ -50,6 +49,6 @@ public record JKleppmannTreePersistentData(
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return List.of(JObjectKey.of(key().name() + "_jt_trash"), JObjectKey.of(key().name() + "_jt_root"), JObjectKey.of(key().name() + "_jt_lf"));
|
||||
return List.of(JObjectKey.of(key().value() + "_jt_trash"), JObjectKey.of(key().value() + "_jt_root"), JObjectKey.of(key().value() + "_jt_lf"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,19 +17,19 @@ public class JMapHelper {
|
||||
Transaction curTx;
|
||||
|
||||
static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
|
||||
return JObjectKey.of(holder.name() + "=");
|
||||
return JObjectKey.of(holder.value() + "=");
|
||||
}
|
||||
|
||||
static <K extends JMapKey> JObjectKey makeKeyFirst(JObjectKey holder) {
|
||||
return JObjectKey.of(holder.name() + "<");
|
||||
return JObjectKey.of(holder.value() + "<");
|
||||
}
|
||||
|
||||
static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
|
||||
return JObjectKey.of(makePrefix(holder).name() + key.toString());
|
||||
return JObjectKey.of(makePrefix(holder).value() + key.toString());
|
||||
}
|
||||
|
||||
static <K extends JMapKey> JObjectKey makeKeyLast(JObjectKey holder) {
|
||||
return JObjectKey.of(holder.name() + ">");
|
||||
return JObjectKey.of(holder.value() + ">");
|
||||
}
|
||||
|
||||
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
|
||||
|
||||
@@ -23,16 +23,16 @@ public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, J
|
||||
_hasNext = false;
|
||||
return;
|
||||
}
|
||||
if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
|
||||
if (!_backing.peekNextKey().value().startsWith(_prefix.value())) {
|
||||
_backing.skip();
|
||||
if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
|
||||
if (!_backing.peekNextKey().value().startsWith(_prefix.value())) {
|
||||
_hasNext = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public K keyToKey(JObjectKey key) {
|
||||
var keyPart = key.name().substring(_prefix.name().length());
|
||||
var keyPart = key.value().substring(_prefix.value().length());
|
||||
return (K) JMapLongKey.of(Long.parseLong(keyPart));
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, J
|
||||
throw new IllegalStateException("No next element");
|
||||
}
|
||||
var next = _backing.next();
|
||||
assert next.getKey().name().startsWith(_prefix.name());
|
||||
assert next.getKey().value().startsWith(_prefix.value());
|
||||
advance();
|
||||
return Pair.of(keyToKey(next.getKey()), (JMapEntry<K>) next.getValue());
|
||||
}
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
package com.usatiuk.dhfs.repository;
|
||||
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
|
||||
public interface InitialSyncProcessor<T extends JData> {
|
||||
void prepareForInitialSync(PeerId from, JObjectKey key);
|
||||
void handleCrash(JObjectKey key);
|
||||
}
|
||||
|
||||
@@ -3,14 +3,18 @@ package com.usatiuk.dhfs.repository;
|
||||
import com.usatiuk.dhfs.JDataRemote;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import org.pcollections.HashTreePSet;
|
||||
@@ -40,6 +44,8 @@ public class SyncHandler {
|
||||
RemoteTransaction remoteTx;
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
|
||||
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers, Instance<InitialSyncProcessor<?>> initialSyncProcessors) {
|
||||
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
|
||||
@@ -113,6 +119,33 @@ public class SyncHandler {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
|
||||
if (shutdownChecker.lastShutdownClean())
|
||||
return;
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.handleCrash(obj);
|
||||
}
|
||||
Log.infov("Handled crash of {0}", obj);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void doInitialSync(PeerId peer) {
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
|
||||
@@ -51,7 +51,7 @@ public class PeerInfoService {
|
||||
|
||||
public boolean existsPeer(PeerId peer) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
if (gotKey == null) {
|
||||
return false;
|
||||
}
|
||||
@@ -61,7 +61,7 @@ public class PeerInfoService {
|
||||
|
||||
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
if (gotKey == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
@@ -114,7 +114,7 @@ public class PeerInfoService {
|
||||
|
||||
public void removePeer(PeerId id) {
|
||||
jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name()));
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
|
||||
if (gotKey == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -11,19 +11,19 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
|
||||
private final JObjectKey _peerId;
|
||||
|
||||
public JKleppmannTreeNodeMetaPeer(PeerId id) {
|
||||
super(peerIdToNodeId(id).name());
|
||||
super(peerIdToNodeId(id).value());
|
||||
_peerId = id.toJObjectKey();
|
||||
}
|
||||
|
||||
public static JObjectKey peerIdToNodeId(PeerId id) {
|
||||
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
|
||||
return JObjectKey.of(id.toJObjectKey().value() + "_tree_node");
|
||||
}
|
||||
|
||||
public static PeerId nodeIdToPeerId(JObjectKey id) {
|
||||
if (!id.name().endsWith("_tree_node")) {
|
||||
if (!id.value().endsWith("_tree_node")) {
|
||||
throw new IllegalArgumentException("Not a tree node key: " + id);
|
||||
}
|
||||
return PeerId.of(id.name().substring(0, id.name().length() - "_tree_node".length()));
|
||||
return PeerId.of(id.value().substring(0, id.value().length() - "_tree_node".length()));
|
||||
}
|
||||
|
||||
public JObjectKey getPeerId() {
|
||||
@@ -32,7 +32,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNodeMeta withName(String name) {
|
||||
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());
|
||||
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().value())).toString());
|
||||
assert getName().equals(name);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
|
||||
// connecting two other nodes
|
||||
// TODO: Can there be a prettier way to do this? (e.g. more general proxying of ops?)
|
||||
if (cur instanceof JKleppmannTreeNode n) {
|
||||
if (n.key().name().equals("peers_jt_root")) {
|
||||
if (n.key().value().equals("peers_jt_root")) {
|
||||
// TODO: This is kinda sucky
|
||||
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user