mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
remove dhfs-app
This commit is contained in:
@@ -139,16 +139,13 @@
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<configuration>
|
||||
<forkCount>1C</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<parallel>classes</parallel>
|
||||
<systemPropertyVariables>
|
||||
<junit.jupiter.execution.parallel.enabled>
|
||||
true
|
||||
false
|
||||
</junit.jupiter.execution.parallel.enabled>
|
||||
<junit.jupiter.execution.parallel.mode.default>
|
||||
concurrent
|
||||
</junit.jupiter.execution.parallel.mode.default>
|
||||
<junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
0.5
|
||||
</junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
||||
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
||||
</systemPropertyVariables>
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.usatiuk.dhfsfuse;
|
||||
|
||||
import io.quarkus.runtime.Quarkus;
|
||||
import io.quarkus.runtime.QuarkusApplication;
|
||||
import io.quarkus.runtime.annotations.QuarkusMain;
|
||||
|
||||
@QuarkusMain
|
||||
public class Main {
|
||||
public static void main(String... args) {
|
||||
Quarkus.run(DhfsStorageServerApp.class, args);
|
||||
}
|
||||
|
||||
public static class DhfsStorageServerApp implements QuarkusApplication {
|
||||
|
||||
@Override
|
||||
public int run(String... args) throws Exception {
|
||||
Quarkus.waitForExit();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,8 +14,9 @@ dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
|
||||
dhfs.fuse.debug=false
|
||||
dhfs.fuse.enabled=true
|
||||
dhfs.files.allow_recursive_delete=false
|
||||
dhfs.files.target_chunk_size=2097152
|
||||
dhfs.files.target_chunk_alignment=19
|
||||
dhfs.files.target_chunk_size=524288
|
||||
dhfs.files.max_chunk_size=524288
|
||||
dhfs.files.target_chunk_alignment=17
|
||||
dhfs.objects.deletion.delay=1000
|
||||
dhfs.objects.deletion.can-delete-retry-delay=10000
|
||||
dhfs.objects.ref_verification=true
|
||||
|
||||
@@ -30,7 +30,7 @@ public class TestDataCleaner {
|
||||
purgeDirectory(Path.of(tempDirectory).toFile());
|
||||
}
|
||||
|
||||
void purgeDirectory(File dir) {
|
||||
public static void purgeDirectory(File dir) {
|
||||
for (File file : Objects.requireNonNull(dir.listFiles())) {
|
||||
if (file.isDirectory())
|
||||
purgeDirectory(file);
|
||||
|
||||
@@ -0,0 +1,469 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class DhfsFuseIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
network = Network.newNetwork();
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void checkConsistency() {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteRewriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /dhfs_test/fuse/testf2").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"newfile\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"newfile\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
Log.info("Deleting");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
Log.info("Deleted");
|
||||
|
||||
// FIXME?
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteTestKickedOut() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
container2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
Log.info("Deleting");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
Log.info("Deleted");
|
||||
|
||||
// FIXME?
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveDirTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testdir/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testdir2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testdir /dhfs_test/fuse/testdir2/testdirm").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testdir2/testdirm/testf1").getStdout()));
|
||||
}
|
||||
|
||||
|
||||
// TODO: This probably shouldn't be working right now
|
||||
@Test
|
||||
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request DELETE " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo asvdkljm > /dhfs_test/fuse/newfile1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo dfgvh > /dhfs_test/fuse/newfile2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo dscfg > /dhfs_test/fuse/newfile2").getExitCode());
|
||||
|
||||
Log.info("Re-adding");
|
||||
container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing removeAddHostTest");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
Log.info(cat1);
|
||||
Log.info(cat2);
|
||||
Log.info(ls1);
|
||||
Log.info(ls2);
|
||||
|
||||
return cat1.getStdout().contains("jioadsd") && cat1.getStdout().contains("asvdkljm") && cat1.getStdout().contains("dfgvh") && cat1.getStdout().contains("dscfg")
|
||||
&& cat2.getStdout().contains("jioadsd") && cat2.getStdout().contains("asvdkljm") && cat2.getStdout().contains("dfgvh") && cat2.getStdout().contains("dscfg");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /dhfs_test/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /dhfs_test/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
return cat.getStdout().contains("test1") && cat.getStdout().contains("test2");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo fdsaio >> /dhfs_test/fuse/a/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo exgrg >> /dhfs_test/fuse/a/testf").getExitCode());
|
||||
|
||||
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
Log.warn("Waiting for connections");
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
Log.warn("Connected");
|
||||
|
||||
checkConsistency();
|
||||
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/a*/*");
|
||||
Assertions.assertTrue(ls1.getStdout().contains("fdsaio"));
|
||||
Assertions.assertTrue(ls1.getStdout().contains("exgrg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/b").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo xqr489 >> /dhfs_test/fuse/a/testfa").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo ahinou >> /dhfs_test/fuse/b/testfb").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls -lavh /dhfs_test/fuse").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var c2ls = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f -exec cat {} \\;");
|
||||
return c2ls.getExitCode() == 0 && c2ls.getStdout().contains("xqr489") && c2ls.getStdout().contains("ahinou");
|
||||
});
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/a /dhfs_test/fuse/b").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/b /dhfs_test/fuse/a").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing dirCycleTest");
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
|
||||
Log.info(container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/a"));
|
||||
Log.info(container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/b"));
|
||||
|
||||
var c1ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
Log.info(c1ls2);
|
||||
var c2ls2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -maxdepth 3 -type f -exec cat {} \\;");
|
||||
Log.info(c2ls2);
|
||||
|
||||
return c1ls2.getStdout().contains("xqr489") && c1ls2.getStdout().contains("ahinou")
|
||||
&& c2ls2.getStdout().contains("xqr489") && c2ls2.getStdout().contains("ahinou")
|
||||
&& c1ls2.getExitCode() == 0 && c2ls2.getExitCode() == 0;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeAndMove() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
Log.info("Removing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
|
||||
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
// Either removed, or moved
|
||||
// TODO: it always seems to be removed?
|
||||
Log.info("Reading both");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info("cat1: " + cat1);
|
||||
Log.info("cat2: " + cat2);
|
||||
Log.info("ls1: " + ls1);
|
||||
Log.info("ls2: " + ls2);
|
||||
|
||||
if (!ls1.getStdout().equals(ls2.getStdout())) {
|
||||
Log.info("Different ls?");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ls1.getStdout().trim().isEmpty() && ls2.getStdout().trim().isEmpty()) {
|
||||
Log.info("Both empty");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!cat1.getStdout().equals(cat2.getStdout())) {
|
||||
Log.info("Different cat?");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!(cat1.getExitCode() == 0 && cat2.getExitCode() == 0 && ls1.getExitCode() == 0 && ls2.getExitCode() == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean hasMoved = cat1.getStdout().contains("tesempty") && cat2.getStdout().contains("tesempty")
|
||||
&& ls1.getStdout().contains("testf2") && !ls1.getStdout().contains("testf1")
|
||||
&& ls2.getStdout().contains("testf2") && !ls2.getStdout().contains("testf1");
|
||||
|
||||
boolean removed = !cat1.getStdout().contains("tesempty") && !cat2.getStdout().contains("tesempty")
|
||||
&& !ls1.getStdout().contains("testf2") && !ls1.getStdout().contains("testf1")
|
||||
&& !ls2.getStdout().contains("testf2") && !ls2.getStdout().contains("testf1");
|
||||
|
||||
if (hasMoved && removed) {
|
||||
Log.info("Both removed and moved");
|
||||
return false;
|
||||
}
|
||||
|
||||
return hasMoved || removed;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,331 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class DhfsFusex3IT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
GenericContainer<?> container3;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
WaitingConsumer waitingConsumer3;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
String c3uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
// This calculation is somewhat racy, so keep it hardcoded for now
|
||||
long emptyFileCount = 9;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
// TODO: Dedup
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
container3 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
|
||||
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Log.info(container1.getContainerId() + "=" + c1uuid);
|
||||
Log.info(container2.getContainerId() + "=" + c2uuid);
|
||||
Log.info(container3.getContainerId() + "=" + c3uuid);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
|
||||
.withPrefix(c1uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
|
||||
.withPrefix(c2uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer3 = new WaitingConsumer();
|
||||
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
|
||||
.withPrefix(c3uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
|
||||
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c3uuid));
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c3uuid);
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
}
|
||||
|
||||
private boolean checkEmpty() throws IOException, InterruptedException {
|
||||
for (var container : List.of(container1, container2, container3)) {
|
||||
var found = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f");
|
||||
var foundWc = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f | wc -l");
|
||||
Log.info("Remaining objects in " + container.getContainerId() + ": " + found.toString() + " " + foundWc.toString());
|
||||
if (!(found.getExitCode() == 0 && foundWc.getExitCode() == 0 && Integer.parseInt(foundWc.getStdout().strip()) == emptyFileCount))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
@Test
|
||||
@Disabled
|
||||
void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /dhfs_test/fuse/hello.c").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && gcc hello.c").getExitCode());
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container1.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container2.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var helloOut = container3.execInContainer("/bin/sh", "-c", "/dhfs_test/fuse/a.out");
|
||||
Log.info(helloOut);
|
||||
return helloOut.getExitCode() == 0 && helloOut.getStdout().equals("hello world");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request DELETE " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
Thread.sleep(10000);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
// Pauses needed as otherwise docker buffers some incoming packets
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container3.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container3.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
if (!(cat.getStdout().contains("test1") && cat.getStdout().contains("test2") && cat.getStdout().contains("test3")))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
return container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getStdout()) &&
|
||||
container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout().equals(
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*").getStdout());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void fileConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
|
||||
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
Log.warn("Waiting for connections");
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
Log.warn("Connected");
|
||||
|
||||
// TODO: There's some issue with cache, so avoid file reads
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency 1");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
Log.info(ls1);
|
||||
Log.info(ls2);
|
||||
Log.info(ls3);
|
||||
|
||||
return (ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && ls3.getExitCode() == 0)
|
||||
&& (ls1.getStdout().equals(ls2.getStdout()) && ls2.getStdout().equals(ls3.getStdout()));
|
||||
});
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing");
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
if (!(cat.getExitCode() == 0 && ls.getExitCode() == 0))
|
||||
return false;
|
||||
if (!(cat.getStdout().contains("test1") && cat.getStdout().contains("test2") && cat.getStdout().contains("test3")))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
Log.info(ls3);
|
||||
Log.info(cat3);
|
||||
|
||||
return (ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && ls3.getExitCode() == 0)
|
||||
&& (cat1.getExitCode() == 0 && cat2.getExitCode() == 0 && cat3.getExitCode() == 0)
|
||||
&& (cat1.getStdout().equals(cat2.getStdout()) && cat2.getStdout().equals(cat3.getStdout()))
|
||||
&& (ls1.getStdout().equals(ls2.getStdout()) && ls2.getStdout().equals(ls3.getStdout()));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.testcontainers.images.builder.ImageFromDockerfile;
|
||||
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DhfsImage implements Future<String> {
|
||||
|
||||
private static final DhfsImage INSTANCE = new DhfsImage();
|
||||
private static String _builtImage = null;
|
||||
|
||||
private DhfsImage() {
|
||||
}
|
||||
|
||||
public static DhfsImage getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get() throws InterruptedException, ExecutionException {
|
||||
return buildImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
return buildImpl();
|
||||
}
|
||||
|
||||
private synchronized String buildImpl() {
|
||||
if (_builtImage != null) {
|
||||
return _builtImage;
|
||||
}
|
||||
|
||||
Log.info("Building image");
|
||||
|
||||
String buildPath = System.getProperty("buildDirectory");
|
||||
String nativeLibsDirectory = System.getProperty("nativeLibsDirectory");
|
||||
Log.info("Build path: " + buildPath);
|
||||
Log.info("Native libs path: " + nativeLibsDirectory);
|
||||
|
||||
var image = new ImageFromDockerfile()
|
||||
.withDockerfileFromBuilder(builder ->
|
||||
builder
|
||||
.from("azul/zulu-openjdk-debian:21-jre-headless-latest")
|
||||
.run("apt update && apt install -y libfuse2 curl gcc")
|
||||
.copy("/app", "/app")
|
||||
.copy("/libs", "/libs")
|
||||
.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1", "-XX:+UseParallelGC",
|
||||
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
|
||||
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
|
||||
"--add-opens=java.base/java.nio=ALL-UNNAMED",
|
||||
"--enable-preview",
|
||||
"-Ddhfs.objects.peerdiscovery.interval=1s",
|
||||
"-Ddhfs.objects.invalidation.delay=100",
|
||||
"-Ddhfs.objects.deletion.delay=0",
|
||||
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
|
||||
"-Ddhfs.objects.ref_verification=true",
|
||||
"-Ddhfs.objects.sync.timeout=30",
|
||||
"-Ddhfs.objects.sync.ping.timeout=5",
|
||||
"-Ddhfs.objects.reconnect_interval=1s",
|
||||
"-Ddhfs.objects.last-seen.timeout=30",
|
||||
"-Ddhfs.objects.last-seen.update=10",
|
||||
"-Ddhfs.sync.cert-check=false",
|
||||
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",
|
||||
"-Ddhfs.objects.periodic-push-op-interval=5s",
|
||||
"-Ddhfs.fuse.root=/dhfs_test/fuse",
|
||||
"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
|
||||
"-Ddhfs.objects.persistence.stuff.root=/dhfs_test/data/stuff",
|
||||
"-jar", "/app/quarkus-run.jar")
|
||||
.run("mkdir /dhfs_test && chmod 777 /dhfs_test")
|
||||
.build())
|
||||
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"))
|
||||
.withFileFromPath("/libs", Paths.get(nativeLibsDirectory));
|
||||
|
||||
_builtImage = image.get();
|
||||
Log.info("Image built: " + _builtImage);
|
||||
return _builtImage;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,235 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import com.usatiuk.dhfsfuse.TestDataCleaner;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class KillIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
File data1;
|
||||
File data2;
|
||||
|
||||
Network network;
|
||||
|
||||
ExecutorService executor;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
executor = Executors.newCachedThreadPool();
|
||||
|
||||
data1 = Files.createTempDirectory("").toFile();
|
||||
data2 = Files.createTempDirectory("").toFile();
|
||||
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
TestDataCleaner.purgeDirectory(data1);
|
||||
TestDataCleaner.purgeDirectory(data2);
|
||||
executor.close();
|
||||
network.close();
|
||||
}
|
||||
|
||||
private void checkConsistency() {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2) && ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && cat1.getExitCode() == 0 && cat2.getExitCode() == 0;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTest(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.start();
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTestDirs(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.start();
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTest2(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting");
|
||||
container2.start();
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTestDirs2(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting");
|
||||
container2.start();
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,215 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class LazyFs {
|
||||
private static final String lazyFsPath;
|
||||
|
||||
static {
|
||||
lazyFsPath = System.getProperty("lazyFsPath");
|
||||
System.out.println("LazyFs Path: " + lazyFsPath);
|
||||
}
|
||||
|
||||
private final String mountRoot;
|
||||
private final String dataRoot;
|
||||
private final String name;
|
||||
private final File configFile;
|
||||
private final File fifoFile;
|
||||
private Thread errPiper;
|
||||
private Thread outPiper;
|
||||
private CountDownLatch startLatch;
|
||||
private Process fs;
|
||||
public LazyFs(String name, String mountRoot, String dataRoot) {
|
||||
this.name = name;
|
||||
this.mountRoot = mountRoot;
|
||||
this.dataRoot = dataRoot;
|
||||
|
||||
try {
|
||||
configFile = File.createTempFile("lazyfs", ".conf");
|
||||
configFile.deleteOnExit();
|
||||
|
||||
fifoFile = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
|
||||
fifoFile.deleteOnExit();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
|
||||
}
|
||||
|
||||
private String fifoPath() {
|
||||
return fifoFile.getAbsolutePath();
|
||||
}
|
||||
|
||||
public void start(String extraOpts) {
|
||||
var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs");
|
||||
if (!lfsPath.toFile().isFile())
|
||||
throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath());
|
||||
if (!lfsPath.toFile().canExecute())
|
||||
throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath());
|
||||
|
||||
try (var rwFile = new RandomAccessFile(configFile, "rw");
|
||||
var channel = rwFile.getChannel()) {
|
||||
channel.truncate(0);
|
||||
var config = "[faults]\n" +
|
||||
"fifo_path=\"" + fifoPath() + "\"\n" +
|
||||
"[cache]\n" +
|
||||
"apply_eviction=false\n" +
|
||||
"[cache.simple]\n" +
|
||||
"custom_size=\"1gb\"\n" +
|
||||
"blocks_per_page=1\n" +
|
||||
"[filesystem]\n" +
|
||||
"log_all_operations=false\n" +
|
||||
"logfile=\"\"\n" + extraOpts;
|
||||
rwFile.write(config.getBytes());
|
||||
Log.info("LazyFs config: \n" + config);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
var argList = new ArrayList<String>();
|
||||
|
||||
argList.add(lfsPath.toString());
|
||||
argList.add(Path.of(mountRoot).toString());
|
||||
argList.add("--config-path");
|
||||
argList.add(configFile.getAbsolutePath());
|
||||
argList.add("-o");
|
||||
argList.add("allow_other");
|
||||
argList.add("-o");
|
||||
argList.add("modules=subdir");
|
||||
argList.add("-o");
|
||||
argList.add("subdir=" + Path.of(dataRoot).toAbsolutePath().toString());
|
||||
try {
|
||||
Log.info("Starting LazyFs " + argList);
|
||||
fs = Runtime.getRuntime().exec(argList.toArray(String[]::new));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
startLatch = new CountDownLatch(1);
|
||||
|
||||
outPiper = new Thread(() -> {
|
||||
try {
|
||||
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) {
|
||||
String line;
|
||||
|
||||
while ((line = input.readLine()) != null) {
|
||||
if (line.contains("running LazyFS"))
|
||||
startLatch.countDown();
|
||||
System.out.println(line);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.info("Exception in LazyFs piper", e);
|
||||
}
|
||||
Log.info("LazyFs out piper finished");
|
||||
});
|
||||
outPiper.start();
|
||||
errPiper = new Thread(() -> {
|
||||
try {
|
||||
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) {
|
||||
String line;
|
||||
|
||||
while ((line = input.readLine()) != null) {
|
||||
System.out.println(line);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.info("Exception in LazyFs piper", e);
|
||||
}
|
||||
Log.info("LazyFs err piper finished");
|
||||
});
|
||||
errPiper.start();
|
||||
|
||||
try {
|
||||
if (!startLatch.await(30, TimeUnit.SECONDS))
|
||||
throw new RuntimeException("StartLatch timed out");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
Log.info("LazyFs started");
|
||||
}
|
||||
|
||||
public void start() {
|
||||
start("");
|
||||
}
|
||||
|
||||
private String mdbPath() {
|
||||
return Path.of(dataRoot).resolve("objects").resolve("data.mdb").toAbsolutePath().toString();
|
||||
}
|
||||
|
||||
public void startTornOp() {
|
||||
start("\n" +
|
||||
"[[injection]]\n" +
|
||||
"type=\"torn-seq\"\n" +
|
||||
"op=\"write\"\n" +
|
||||
"file=\"" + mdbPath() + "\"\n" +
|
||||
"persist=[1,4]\n" +
|
||||
"occurrence=3");
|
||||
}
|
||||
|
||||
public void startTornSeq() {
|
||||
start("[[injection]]\n" +
|
||||
"type=\"torn-op\"\n" +
|
||||
"file=\"" + mdbPath() + "\"\n" +
|
||||
"occurrence=3\n" +
|
||||
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
|
||||
"persist=[1,3]");
|
||||
}
|
||||
|
||||
public void crash() {
|
||||
try {
|
||||
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
|
||||
Log.info("Running command: " + cmd);
|
||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
try {
|
||||
synchronized (this) {
|
||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Doesn't actually work?
|
||||
//
|
||||
// public void crashop() {
|
||||
// try {
|
||||
// var cmd = "echo \"lazyfs::torn-op::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,3::parts=3::occurrence=5\" > /tmp/faults.fifo";
|
||||
// System.out.println("Running command: " + cmd);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
|
||||
// Thread.sleep(1000);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
|
||||
// Thread.sleep(1000);
|
||||
// } catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public void crashseq() {
|
||||
// try {
|
||||
// var cmd = "echo \"lazyfs::torn-seq::op=write::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,4::occurrence=2\" > /tmp/faults.fifo";
|
||||
// System.out.println("Running command: " + cmd);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
|
||||
// Thread.sleep(1000);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
|
||||
// Thread.sleep(1000);
|
||||
// } catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,489 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import com.usatiuk.dhfsfuse.TestDataCleaner;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class LazyFsIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
File data1;
|
||||
File data2;
|
||||
File data1Lazy;
|
||||
File data2Lazy;
|
||||
|
||||
LazyFs lazyFs1;
|
||||
LazyFs lazyFs2;
|
||||
|
||||
ExecutorService executor;
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
executor = Executors.newCachedThreadPool();
|
||||
data1 = Files.createTempDirectory("dhfsdata").toFile();
|
||||
data2 = Files.createTempDirectory("dhfsdata").toFile();
|
||||
data1Lazy = Files.createTempDirectory("lazyfsroot").toFile();
|
||||
data2Lazy = Files.createTempDirectory("lazyfsroot").toFile();
|
||||
|
||||
network = Network.newNetwork();
|
||||
|
||||
lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString());
|
||||
lazyFs1.start();
|
||||
lazyFs2 = new LazyFs(testInfo.getDisplayName(), data2.toString(), data2Lazy.toString());
|
||||
lazyFs2.start();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
lazyFs1.stop();
|
||||
lazyFs2.stop();
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
TestDataCleaner.purgeDirectory(data1);
|
||||
TestDataCleaner.purgeDirectory(data1Lazy);
|
||||
TestDataCleaner.purgeDirectory(data2);
|
||||
TestDataCleaner.purgeDirectory(data2Lazy);
|
||||
|
||||
executor.close();
|
||||
network.close();
|
||||
}
|
||||
|
||||
private void checkConsistency(String testName) {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info("Listing consistency " + testName + "\n"
|
||||
+ ls1 + "\n"
|
||||
+ cat1 + "\n"
|
||||
+ ls2 + "\n"
|
||||
+ cat2 + "\n");
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2) && ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && cat1.getExitCode() == 0 && cat2.getExitCode() == 0;
|
||||
});
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTest(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs1.crash();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs1.start();
|
||||
case TORN_OP -> lazyFs1.startTornOp();
|
||||
case TORN_SEQ -> lazyFs1.startTornSeq();
|
||||
}
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
// Sometimes it doesn't get mounted properly for some reason
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test2; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
Thread.sleep(3000);
|
||||
lazyFs1.crash();
|
||||
}
|
||||
try {
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
// Sometimes crash doesn't work
|
||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs1.start();
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTestDirs(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs1.crash();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs1.start();
|
||||
case TORN_OP -> lazyFs1.startTornOp();
|
||||
case TORN_SEQ -> lazyFs1.startTornSeq();
|
||||
}
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
// Sometimes it doesn't get mounted properly for some reason
|
||||
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
Thread.sleep(3000);
|
||||
lazyFs1.crash();
|
||||
}
|
||||
try {
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
// Sometimes crash doesn't work
|
||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs1.start();
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTest2(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting1 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs2.crash();
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs2.start();
|
||||
case TORN_OP -> lazyFs2.startTornOp();
|
||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||
}
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
// Sometimes it doesn't get mounted properly for some reason
|
||||
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
var barrier2 = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier2.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting2 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test2; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier2.await();
|
||||
Log.info("Killing");
|
||||
Thread.sleep(3000);
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
lazyFs2.crash();
|
||||
}
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
// Sometimes crash doesn't work
|
||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs2.start();
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTestDirs2(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting1 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs2.crash();
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs2.start();
|
||||
case TORN_OP -> lazyFs2.startTornOp();
|
||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||
}
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
// Sometimes it doesn't get mounted properly for some reason
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
var barrier2 = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier2.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting2 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier2.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
lazyFs2.crash();
|
||||
}
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
// Sometimes crash doesn't work
|
||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs2.start();
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
|
||||
private static enum CrashType {
|
||||
CRASH,
|
||||
TORN_OP,
|
||||
TORN_SEQ
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,174 @@
|
||||
package com.usatiuk.dhfsfuse.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class ResyncIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void manyFiles() throws IOException, InterruptedException, TimeoutException {
|
||||
var ret = container1.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
var foundWc = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
ret = container2.execInContainer("/bin/sh", "-c", "for i in $(seq 1 200); do echo $i > /dhfs_test/fuse/test-2-$i; done");
|
||||
Assertions.assertEquals(0, ret.getExitCode());
|
||||
foundWc = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void folderAfterMove() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/testd1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /dhfs_test/fuse/testd1/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testd1 /dhfs_test/fuse/testd2").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testd2/testf2").getExitCode());
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf1").getStdout()));
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testd2/testf2").getStdout()));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user