diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml index 2b140c66..9add034e 100644 --- a/.github/workflows/server.yml +++ b/.github/workflows/server.yml @@ -20,26 +20,21 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + with: + submodules: 'recursive' - name: Install sudo for ACT run: apt-get update && apt-get install -y sudo if: env.ACT=='true' - - name: Install fuse and maven - run: sudo apt-get update && sudo apt-get install -y libfuse2 + - name: Install FUSE + run: sudo apt-get update && sudo apt-get install -y libfuse2 libfuse3-dev libfuse3-3 fuse3 - - name: Download maven - run: | - cd "$HOME" - mkdir maven-bin - curl -s -L https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz | tar xvz --strip-components=1 -C maven-bin - echo "$HOME"/maven-bin/bin >> $GITHUB_PATH - - - name: Maven info - run: | - echo $GITHUB_PATH - echo $PATH - mvn -v + - name: User allow other for fuse + run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf + + - name: Dump fuse.conf + run: cat /etc/fuse.conf - name: Set up JDK 21 uses: actions/setup-java@v4 @@ -48,6 +43,9 @@ jobs: distribution: "zulu" cache: maven + - name: Build LazyFS + run: cd thirdparty/lazyfs/ && ./build.sh + - name: Test with Maven run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..caf16ce6 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "thirdparty/lazyfs/lazyfs"] + path = thirdparty/lazyfs/lazyfs + url = git@github.com:dsrhaslab/lazyfs.git diff --git a/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFs.java b/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFs.java new file mode 100644 index 00000000..abe95ccb --- /dev/null +++ b/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFs.java @@ -0,0 +1,177 @@ +package com.usatiuk.dhfs.integration; + +import io.quarkus.logging.Log; + +import java.io.*; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +public class LazyFs { + private static final String lazyFsPath; + + private final String dataRoot; + private final String lazyFsDataPath; + private final String name; + private final File tmpConfigFile; + private final File fifoPath; + + static { + lazyFsPath = System.getProperty("lazyFsPath"); + System.out.println("LazyFs Path: " + lazyFsPath); + } + + public LazyFs(String name, String dataRoot, String lazyFsDataPath) { + this.name = name; + this.dataRoot = dataRoot; + this.lazyFsDataPath = lazyFsDataPath; + + try { + tmpConfigFile = File.createTempFile("lazyfs", ".conf"); + tmpConfigFile.deleteOnExit(); + + fifoPath = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo"); + fifoPath.deleteOnExit(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Thread outputPiper; + private Process fs; + + private String fifoPath() { + return fifoPath.getAbsolutePath(); + } + + public void start() { + var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs"); + if (!lfsPath.toFile().isFile()) + throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath()); + if (!lfsPath.toFile().canExecute()) + throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath()); + + try (var rwFile = new RandomAccessFile(tmpConfigFile, "rw"); + var channel = rwFile.getChannel()) { + channel.truncate(0); + var config = "[faults]\n" + + "fifo_path=\"" + fifoPath() + "\"\n" + + "[cache]\n" + + "apply_eviction=false\n" + + "[cache.simple]\n" + + "custom_size=\"1gb\"\n" + + "blocks_per_page=1\n" + + "[filesystem]\n" + + "log_all_operations=false\n" + + "logfile=\"\""; + rwFile.write(config.getBytes()); + Log.info("LazyFs config: \n" + config); + } catch (Exception e) { + throw new RuntimeException(e); + } + + var argList = new ArrayList(); + + argList.add(lfsPath.toString()); + argList.add(Path.of(dataRoot).toString()); + argList.add("--config-path"); + argList.add(tmpConfigFile.getAbsolutePath()); + argList.add("-o"); + argList.add("allow_other"); + argList.add("-o"); + argList.add("modules=subdir"); + argList.add("-o"); + argList.add("subdir=" + Path.of(lazyFsDataPath).toAbsolutePath().toString()); + try { + Log.info("Starting LazyFs " + argList); + fs = Runtime.getRuntime().exec(argList.toArray(String[]::new)); + Thread.sleep(1000); + } catch (Exception e) { + throw new RuntimeException(e); + } + + outputPiper = new Thread(() -> { + try { + try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) { + String line; + + while ((line = input.readLine()) != null) { + System.out.println(line); + } + } + } catch (Exception e) { + Log.info("Exception in LazyFs piper", e); + } + }); + outputPiper.start(); + outputPiper = new Thread(() -> { + try { + try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) { + String line; + + while ((line = input.readLine()) != null) { + System.out.println(line); + } + } + } catch (Exception e) { + Log.info("Exception in LazyFs piper", e); + } + }); + outputPiper.start(); + } + + public void crash() { + try { + var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath(); + System.out.println("Running command: " + cmd); + Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}); + stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void stop() { + try { + if (fs == null) { + return; + } + Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot}); + if (!fs.waitFor(1, TimeUnit.SECONDS)) + throw new RuntimeException("LazyFs process did not stop in time"); + fs = null; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +// Doesn't actually work? +// +// public void crashop() { +// try { +// var cmd = "echo \"lazyfs::torn-op::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,3::parts=3::occurrence=5\" > /tmp/faults.fifo"; +// System.out.println("Running command: " + cmd); +// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}); +// Thread.sleep(1000); +// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot}); +// Thread.sleep(1000); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +// +// public void crashseq() { +// try { +// var cmd = "echo \"lazyfs::torn-seq::op=write::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,4::occurrence=2\" > /tmp/faults.fifo"; +// System.out.println("Running command: " + cmd); +// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}); +// Thread.sleep(1000); +// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot}); +// Thread.sleep(1000); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// } +} + diff --git a/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFsIT.java b/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFsIT.java new file mode 100644 index 00000000..a1dc72e2 --- /dev/null +++ b/dhfs-parent/dhfs-app/src/test/java/com/usatiuk/dhfs/integration/LazyFsIT.java @@ -0,0 +1,199 @@ +package com.usatiuk.dhfs.integration; + +import com.github.dockerjava.api.model.Device; +import com.usatiuk.dhfs.TestDataCleaner; +import io.quarkus.logging.Log; +import org.junit.jupiter.api.*; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.output.WaitingConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +public class LazyFsIT { + GenericContainer container1; + GenericContainer container2; + + WaitingConsumer waitingConsumer1; + WaitingConsumer waitingConsumer2; + + String c1uuid; + String c2uuid; + + File data1; + File data2; + File data1Lazy; + + LazyFs lazyFs1; + + @BeforeEach + void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException { + data1 = Files.createTempDirectory("dhfsdata").toFile(); + data2 = Files.createTempDirectory("dhfsdata").toFile(); + data1Lazy = Files.createTempDirectory("lazyfsroot").toFile(); + + Network network = Network.newNetwork(); + + lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString()); + lazyFs1.start(); + + container1 = new GenericContainer<>(DhfsImage.getInstance()) + .withPrivilegedMode(true) + .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) + .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) + .withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data"); + container2 = new GenericContainer<>(DhfsImage.getInstance()) + .withPrivilegedMode(true) + .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) + .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) + .withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data"); + + Stream.of(container1, container2).parallel().forEach(GenericContainer::start); + + waitingConsumer1 = new WaitingConsumer(); + var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName()); + container1.followOutput(loggingConsumer1.andThen(waitingConsumer1)); + waitingConsumer2 = new WaitingConsumer(); + var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName()); + container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); + + c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); + c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); + + Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid)); + Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid)); + + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS); + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS); + + var c1curl = container1.execInContainer("/bin/sh", "-c", + "curl --header \"Content-Type: application/json\" " + + " --request PUT " + + " --data '{\"uuid\":\"" + c2uuid + "\"}' " + + " http://localhost:8080/peers-manage/known-peers"); + + var c2curl = container2.execInContainer("/bin/sh", "-c", + "curl --header \"Content-Type: application/json\" " + + " --request PUT " + + " --data '{\"uuid\":\"" + c1uuid + "\"}' " + + " http://localhost:8080/peers-manage/known-peers"); + + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + } + + @AfterEach + void stop() { + lazyFs1.stop(); + + Stream.of(container1, container2).parallel().forEach(GenericContainer::stop); + TestDataCleaner.purgeDirectory(data1); + TestDataCleaner.purgeDirectory(data1Lazy); + TestDataCleaner.purgeDirectory(data2); + } + + @Test + void killTest(TestInfo testInfo) throws Exception { + var executor = Executors.newFixedThreadPool(2); + var barrier = new CyclicBarrier(2); + var ret1 = executor.submit(() -> { + try { + Log.info("Writing to container 1"); + barrier.await(); + container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + barrier.await(); + Thread.sleep(1000); + Log.info("Killing"); + lazyFs1.crash(); + + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS); + Thread.sleep(1000); + var client = DockerClientFactory.instance().client(); + client.killContainerCmd(container1.getContainerId()).exec(); + container1.stop(); + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS); + Log.info("Restart"); + lazyFs1.start(); + container1.start(); + waitingConsumer1 = new WaitingConsumer(); + var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName()); + container1.followOutput(loggingConsumer1.andThen(waitingConsumer1)); + waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); + + await().atMost(45, TimeUnit.SECONDS).until(() -> { + Log.info("Listing consistency"); + var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"); + var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*"); + var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"); + var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*"); + Log.info(ls1); + Log.info(cat1); + Log.info(ls2); + Log.info(cat2); + + return ls1.equals(ls2) && cat1.equals(cat2); + }); + } + +// @Test +// void killTestDirs(TestInfo testInfo) throws Exception { +// var executor = Executors.newFixedThreadPool(2); +// var barrier = new CyclicBarrier(2); +// var ret1 = executor.submit(() -> { +// try { +// Log.info("Writing to container 1"); +// barrier.await(); +// container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done"); +// } catch (Exception e) { +// throw new RuntimeException(e); +// } +// }); +// barrier.await(); +// Thread.sleep(10000); +// var client = DockerClientFactory.instance().client(); +// client.killContainerCmd(container1.getContainerId()).exec(); +// container1.stop(); +// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS); +// container1.start(); +// waitingConsumer1 = new WaitingConsumer(); +// var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName()); +// container1.followOutput(loggingConsumer1.andThen(waitingConsumer1)); +// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); +// waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS); +// +// await().atMost(45, TimeUnit.SECONDS).until(() -> { +// Log.info("Listing consistency"); +// var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"); +// var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*"); +// var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse"); +// var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*"); +// Log.info(ls1); +// Log.info(cat1); +// Log.info(ls2); +// Log.info(cat2); +// +// return ls1.equals(ls2) && cat1.equals(cat2); +// }); +// } +} diff --git a/dhfs-parent/pom.xml b/dhfs-parent/pom.xml index 2214ac9a..aa87de41 100644 --- a/dhfs-parent/pom.xml +++ b/dhfs-parent/pom.xml @@ -144,6 +144,7 @@ org.jboss.logmanager.LogManager ${project.build.directory} + ${project.basedir}/../../thirdparty/lazyfs/lazyfs/lazyfs ${dhfs.native-libs-dir} ${maven.home} diff --git a/thirdparty/lazyfs/build.sh b/thirdparty/lazyfs/build.sh new file mode 100755 index 00000000..d3f3ed81 --- /dev/null +++ b/thirdparty/lazyfs/build.sh @@ -0,0 +1,9 @@ +# apt install g++ cmake libfuse3-dev libfuse3-3 fuse3 + +export CMAKE_BUILD_PARALLEL_LEVEL="$(nproc)" + +cd lazyfs + +cd libs/libpcache && ./build.sh && cd - + +cd lazyfs && ./build.sh && cd - diff --git a/thirdparty/lazyfs/lazyfs b/thirdparty/lazyfs/lazyfs new file mode 160000 index 00000000..7b137f90 --- /dev/null +++ b/thirdparty/lazyfs/lazyfs @@ -0,0 +1 @@ +Subproject commit 7b137f90ef85168dc1ffc5dfe651b070aefa88d3