9 Commits

34 changed files with 593 additions and 337 deletions

View File

@@ -20,26 +20,21 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: 'recursive'
- name: Install sudo for ACT
run: apt-get update && apt-get install -y sudo
if: env.ACT=='true'
- name: Install fuse and maven
run: sudo apt-get update && sudo apt-get install -y libfuse2
- name: Install FUSE
run: sudo apt-get update && sudo apt-get install -y libfuse2 libfuse3-dev libfuse3-3 fuse3
- name: Download maven
run: |
cd "$HOME"
mkdir maven-bin
curl -s -L https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz | tar xvz --strip-components=1 -C maven-bin
echo "$HOME"/maven-bin/bin >> $GITHUB_PATH
- name: Maven info
run: |
echo $GITHUB_PATH
echo $PATH
mvn -v
- name: User allow other for fuse
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
- name: Dump fuse.conf
run: cat /etc/fuse.conf
- name: Set up JDK 21
uses: actions/setup-java@v4
@@ -48,6 +43,9 @@ jobs:
distribution: "zulu"
cache: maven
- name: Build LazyFS
run: cd thirdparty/lazyfs/ && ./build.sh
- name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "thirdparty/lazyfs/lazyfs"]
path = thirdparty/lazyfs/lazyfs
url = git@github.com:dsrhaslab/lazyfs.git

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@@ -35,13 +35,15 @@ public class DhfsFusex3IT {
String c2uuid;
String c3uuid;
Network network;
// This calculation is somewhat racy, so keep it hardcoded for now
long emptyFileCount = 9;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
// TODO: Dedup
Network network = Network.newNetwork();
network = Network.newNetwork();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
@@ -252,21 +254,22 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
Log.warn("Waiting for connections");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);

View File

@@ -0,0 +1,177 @@
package com.usatiuk.dhfs.integration;
import io.quarkus.logging.Log;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class LazyFs {
private static final String lazyFsPath;
private final String dataRoot;
private final String lazyFsDataPath;
private final String name;
private final File tmpConfigFile;
private final File fifoPath;
static {
lazyFsPath = System.getProperty("lazyFsPath");
System.out.println("LazyFs Path: " + lazyFsPath);
}
public LazyFs(String name, String dataRoot, String lazyFsDataPath) {
this.name = name;
this.dataRoot = dataRoot;
this.lazyFsDataPath = lazyFsDataPath;
try {
tmpConfigFile = File.createTempFile("lazyfs", ".conf");
tmpConfigFile.deleteOnExit();
fifoPath = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
fifoPath.deleteOnExit();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Thread outputPiper;
private Process fs;
private String fifoPath() {
return fifoPath.getAbsolutePath();
}
public void start() {
var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs");
if (!lfsPath.toFile().isFile())
throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath());
if (!lfsPath.toFile().canExecute())
throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath());
try (var rwFile = new RandomAccessFile(tmpConfigFile, "rw");
var channel = rwFile.getChannel()) {
channel.truncate(0);
var config = "[faults]\n" +
"fifo_path=\"" + fifoPath() + "\"\n" +
"[cache]\n" +
"apply_eviction=false\n" +
"[cache.simple]\n" +
"custom_size=\"1gb\"\n" +
"blocks_per_page=1\n" +
"[filesystem]\n" +
"log_all_operations=false\n" +
"logfile=\"\"";
rwFile.write(config.getBytes());
Log.info("LazyFs config: \n" + config);
} catch (Exception e) {
throw new RuntimeException(e);
}
var argList = new ArrayList<String>();
argList.add(lfsPath.toString());
argList.add(Path.of(dataRoot).toString());
argList.add("--config-path");
argList.add(tmpConfigFile.getAbsolutePath());
argList.add("-o");
argList.add("allow_other");
argList.add("-o");
argList.add("modules=subdir");
argList.add("-o");
argList.add("subdir=" + Path.of(lazyFsDataPath).toAbsolutePath().toString());
try {
Log.info("Starting LazyFs " + argList);
fs = Runtime.getRuntime().exec(argList.toArray(String[]::new));
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
outputPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) {
String line;
while ((line = input.readLine()) != null) {
System.out.println(line);
}
}
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
});
outputPiper.start();
outputPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) {
String line;
while ((line = input.readLine()) != null) {
System.out.println(line);
}
}
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
});
outputPiper.start();
}
public void crash() {
try {
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
System.out.println("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void stop() {
try {
if (fs == null) {
return;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
if (!fs.waitFor(1, TimeUnit.SECONDS))
throw new RuntimeException("LazyFs process did not stop in time");
fs = null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Doesn't actually work?
//
// public void crashop() {
// try {
// var cmd = "echo \"lazyfs::torn-op::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,3::parts=3::occurrence=5\" > /tmp/faults.fifo";
// System.out.println("Running command: " + cmd);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
// Thread.sleep(1000);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
// Thread.sleep(1000);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
//
// public void crashseq() {
// try {
// var cmd = "echo \"lazyfs::torn-seq::op=write::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,4::occurrence=2\" > /tmp/faults.fifo";
// System.out.println("Running command: " + cmd);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
// Thread.sleep(1000);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
// Thread.sleep(1000);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
}

View File

@@ -0,0 +1,199 @@
package com.usatiuk.dhfs.integration;
import com.github.dockerjava.api.model.Device;
import com.usatiuk.dhfs.TestDataCleaner;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
public class LazyFsIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
WaitingConsumer waitingConsumer1;
WaitingConsumer waitingConsumer2;
String c1uuid;
String c2uuid;
File data1;
File data2;
File data1Lazy;
LazyFs lazyFs1;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
data1 = Files.createTempDirectory("dhfsdata").toFile();
data2 = Files.createTempDirectory("dhfsdata").toFile();
data1Lazy = Files.createTempDirectory("lazyfsroot").toFile();
Network network = Network.newNetwork();
lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString());
lazyFs1.start();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
}
@AfterEach
void stop() {
lazyFs1.stop();
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
TestDataCleaner.purgeDirectory(data1);
TestDataCleaner.purgeDirectory(data1Lazy);
TestDataCleaner.purgeDirectory(data2);
}
@Test
void killTest(TestInfo testInfo) throws Exception {
var executor = Executors.newFixedThreadPool(2);
var barrier = new CyclicBarrier(2);
var ret1 = executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.await();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(1000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
Thread.sleep(1000);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
// @Test
// void killTestDirs(TestInfo testInfo) throws Exception {
// var executor = Executors.newFixedThreadPool(2);
// var barrier = new CyclicBarrier(2);
// var ret1 = executor.submit(() -> {
// try {
// Log.info("Writing to container 1");
// barrier.await();
// container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
// barrier.await();
// Thread.sleep(10000);
// var client = DockerClientFactory.instance().client();
// client.killContainerCmd(container1.getContainerId()).exec();
// container1.stop();
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
// container1.start();
// waitingConsumer1 = new WaitingConsumer();
// var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
// container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
// waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
//
// await().atMost(45, TimeUnit.SECONDS).until(() -> {
// Log.info("Listing consistency");
// var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// Log.info(ls1);
// Log.info(cat1);
// Log.info(ls2);
// Log.info(cat2);
//
// return ls1.equals(ls2) && cat1.equals(cat2);
// });
// }
}

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.jmap.JMapHolder;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.Set;
@@ -28,6 +28,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
return new File(key, mode, cTime, mTime, symlink);
}
public File withCurrentMTime() {
return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return Set.of();

View File

@@ -247,7 +247,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis()));
remoteTx.putData(f.withMode(mode).withCurrentMTime());
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
@@ -453,7 +453,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
remoteTx.putData(file);
remoteTx.putData(file.withCurrentMTime());
return (long) data.size();
});
@@ -544,7 +544,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
remoteTx.putData(file);
remoteTx.putData(file.withCurrentMTime());
return true;
});
}

View File

@@ -111,6 +111,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
var curMtime = fileService.getattr(uuid).get().mtime();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
@@ -123,6 +124,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
fileService.write(uuid, 3, new byte[]{17, 18});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
var newMtime = fileService.getattr(uuid).get().mtime();
Assertions.assertTrue(newMtime > curMtime);
fileService.unlink("/writeTest");
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
}

View File

@@ -73,24 +73,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<groupId>com.github.serceman</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
<version>0.5.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -1,6 +1,13 @@
package com.usatiuk.objects;
public sealed interface JDataVersionedWrapper permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
import com.usatiuk.objects.iterators.Data;
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
@Override
default JDataVersionedWrapper value() {
return this;
}
JData data();
long version();

View File

@@ -24,6 +24,8 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
public JDataVersionedWrapper deserialize(ByteString data) {
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
var rawData = data.substring(Long.BYTES);
return new JDataVersionedWrapperLazy(version, rawData.size(), () -> dataSerializer.deserialize(rawData));
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
);
}
}

View File

@@ -2,9 +2,6 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public record Data<V>(V value) implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.of(value);
}
public interface Data<V> extends MaybeTombstone<V> {
V value();
}

View File

@@ -0,0 +1,6 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public record DataWrapper<V>(V value) implements Data<V> {
}

View File

@@ -3,5 +3,4 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface MaybeTombstone<T> {
Optional<T> opt();
}

View File

@@ -9,22 +9,22 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> extends Reversib
private Iterator<Map.Entry<K, V>> _iterator;
private Map.Entry<K, V> _next;
public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
_map = map;
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
_map = (NavigableMap<K, V>) map;
SortedMap<K, V> _view;
_goingForward = true;
switch (start) {
case GE -> _view = map.tailMap(key, true);
case GT -> _view = map.tailMap(key, false);
case GE -> _view = _map.tailMap(key, true);
case GT -> _view = _map.tailMap(key, false);
case LE -> {
var floorKey = map.floorKey(key);
var floorKey = _map.floorKey(key);
if (floorKey == null) _view = _map;
else _view = map.tailMap(floorKey, true);
else _view = _map.tailMap(floorKey, true);
}
case LT -> {
var lowerKey = map.lowerKey(key);
if (lowerKey == null) _view = _map;
else _view = map.tailMap(lowerKey, true);
else _view = _map.tailMap(lowerKey, true);
}
default -> throw new IllegalArgumentException("Unknown start type");
}

View File

@@ -2,9 +2,5 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public record Tombstone<V>() implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.empty();
}
public interface Tombstone<V> extends MaybeTombstone<V> {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.objects.iterators;
public record TombstoneImpl<V>() implements Tombstone<V> {
}

View File

@@ -1,83 +1,24 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final String _name;
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
_name = name;
_backing = new PredicateKvIterator<>(
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
startType, startKey,
pair -> {
Log.tracev("{0} - Processing pair {1}", _name, pair);
if (pair instanceof Tombstone) {
Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
return ((Data<V>) pair).value();
});
}
@SafeVarargs
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
@Override
public K peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public K peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<K, V> prev() {
return _backing.prev();
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<K, V> next() {
return _backing.next();
}
@Override
public String toString() {
return "TombstoneMergingKvIterator{" +
"_backing=" + _backing +
", _name='" + _name + '\'' +
'}';
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
}

View File

@@ -34,10 +34,9 @@ public class CachingObjectPersistentStore {
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + objSize;
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
@@ -187,16 +186,9 @@ public class CachingObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("cache", start, key,
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
}
@Nonnull
@@ -204,12 +196,12 @@ public class CachingObjectPersistentStore {
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
@@ -228,7 +220,7 @@ public class CachingObjectPersistentStore {
_backing.close();
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
@@ -261,10 +253,10 @@ public class CachingObjectPersistentStore {
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
}
@Override
@@ -278,10 +270,10 @@ public class CachingObjectPersistentStore {
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
}
}
};
@@ -294,6 +286,18 @@ public class CachingObjectPersistentStore {
}
}
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) {
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}

View File

@@ -208,6 +208,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
private boolean _hasNext = false;
private JObjectKey _peekedNextKey = null;
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
_txn = txn;
@@ -277,24 +278,24 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
var realGot = JObjectKey.fromByteBuffer(_cursor.key());
_cursor.key().flip();
switch (start) {
case LT -> {
// assert !_hasNext || realGot.compareTo(key) < 0;
}
case LE -> {
// assert !_hasNext || realGot.compareTo(key) <= 0;
}
case GT -> {
assert !_hasNext || realGot.compareTo(key) > 0;
}
case GE -> {
assert !_hasNext || realGot.compareTo(key) >= 0;
}
}
Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
// var realGot = JObjectKey.fromByteBuffer(_cursor.key());
// _cursor.key().flip();
//
// switch (start) {
// case LT -> {
//// assert !_hasNext || realGot.compareTo(key) < 0;
// }
// case LE -> {
//// assert !_hasNext || realGot.compareTo(key) <= 0;
// }
// case GT -> {
// assert !_hasNext || realGot.compareTo(key) > 0;
// }
// case GE -> {
// assert !_hasNext || realGot.compareTo(key) >= 0;
// }
// }
// Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
}
@Override
@@ -323,6 +324,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
_goingForward = !_goingForward;
_peekedNextKey = null;
}
@Override
@@ -330,8 +332,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
if (_peekedNextKey != null) {
return _peekedNextKey;
}
var ret = JObjectKey.fromByteBuffer(_cursor.key());
_cursor.key().flip();
_peekedNextKey = ret;
return ret;
}
@@ -341,6 +347,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
_hasNext = _cursor.next();
else
_hasNext = _cursor.prev();
_peekedNextKey = null;
}
@Override
@@ -362,6 +369,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
else
_hasNext = _cursor.prev();
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
_peekedNextKey = null;
return ret;
}
}

View File

@@ -1,6 +1,9 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.Tombstone;
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
public record PendingDelete(JObjectKey key,
long bundleId) implements PendingWriteEntry, Tombstone<JDataVersionedWrapper> {
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.Data;
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
}

View File

@@ -1,5 +1,8 @@
package com.usatiuk.objects.stores;
public interface PendingWriteEntry {
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.MaybeTombstone;
public interface PendingWriteEntry extends MaybeTombstone<JDataVersionedWrapper> {
long bundleId();
}

View File

@@ -350,15 +350,9 @@ public class WritebackObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
}
@Nonnull
@@ -367,7 +361,7 @@ public class WritebackObjectPersistentStore {
var cached = _pendingWrites.get(name);
if (cached != null) {
return switch (cached) {
case PendingWrite c -> Optional.of(c.data());
case PendingWrite c -> Optional.of(c.value());
case PendingDelete d -> {
yield Optional.empty();
}

View File

@@ -21,10 +21,6 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
// Manages all access to com.usatiuk.objects.JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage.
// All data goes through it, it is responsible for transaction atomicity
// TODO: persistent tx id
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
@@ -219,7 +215,7 @@ public class JObjectManager {
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
}
if (current.get().version() > snapshotId) {
@@ -270,31 +266,4 @@ public class JObjectManager {
});
tx.close();
}
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
// private final long _txId;
//
// private TransactionObjectSourceImpl(long txId) {
// _txId = txId;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
// var got = getObj(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
// var got = getObjLock(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// got.lock().close();
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
// }
}

View File

@@ -7,7 +7,6 @@ import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
@@ -162,20 +161,25 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
case null, default -> null;
}),
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
}
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}

View File

@@ -17,30 +17,27 @@ public interface TransactionManager {
return supplier.get();
}
begin();
T ret;
try {
ret = supplier.get();
} catch (TxCommitException txCommitException) {
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
while (true) {
begin();
boolean commit = false;
try {
var ret = supplier.get();
commit = true;
commit();
return ret;
} catch (TxCommitException txCommitException) {
if (!commit)
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
tries--;
} catch (Throwable e) {
if (!commit)
rollback();
throw e;
}
return runTries(supplier, tries - 1);
} catch (Throwable e) {
rollback();
throw e;
}
try {
commit();
return ret;
} catch (TxCommitException txCommitException) {
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
return runTries(supplier, tries - 1);
}
}
@@ -55,29 +52,29 @@ public interface TransactionManager {
};
}
begin();
try {
fn.apply();
} catch (TxCommitException txCommitException) {
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
while (true) {
begin();
boolean commit = false;
try {
fn.apply();
commit = true;
var ret = commit();
return ret;
} catch (TxCommitException txCommitException) {
if (!commit)
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
tries--;
} catch (Throwable e) {
if (!commit)
rollback();
throw e;
}
return runTries(fn, tries - 1);
} catch (Throwable e) {
rollback();
throw e;
}
try {
return commit();
} catch (TxCommitException txCommitException) {
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
return runTries(fn, tries - 1);
}
}
default TransactionHandle run(VoidFn fn) {

View File

@@ -35,13 +35,6 @@
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
</properties>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
@@ -151,6 +144,7 @@
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<buildDirectory>${project.build.directory}</buildDirectory>
<lazyFsPath>${project.basedir}/../../thirdparty/lazyfs/lazyfs/lazyfs</lazyFsPath>
<nativeLibsDirectory>${dhfs.native-libs-dir}</nativeLibsDirectory>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@@ -36,7 +36,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
try {
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} catch (Exception e) {
Log.error("Error reading invalidation queue", e);
}
}
}

9
thirdparty/lazyfs/build.sh vendored Executable file
View File

@@ -0,0 +1,9 @@
# apt install g++ cmake libfuse3-dev libfuse3-3 fuse3
export CMAKE_BUILD_PARALLEL_LEVEL="$(nproc)"
cd lazyfs
cd libs/libpcache && ./build.sh && cd -
cd lazyfs && ./build.sh && cd -

1
thirdparty/lazyfs/lazyfs vendored Submodule