mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
40 Commits
cursed-ite
...
060ab1767d
| Author | SHA1 | Date | |
|---|---|---|---|
| 060ab1767d | |||
| 89d87095c8 | |||
| 7425c1f312 | |||
| 428eca325f | |||
| 005bc35496 | |||
| 6685575ca5 | |||
| 1ae813aacd | |||
| e81671251a | |||
| add26bb156 | |||
| 4060045f15 | |||
| 75b484d5b2 | |||
| 1d9dc8ed4d | |||
| 7a85704862 | |||
| 367eedd540 | |||
| d01b9204f7 | |||
| 67fdacc3ff | |||
| 6ed9051be1 | |||
| abf95ba847 | |||
| 6a9f64439f | |||
| ceb9342b45 | |||
| ca354ba09c | |||
| 81af021292 | |||
| 0c04079258 | |||
| 2e2eb3ac97 | |||
| e2e756e7c5 | |||
| 04e932ed62 | |||
| aeec66389d | |||
| adc7356d4a | |||
| 16da05292f | |||
| b0149b7251 | |||
| 24416c1e87 | |||
| 34db870fc6 | |||
| 0e62a29ce0 | |||
| 7de5f91fd2 | |||
| ac68208b1a | |||
| 4e0675940e | |||
| 4f5f347b3c | |||
| bd5395e03f | |||
| f56f564e8b | |||
| eaa413e200 |
26
.github/workflows/server.yml
vendored
26
.github/workflows/server.yml
vendored
@@ -20,26 +20,21 @@ jobs:
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: 'recursive'
|
||||
|
||||
- name: Install sudo for ACT
|
||||
run: apt-get update && apt-get install -y sudo
|
||||
if: env.ACT=='true'
|
||||
|
||||
- name: Install fuse and maven
|
||||
run: sudo apt-get update && sudo apt-get install -y libfuse2
|
||||
- name: Install FUSE
|
||||
run: sudo apt-get update && sudo apt-get install -y libfuse2 libfuse3-dev libfuse3-3 fuse3
|
||||
|
||||
- name: Download maven
|
||||
run: |
|
||||
cd "$HOME"
|
||||
mkdir maven-bin
|
||||
curl -s -L https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz | tar xvz --strip-components=1 -C maven-bin
|
||||
echo "$HOME"/maven-bin/bin >> $GITHUB_PATH
|
||||
|
||||
- name: Maven info
|
||||
run: |
|
||||
echo $GITHUB_PATH
|
||||
echo $PATH
|
||||
mvn -v
|
||||
- name: User allow other for fuse
|
||||
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
|
||||
|
||||
- name: Dump fuse.conf
|
||||
run: cat /etc/fuse.conf
|
||||
|
||||
- name: Set up JDK 21
|
||||
uses: actions/setup-java@v4
|
||||
@@ -48,6 +43,9 @@ jobs:
|
||||
distribution: "zulu"
|
||||
cache: maven
|
||||
|
||||
- name: Build LazyFS
|
||||
run: cd thirdparty/lazyfs/ && ./build.sh
|
||||
|
||||
- name: Test with Maven
|
||||
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify
|
||||
|
||||
|
||||
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
[submodule "thirdparty/lazyfs/lazyfs"]
|
||||
path = thirdparty/lazyfs/lazyfs
|
||||
url = git@github.com:dsrhaslab/lazyfs.git
|
||||
@@ -13,6 +13,11 @@
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter-params</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers</artifactId>
|
||||
@@ -72,26 +77,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
@@ -174,16 +159,13 @@
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<configuration>
|
||||
<forkCount>1C</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<parallel>classes</parallel>
|
||||
<systemPropertyVariables>
|
||||
<junit.jupiter.execution.parallel.enabled>
|
||||
true
|
||||
false
|
||||
</junit.jupiter.execution.parallel.enabled>
|
||||
<junit.jupiter.execution.parallel.mode.default>
|
||||
concurrent
|
||||
</junit.jupiter.execution.parallel.mode.default>
|
||||
<junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
0.5
|
||||
</junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
||||
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
||||
</systemPropertyVariables>
|
||||
|
||||
@@ -32,9 +32,11 @@ public class DhfsFuseIT {
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
Network network = Network.newNetwork();
|
||||
network = Network.newNetwork();
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
@@ -81,6 +83,7 @@ public class DhfsFuseIT {
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -35,13 +35,15 @@ public class DhfsFusex3IT {
|
||||
String c2uuid;
|
||||
String c3uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
// This calculation is somewhat racy, so keep it hardcoded for now
|
||||
long emptyFileCount = 9;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
// TODO: Dedup
|
||||
Network network = Network.newNetwork();
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
@@ -131,6 +133,7 @@ public class DhfsFusex3IT {
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -252,21 +255,22 @@ public class DhfsFusex3IT {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
// Pauses needed as otherwise docker buffers some incoming packets
|
||||
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container3.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container3.getContainerId()).exec();
|
||||
|
||||
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
Log.warn("Waiting for connections");
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
@@ -66,7 +66,7 @@ public class DhfsImage implements Future<String> {
|
||||
.run("apt update && apt install -y libfuse2 curl gcc")
|
||||
.copy("/app", "/app")
|
||||
.copy("/libs", "/libs")
|
||||
.cmd("java", "-ea", "-Xmx128M",
|
||||
.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1","-XX:+UseParallelGC",
|
||||
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
|
||||
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
|
||||
"--add-opens=java.base/java.nio=ALL-UNNAMED",
|
||||
@@ -75,12 +75,12 @@ public class DhfsImage implements Future<String> {
|
||||
"-Ddhfs.objects.deletion.delay=0",
|
||||
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
|
||||
"-Ddhfs.objects.ref_verification=true",
|
||||
"-Ddhfs.objects.write_log=true",
|
||||
"-Ddhfs.objects.sync.timeout=10",
|
||||
"-Ddhfs.objects.sync.ping.timeout=5",
|
||||
"-Ddhfs.objects.reconnect_interval=1s",
|
||||
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",
|
||||
"-Ddhfs.objects.periodic-push-op-interval=5s",
|
||||
"-Ddhfs.fuse.root=/dhfs_test/fuse",
|
||||
"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
|
||||
|
||||
@@ -18,10 +18,7 @@ import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
@@ -39,12 +36,18 @@ public class KillIT {
|
||||
File data1;
|
||||
File data2;
|
||||
|
||||
Network network;
|
||||
|
||||
ExecutorService executor;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
executor = Executors.newCachedThreadPool();
|
||||
|
||||
data1 = Files.createTempDirectory("").toFile();
|
||||
data2 = Files.createTempDirectory("").toFile();
|
||||
|
||||
Network network = Network.newNetwork();
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
@@ -96,11 +99,28 @@ public class KillIT {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
TestDataCleaner.purgeDirectory(data1);
|
||||
TestDataCleaner.purgeDirectory(data2);
|
||||
executor.close();
|
||||
network.close();
|
||||
}
|
||||
|
||||
private void checkConsistency() {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTest(TestInfo testInfo) throws Exception {
|
||||
var executor = Executors.newFixedThreadPool(2);
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
@@ -124,24 +144,11 @@ public class KillIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTestDirs(TestInfo testInfo) throws Exception {
|
||||
var executor = Executors.newFixedThreadPool(2);
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
@@ -165,18 +172,64 @@ public class KillIT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
@Test
|
||||
void killTest2(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting");
|
||||
container2.start();
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
|
||||
@Test
|
||||
void killTestDirs2(TestInfo testInfo) throws Exception {
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var ret1 = executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.await();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(10000);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting");
|
||||
container2.start();
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,219 @@
|
||||
package com.usatiuk.dhfs.integration;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class LazyFs {
|
||||
private static final String lazyFsPath;
|
||||
|
||||
private final String mountRoot;
|
||||
private final String dataRoot;
|
||||
private final String name;
|
||||
|
||||
private final File configFile;
|
||||
private final File fifoFile;
|
||||
|
||||
static {
|
||||
lazyFsPath = System.getProperty("lazyFsPath");
|
||||
System.out.println("LazyFs Path: " + lazyFsPath);
|
||||
}
|
||||
|
||||
public LazyFs(String name, String mountRoot, String dataRoot) {
|
||||
this.name = name;
|
||||
this.mountRoot = mountRoot;
|
||||
this.dataRoot = dataRoot;
|
||||
|
||||
try {
|
||||
configFile = File.createTempFile("lazyfs", ".conf");
|
||||
configFile.deleteOnExit();
|
||||
|
||||
fifoFile = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
|
||||
fifoFile.deleteOnExit();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
|
||||
}
|
||||
|
||||
private Thread errPiper;
|
||||
private Thread outPiper;
|
||||
private CountDownLatch startLatch;
|
||||
private Process fs;
|
||||
|
||||
private String fifoPath() {
|
||||
return fifoFile.getAbsolutePath();
|
||||
}
|
||||
|
||||
public void start(String extraOpts) {
|
||||
var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs");
|
||||
if (!lfsPath.toFile().isFile())
|
||||
throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath());
|
||||
if (!lfsPath.toFile().canExecute())
|
||||
throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath());
|
||||
|
||||
try (var rwFile = new RandomAccessFile(configFile, "rw");
|
||||
var channel = rwFile.getChannel()) {
|
||||
channel.truncate(0);
|
||||
var config = "[faults]\n" +
|
||||
"fifo_path=\"" + fifoPath() + "\"\n" +
|
||||
"[cache]\n" +
|
||||
"apply_eviction=false\n" +
|
||||
"[cache.simple]\n" +
|
||||
"custom_size=\"1gb\"\n" +
|
||||
"blocks_per_page=1\n" +
|
||||
"[filesystem]\n" +
|
||||
"log_all_operations=false\n" +
|
||||
"logfile=\"\"\n" + extraOpts;
|
||||
rwFile.write(config.getBytes());
|
||||
Log.info("LazyFs config: \n" + config);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
var argList = new ArrayList<String>();
|
||||
|
||||
argList.add(lfsPath.toString());
|
||||
argList.add(Path.of(mountRoot).toString());
|
||||
argList.add("--config-path");
|
||||
argList.add(configFile.getAbsolutePath());
|
||||
argList.add("-o");
|
||||
argList.add("allow_other");
|
||||
argList.add("-o");
|
||||
argList.add("modules=subdir");
|
||||
argList.add("-o");
|
||||
argList.add("subdir=" + Path.of(dataRoot).toAbsolutePath().toString());
|
||||
try {
|
||||
Log.info("Starting LazyFs " + argList);
|
||||
fs = Runtime.getRuntime().exec(argList.toArray(String[]::new));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
startLatch = new CountDownLatch(1);
|
||||
|
||||
outPiper = new Thread(() -> {
|
||||
try {
|
||||
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) {
|
||||
String line;
|
||||
|
||||
while ((line = input.readLine()) != null) {
|
||||
if (line.contains("running LazyFS"))
|
||||
startLatch.countDown();
|
||||
System.out.println(line);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.info("Exception in LazyFs piper", e);
|
||||
}
|
||||
Log.info("LazyFs out piper finished");
|
||||
});
|
||||
outPiper.start();
|
||||
errPiper = new Thread(() -> {
|
||||
try {
|
||||
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) {
|
||||
String line;
|
||||
|
||||
while ((line = input.readLine()) != null) {
|
||||
System.out.println(line);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.info("Exception in LazyFs piper", e);
|
||||
}
|
||||
Log.info("LazyFs err piper finished");
|
||||
});
|
||||
errPiper.start();
|
||||
|
||||
try {
|
||||
if (!startLatch.await(30, TimeUnit.SECONDS))
|
||||
throw new RuntimeException("StartLatch timed out");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
Log.info("LazyFs started");
|
||||
}
|
||||
|
||||
public void start() {
|
||||
start("");
|
||||
}
|
||||
|
||||
private String mdbPath() {
|
||||
return Path.of(dataRoot).resolve("objects").resolve("data.mdb").toAbsolutePath().toString();
|
||||
}
|
||||
|
||||
public void startTornOp() {
|
||||
start("\n" +
|
||||
"[[injection]]\n" +
|
||||
"type=\"torn-seq\"\n" +
|
||||
"op=\"write\"\n" +
|
||||
"file=\"" + mdbPath() + "\"\n" +
|
||||
"persist=[1,4]\n" +
|
||||
"occurrence=3");
|
||||
}
|
||||
|
||||
public void startTornSeq() {
|
||||
start("[[injection]]\n" +
|
||||
"type=\"torn-op\"\n" +
|
||||
"file=\"" + mdbPath() + "\"\n" +
|
||||
"occurrence=3\n" +
|
||||
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
|
||||
"persist=[1,3]");
|
||||
}
|
||||
|
||||
public void crash() {
|
||||
try {
|
||||
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
|
||||
Log.info("Running command: " + cmd);
|
||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
try {
|
||||
synchronized (this) {
|
||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// Doesn't actually work?
|
||||
//
|
||||
// public void crashop() {
|
||||
// try {
|
||||
// var cmd = "echo \"lazyfs::torn-op::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,3::parts=3::occurrence=5\" > /tmp/faults.fifo";
|
||||
// System.out.println("Running command: " + cmd);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
|
||||
// Thread.sleep(1000);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
|
||||
// Thread.sleep(1000);
|
||||
// } catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// public void crashseq() {
|
||||
// try {
|
||||
// var cmd = "echo \"lazyfs::torn-seq::op=write::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,4::occurrence=2\" > /tmp/faults.fifo";
|
||||
// System.out.println("Running command: " + cmd);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
|
||||
// Thread.sleep(1000);
|
||||
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
|
||||
// Thread.sleep(1000);
|
||||
// } catch (Exception e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,470 @@
|
||||
package com.usatiuk.dhfs.integration;
|
||||
|
||||
import com.github.dockerjava.api.model.Device;
|
||||
import com.usatiuk.dhfs.TestDataCleaner;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.output.WaitingConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
public class LazyFsIT {
|
||||
GenericContainer<?> container1;
|
||||
GenericContainer<?> container2;
|
||||
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
File data1;
|
||||
File data2;
|
||||
File data1Lazy;
|
||||
File data2Lazy;
|
||||
|
||||
LazyFs lazyFs1;
|
||||
LazyFs lazyFs2;
|
||||
|
||||
ExecutorService executor;
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
executor = Executors.newCachedThreadPool();
|
||||
data1 = Files.createTempDirectory("dhfsdata").toFile();
|
||||
data2 = Files.createTempDirectory("dhfsdata").toFile();
|
||||
data1Lazy = Files.createTempDirectory("lazyfsroot").toFile();
|
||||
data2Lazy = Files.createTempDirectory("lazyfsroot").toFile();
|
||||
|
||||
network = Network.newNetwork();
|
||||
|
||||
lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString());
|
||||
lazyFs1.start();
|
||||
lazyFs2 = new LazyFs(testInfo.getDisplayName(), data2.toString(), data2Lazy.toString());
|
||||
lazyFs2.start();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
|
||||
container2 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
|
||||
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
|
||||
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers");
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers");
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
lazyFs1.stop();
|
||||
lazyFs2.stop();
|
||||
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
TestDataCleaner.purgeDirectory(data1);
|
||||
TestDataCleaner.purgeDirectory(data1Lazy);
|
||||
TestDataCleaner.purgeDirectory(data2);
|
||||
TestDataCleaner.purgeDirectory(data2Lazy);
|
||||
|
||||
executor.close();
|
||||
network.close();
|
||||
}
|
||||
|
||||
private void checkConsistency(String testName) {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
|
||||
Log.info("Listing consistency " + testName + "\n"
|
||||
+ ls1 + "\n"
|
||||
+ cat1 + "\n"
|
||||
+ ls2 + "\n"
|
||||
+ cat2 + "\n");
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
|
||||
private static enum CrashType {
|
||||
CRASH,
|
||||
TORN_OP,
|
||||
TORN_SEQ
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTest(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs1.crash();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs1.start();
|
||||
case TORN_OP -> lazyFs1.startTornOp();
|
||||
case TORN_SEQ -> lazyFs1.startTornSeq();
|
||||
}
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
// LazyFs can crash too early
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test2; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
Thread.sleep(3000);
|
||||
lazyFs1.crash();
|
||||
}
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs1.start();
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTestDirs(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs1.crash();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs1.start();
|
||||
case TORN_OP -> lazyFs1.startTornOp();
|
||||
case TORN_SEQ -> lazyFs1.startTornSeq();
|
||||
}
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
// LazyFs can crash too early
|
||||
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
Thread.sleep(3000);
|
||||
lazyFs1.crash();
|
||||
}
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
client.killContainerCmd(container1.getContainerId()).exec();
|
||||
container1.stop();
|
||||
lazyFs1.stop();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs1.start();
|
||||
container1.start();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTest2(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting1 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs2.crash();
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs2.start();
|
||||
case TORN_OP -> lazyFs2.startTornOp();
|
||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||
}
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
// LazyFs can crash too early
|
||||
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
var barrier2 = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier2.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting2 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test2; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier2.await();
|
||||
Log.info("Killing");
|
||||
Thread.sleep(3000);
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
lazyFs2.crash();
|
||||
}
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs2.start();
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(CrashType.class)
|
||||
void killTestDirs2(CrashType crashType, TestInfo testInfo) throws Exception {
|
||||
var barrier = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting1 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
lazyFs2.crash();
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
switch (crashType) {
|
||||
case CRASH -> lazyFs2.start();
|
||||
case TORN_OP -> lazyFs2.startTornOp();
|
||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||
}
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
try {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||
if (crashType.equals(CrashType.CRASH))
|
||||
throw e;
|
||||
// LazyFs can crash too early
|
||||
|
||||
Assumptions.assumeTrue(false);
|
||||
}
|
||||
|
||||
var barrier2 = new CountDownLatch(1);
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
Log.info("Writing to container 1");
|
||||
barrier2.countDown();
|
||||
container1.execInContainer("/bin/sh", "-c", "counter=0; while [ ! -f /tmp/stopprinting2 ]; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
barrier2.await();
|
||||
Thread.sleep(3000);
|
||||
Log.info("Killing");
|
||||
if (crashType.equals(CrashType.CRASH)) {
|
||||
lazyFs2.crash();
|
||||
}
|
||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||
client.killContainerCmd(container2.getContainerId()).exec();
|
||||
container2.stop();
|
||||
lazyFs2.stop();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
Log.info("Restart");
|
||||
lazyFs2.start();
|
||||
container2.start();
|
||||
|
||||
waitingConsumer2 = new WaitingConsumer();
|
||||
loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
checkConsistency(testInfo.getDisplayName());
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -29,9 +29,11 @@ public class ResyncIT {
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
Network network;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
Network network = Network.newNetwork();
|
||||
network = Network.newNetwork();
|
||||
|
||||
container1 = new GenericContainer<>(DhfsImage.getInstance())
|
||||
.withPrivilegedMode(true)
|
||||
@@ -55,6 +57,7 @@ public class ResyncIT {
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
network.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -72,26 +72,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.JDataRemote;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.jmap.JMapHolder;
|
||||
import com.usatiuk.dhfs.jmap.JMapLongKey;
|
||||
import com.usatiuk.dhfs.repository.JDataRemoteDto;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
@@ -28,6 +28,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
|
||||
return new File(key, mode, cTime, mTime, symlink);
|
||||
}
|
||||
|
||||
public File withCurrentMTime() {
|
||||
return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return Set.of();
|
||||
|
||||
@@ -247,7 +247,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
} else if (dent instanceof RemoteObjectMeta) {
|
||||
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
|
||||
if (remote instanceof File f) {
|
||||
remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis()));
|
||||
remoteTx.putData(f.withMode(mode).withCurrentMTime());
|
||||
return true;
|
||||
} else {
|
||||
throw new IllegalArgumentException(uuid + " is not a file");
|
||||
@@ -444,16 +444,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
for (var e : removedChunks.entrySet()) {
|
||||
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
||||
}
|
||||
|
||||
for (var e : newChunks.entrySet()) {
|
||||
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||
}
|
||||
|
||||
remoteTx.putData(file);
|
||||
remoteTx.putData(file.withCurrentMTime());
|
||||
|
||||
return (long) data.size();
|
||||
});
|
||||
@@ -535,16 +535,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
// file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
|
||||
|
||||
for (var e : removedChunks.entrySet()) {
|
||||
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
||||
}
|
||||
|
||||
for (var e : newChunks.entrySet()) {
|
||||
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||
}
|
||||
|
||||
remoteTx.putData(file);
|
||||
remoteTx.putData(file.withCurrentMTime());
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
|
||||
|
||||
var uuid = ret.get();
|
||||
|
||||
var curMtime = fileService.getattr(uuid).get().mtime();
|
||||
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
|
||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
|
||||
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
|
||||
@@ -123,6 +124,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
|
||||
fileService.write(uuid, 3, new byte[]{17, 18});
|
||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
|
||||
|
||||
var newMtime = fileService.getattr(uuid).get().mtime();
|
||||
Assertions.assertTrue(newMtime > curMtime);
|
||||
|
||||
fileService.unlink("/writeTest");
|
||||
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
|
||||
}
|
||||
|
||||
@@ -73,24 +73,9 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<groupId>com.github.serceman</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
<version>0.5.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
package com.usatiuk.objects;
|
||||
|
||||
public sealed interface JDataVersionedWrapper permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
|
||||
import com.usatiuk.objects.iterators.Data;
|
||||
|
||||
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
|
||||
@Override
|
||||
default JDataVersionedWrapper value() {
|
||||
return this;
|
||||
}
|
||||
|
||||
JData data();
|
||||
|
||||
long version();
|
||||
|
||||
@@ -24,6 +24,8 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
|
||||
public JDataVersionedWrapper deserialize(ByteString data) {
|
||||
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
|
||||
var rawData = data.substring(Long.BYTES);
|
||||
return new JDataVersionedWrapperLazy(version, rawData.size(), () -> dataSerializer.deserialize(rawData));
|
||||
return new JDataVersionedWrapperLazy(version, rawData.size(),
|
||||
() -> dataSerializer.deserialize(rawData)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,6 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record Data<V>(V value) implements MaybeTombstone<V> {
|
||||
@Override
|
||||
public Optional<V> opt() {
|
||||
return Optional.of(value);
|
||||
}
|
||||
public interface Data<V> extends MaybeTombstone<V> {
|
||||
V value();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record DataWrapper<V>(V value) implements Data<V> {
|
||||
}
|
||||
@@ -39,20 +39,20 @@ public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends Reversib
|
||||
}
|
||||
|
||||
|
||||
switch (start) {
|
||||
case LT -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) < 0;
|
||||
}
|
||||
case LE -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
|
||||
}
|
||||
case GT -> {
|
||||
assert _next == null || _next.compareTo(startKey) > 0;
|
||||
}
|
||||
case GE -> {
|
||||
assert _next == null || _next.compareTo(startKey) >= 0;
|
||||
}
|
||||
}
|
||||
// switch (start) {
|
||||
// case LT -> {
|
||||
//// assert _next == null || _next.getKey().compareTo(startKey) < 0;
|
||||
// }
|
||||
// case LE -> {
|
||||
//// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
|
||||
// }
|
||||
// case GT -> {
|
||||
// assert _next == null || _next.compareTo(startKey) > 0;
|
||||
// }
|
||||
// case GE -> {
|
||||
// assert _next == null || _next.compareTo(startKey) >= 0;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
private void fillNext() {
|
||||
|
||||
@@ -3,5 +3,4 @@ package com.usatiuk.objects.iterators;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface MaybeTombstone<T> {
|
||||
Optional<T> opt();
|
||||
}
|
||||
|
||||
@@ -80,21 +80,21 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
advanceIterator(iterator);
|
||||
}
|
||||
|
||||
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
|
||||
switch (startType) {
|
||||
// case LT -> {
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
|
||||
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
|
||||
// switch (startType) {
|
||||
//// case LT -> {
|
||||
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
|
||||
//// }
|
||||
//// case LE -> {
|
||||
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
|
||||
//// }
|
||||
// case GT -> {
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
|
||||
// }
|
||||
// case LE -> {
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
|
||||
// case GE -> {
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
|
||||
// }
|
||||
case GT -> {
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
|
||||
}
|
||||
case GE -> {
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
|
||||
}
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
@@ -105,7 +105,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
|
||||
while (iteratorEntry.iterator().hasNext()) {
|
||||
K key = iteratorEntry.iterator().peekNextKey();
|
||||
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
|
||||
// Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
|
||||
|
||||
MutableObject<IteratorEntry<K, V>> mutableBoolean = new MutableObject<>(null);
|
||||
|
||||
@@ -125,7 +125,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
});
|
||||
|
||||
if (newVal != iteratorEntry) {
|
||||
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
|
||||
// Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
|
||||
iteratorEntry.iterator().skip();
|
||||
continue;
|
||||
}
|
||||
@@ -141,7 +141,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
@Override
|
||||
protected void reverse() {
|
||||
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
||||
Log.tracev("{0} Reversing from {1}", _name, cur);
|
||||
// Log.tracev("{0} Reversing from {1}", _name, cur);
|
||||
_goingForward = !_goingForward;
|
||||
_sortedIterators.clear();
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
@@ -176,7 +176,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
}
|
||||
cur.getValue().iterator().skip();
|
||||
advanceIterator(cur.getValue());
|
||||
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
|
||||
// Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -9,22 +9,22 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> extends Reversib
|
||||
private Iterator<Map.Entry<K, V>> _iterator;
|
||||
private Map.Entry<K, V> _next;
|
||||
|
||||
public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
|
||||
_map = map;
|
||||
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
|
||||
_map = (NavigableMap<K, V>) map;
|
||||
SortedMap<K, V> _view;
|
||||
_goingForward = true;
|
||||
switch (start) {
|
||||
case GE -> _view = map.tailMap(key, true);
|
||||
case GT -> _view = map.tailMap(key, false);
|
||||
case GE -> _view = _map.tailMap(key, true);
|
||||
case GT -> _view = _map.tailMap(key, false);
|
||||
case LE -> {
|
||||
var floorKey = map.floorKey(key);
|
||||
var floorKey = _map.floorKey(key);
|
||||
if (floorKey == null) _view = _map;
|
||||
else _view = map.tailMap(floorKey, true);
|
||||
else _view = _map.tailMap(floorKey, true);
|
||||
}
|
||||
case LT -> {
|
||||
var lowerKey = map.lowerKey(key);
|
||||
if (lowerKey == null) _view = _map;
|
||||
else _view = map.tailMap(lowerKey, true);
|
||||
else _view = _map.tailMap(lowerKey, true);
|
||||
}
|
||||
default -> throw new IllegalArgumentException("Unknown start type");
|
||||
}
|
||||
|
||||
@@ -44,20 +44,20 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
|
||||
}
|
||||
|
||||
|
||||
switch (start) {
|
||||
case LT -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) < 0;
|
||||
}
|
||||
case LE -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
|
||||
}
|
||||
case GT -> {
|
||||
assert _next == null || _next.getKey().compareTo(startKey) > 0;
|
||||
}
|
||||
case GE -> {
|
||||
assert _next == null || _next.getKey().compareTo(startKey) >= 0;
|
||||
}
|
||||
}
|
||||
// switch (start) {
|
||||
// case LT -> {
|
||||
//// assert _next == null || _next.getKey().compareTo(startKey) < 0;
|
||||
// }
|
||||
// case LE -> {
|
||||
//// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
|
||||
// }
|
||||
// case GT -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) > 0;
|
||||
// }
|
||||
// case GE -> {
|
||||
// assert _next == null || _next.getKey().compareTo(startKey) >= 0;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
private void fillNext() {
|
||||
@@ -81,8 +81,8 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
|
||||
else if (!_goingForward && !wasAtEnd)
|
||||
_backing.skipPrev();
|
||||
|
||||
if (!wasAtEnd)
|
||||
Log.tracev("Skipped in reverse: {0}", _next);
|
||||
// if (!wasAtEnd)
|
||||
// Log.tracev("Skipped in reverse: {0}", _next);
|
||||
|
||||
_next = null;
|
||||
_checkedNext = false;
|
||||
|
||||
@@ -2,9 +2,5 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record Tombstone<V>() implements MaybeTombstone<V> {
|
||||
@Override
|
||||
public Optional<V> opt() {
|
||||
return Optional.empty();
|
||||
}
|
||||
public interface Tombstone<V> extends MaybeTombstone<V> {
|
||||
}
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
public record TombstoneImpl<V>() implements Tombstone<V> {
|
||||
}
|
||||
@@ -1,83 +1,24 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
|
||||
private final CloseableKvIterator<K, V> _backing;
|
||||
private final String _name;
|
||||
|
||||
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
|
||||
_name = name;
|
||||
_backing = new PredicateKvIterator<>(
|
||||
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
|
||||
public abstract class TombstoneMergingKvIterator {
|
||||
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
|
||||
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
|
||||
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
|
||||
startType, startKey,
|
||||
pair -> {
|
||||
Log.tracev("{0} - Processing pair {1}", _name, pair);
|
||||
if (pair instanceof Tombstone) {
|
||||
// Log.tracev("{0} - Processing pair {1}", name, pair);
|
||||
if (pair instanceof Tombstone<V>) {
|
||||
return null;
|
||||
}
|
||||
return ((Data<V>) pair).value();
|
||||
});
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
|
||||
this(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
|
||||
@Override
|
||||
public K peekNextKey() {
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public K peekPrevKey() {
|
||||
return _backing.peekPrevKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<K, V> prev() {
|
||||
return _backing.prev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPrev() {
|
||||
return _backing.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skipPrev() {
|
||||
_backing.skipPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return _backing.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<K, V> next() {
|
||||
return _backing.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TombstoneMergingKvIterator{" +
|
||||
"_backing=" + _backing +
|
||||
", _name='" + _name + '\'' +
|
||||
'}';
|
||||
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
|
||||
return of(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
package com.usatiuk.objects.snapshot;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import com.usatiuk.objects.transaction.TxRecord;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Singleton
|
||||
public class SnapshotManager {
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackStore;
|
||||
|
||||
public Snapshot<JObjectKey, JDataVersionedWrapper> createSnapshot() {
|
||||
return writebackStore.getSnapshot();
|
||||
}
|
||||
|
||||
// This should not be called for the same objects concurrently
|
||||
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||
return writebackStore.commitTx(writes);
|
||||
}
|
||||
}
|
||||
@@ -34,22 +34,23 @@ public class CachingObjectPersistentStore {
|
||||
long version,
|
||||
int sizeLimit) {
|
||||
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
|
||||
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
|
||||
|
||||
int newSize = size() + objSize;
|
||||
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
|
||||
int newSize = size() + entry.size();
|
||||
|
||||
var old = map.get(key);
|
||||
if (old != null)
|
||||
newSize -= old.size();
|
||||
|
||||
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
|
||||
TreePMap<JObjectKey, CacheEntry> newCache = map();
|
||||
|
||||
while (newSize > sizeLimit) {
|
||||
var del = newCache.firstEntry();
|
||||
newCache = newCache.minusFirstEntry();
|
||||
newSize -= del.getValue().size();
|
||||
}
|
||||
|
||||
newCache = newCache.plus(key, entry);
|
||||
return new Cache(
|
||||
newCache,
|
||||
newSize,
|
||||
@@ -187,16 +188,9 @@ public class CachingObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("cache", start, key,
|
||||
(mS, mK)
|
||||
-> new MappingKvIterator<>(
|
||||
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
|
||||
e -> {
|
||||
// Log.tracev("Taken from cache: {0}", e);
|
||||
return e.object();
|
||||
}
|
||||
),
|
||||
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
|
||||
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("cache", start, key,
|
||||
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
|
||||
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -204,12 +198,12 @@ public class CachingObjectPersistentStore {
|
||||
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
|
||||
var cached = _curCache.map().get(name);
|
||||
if (cached != null) {
|
||||
return switch (cached.object()) {
|
||||
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
|
||||
case Tombstone<JDataVersionedWrapper> tombstone -> {
|
||||
return switch (cached) {
|
||||
case CacheEntryPresent data -> Optional.of(data.value());
|
||||
case CacheEntryMiss tombstone -> {
|
||||
yield Optional.empty();
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
|
||||
default -> throw new IllegalStateException("Unexpected value: " + cached);
|
||||
};
|
||||
}
|
||||
var read = _backing.readObject(name);
|
||||
@@ -228,7 +222,7 @@ public class CachingObjectPersistentStore {
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
|
||||
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
|
||||
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
|
||||
|
||||
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
|
||||
@@ -261,10 +255,10 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
|
||||
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
|
||||
var prev = _delegate.prev();
|
||||
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
|
||||
return prev;
|
||||
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -278,10 +272,10 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JDataVersionedWrapper> next() {
|
||||
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
|
||||
var next = _delegate.next();
|
||||
maybeCache(next.getKey(), Optional.of(next.getValue()));
|
||||
return next;
|
||||
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -294,6 +288,18 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) {
|
||||
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
|
||||
int size();
|
||||
}
|
||||
|
||||
private record CacheEntryPresent(JDataVersionedWrapper value,
|
||||
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
|
||||
@Override
|
||||
public int size() {
|
||||
return 64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,6 +208,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
// private final Exception _allocationStacktrace = new Exception();
|
||||
private final Exception _allocationStacktrace = null;
|
||||
private boolean _hasNext = false;
|
||||
private JObjectKey _peekedNextKey = null;
|
||||
|
||||
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
|
||||
_txn = txn;
|
||||
@@ -277,24 +278,24 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
var realGot = JObjectKey.fromByteBuffer(_cursor.key());
|
||||
_cursor.key().flip();
|
||||
|
||||
switch (start) {
|
||||
case LT -> {
|
||||
// assert !_hasNext || realGot.compareTo(key) < 0;
|
||||
}
|
||||
case LE -> {
|
||||
// assert !_hasNext || realGot.compareTo(key) <= 0;
|
||||
}
|
||||
case GT -> {
|
||||
assert !_hasNext || realGot.compareTo(key) > 0;
|
||||
}
|
||||
case GE -> {
|
||||
assert !_hasNext || realGot.compareTo(key) >= 0;
|
||||
}
|
||||
}
|
||||
Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
|
||||
// var realGot = JObjectKey.fromByteBuffer(_cursor.key());
|
||||
// _cursor.key().flip();
|
||||
//
|
||||
// switch (start) {
|
||||
// case LT -> {
|
||||
//// assert !_hasNext || realGot.compareTo(key) < 0;
|
||||
// }
|
||||
// case LE -> {
|
||||
//// assert !_hasNext || realGot.compareTo(key) <= 0;
|
||||
// }
|
||||
// case GT -> {
|
||||
// assert !_hasNext || realGot.compareTo(key) > 0;
|
||||
// }
|
||||
// case GE -> {
|
||||
// assert !_hasNext || realGot.compareTo(key) >= 0;
|
||||
// }
|
||||
// }
|
||||
// Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -323,6 +324,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
_goingForward = !_goingForward;
|
||||
_peekedNextKey = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -330,8 +332,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (!_hasNext) {
|
||||
throw new NoSuchElementException("No more elements");
|
||||
}
|
||||
if (_peekedNextKey != null) {
|
||||
return _peekedNextKey;
|
||||
}
|
||||
var ret = JObjectKey.fromByteBuffer(_cursor.key());
|
||||
_cursor.key().flip();
|
||||
_peekedNextKey = ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -341,6 +347,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
_hasNext = _cursor.next();
|
||||
else
|
||||
_hasNext = _cursor.prev();
|
||||
_peekedNextKey = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -361,7 +368,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
_hasNext = _cursor.next();
|
||||
else
|
||||
_hasNext = _cursor.prev();
|
||||
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
|
||||
// Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
|
||||
_peekedNextKey = null;
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.Tombstone;
|
||||
|
||||
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
|
||||
public record PendingDelete(JObjectKey key,
|
||||
long bundleId) implements PendingWriteEntry, Tombstone<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.iterators.Data;
|
||||
|
||||
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
|
||||
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
public interface PendingWriteEntry {
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.iterators.MaybeTombstone;
|
||||
|
||||
public interface PendingWriteEntry extends MaybeTombstone<JDataVersionedWrapper> {
|
||||
long bundleId();
|
||||
}
|
||||
|
||||
@@ -81,6 +81,7 @@ public class WritebackObjectPersistentStore {
|
||||
lastTxId = s.id();
|
||||
}
|
||||
_lastCommittedId.set(lastTxId);
|
||||
_lastWrittenId.set(lastTxId);
|
||||
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
|
||||
_ready = true;
|
||||
}
|
||||
@@ -350,15 +351,9 @@ public class WritebackObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
|
||||
(tS, tK) -> new MappingKvIterator<>(
|
||||
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
|
||||
e -> switch (e) {
|
||||
case PendingWrite pw -> new Data<>(pw.data());
|
||||
case PendingDelete d -> new Tombstone<>();
|
||||
default -> throw new IllegalStateException("Unexpected value: " + e);
|
||||
}),
|
||||
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
|
||||
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
|
||||
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
|
||||
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -367,7 +362,7 @@ public class WritebackObjectPersistentStore {
|
||||
var cached = _pendingWrites.get(name);
|
||||
if (cached != null) {
|
||||
return switch (cached) {
|
||||
case PendingWrite c -> Optional.of(c.data());
|
||||
case PendingWrite c -> Optional.of(c.value());
|
||||
case PendingDelete d -> {
|
||||
yield Optional.empty();
|
||||
}
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -14,17 +13,13 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
// Manages all access to com.usatiuk.objects.JData objects.
|
||||
// In particular, it serves as a source of truth for what is committed to the backing storage.
|
||||
// All data goes through it, it is responsible for transaction atomicity
|
||||
// TODO: persistent tx id
|
||||
@ApplicationScoped
|
||||
public class JObjectManager {
|
||||
private final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
@@ -35,7 +30,7 @@ public class JObjectManager {
|
||||
}
|
||||
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
TransactionFactory transactionFactory;
|
||||
@Inject
|
||||
@@ -180,25 +175,36 @@ public class JObjectManager {
|
||||
toUnlock.add(lock);
|
||||
}
|
||||
|
||||
commitSnapshot = snapshotManager.createSnapshot();
|
||||
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
|
||||
} else {
|
||||
Log.trace("Committing transaction - no changes");
|
||||
|
||||
long version = 0L;
|
||||
|
||||
for (var read : readSet.values()) {
|
||||
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
|
||||
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
long finalVersion = version;
|
||||
Consumer<Runnable> fenceFn = r -> {
|
||||
writebackObjectPersistentStore.asyncFence(finalVersion, r);
|
||||
};
|
||||
|
||||
return Pair.of(
|
||||
Stream.concat(
|
||||
tx.getOnCommit().stream(),
|
||||
tx.getOnFlush().stream()
|
||||
Stream.<Runnable>of(() -> {
|
||||
for (var f : tx.getOnFlush())
|
||||
fenceFn.accept(f);
|
||||
})
|
||||
).toList(),
|
||||
new TransactionHandle() {
|
||||
@Override
|
||||
public void onFlush(Runnable runnable) {
|
||||
runnable.run();
|
||||
fenceFn.accept(runnable);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -219,7 +225,7 @@ public class JObjectManager {
|
||||
// TODO: Every write gets a dependency due to hooks
|
||||
continue;
|
||||
// assert false;
|
||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
|
||||
}
|
||||
|
||||
if (current.get().version() > snapshotId) {
|
||||
@@ -233,8 +239,9 @@ public class JObjectManager {
|
||||
Log.tracev("Skipped dependency checks: no changes");
|
||||
}
|
||||
|
||||
var addFlushCallback = snapshotManager.commitTx(writes.values());
|
||||
var addFlushCallback = writebackObjectPersistentStore.commitTx(writes.values());
|
||||
|
||||
// TODO: is it ok to possibly run it inside transaction?
|
||||
for (var callback : tx.getOnFlush()) {
|
||||
addFlushCallback.accept(callback);
|
||||
}
|
||||
@@ -270,31 +277,4 @@ public class JObjectManager {
|
||||
});
|
||||
tx.close();
|
||||
}
|
||||
|
||||
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
|
||||
// private final long _txId;
|
||||
//
|
||||
// private TransactionObjectSourceImpl(long txId) {
|
||||
// _txId = txId;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
|
||||
// var got = getObj(type, key);
|
||||
// if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
|
||||
// var got = getObjLock(type, key);
|
||||
// if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
// got.lock().close();
|
||||
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
@@ -5,9 +5,8 @@ import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
@@ -18,7 +17,7 @@ import java.util.*;
|
||||
@Singleton
|
||||
public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
LockManager lockManager;
|
||||
@ConfigProperty(name = "dhfs.objects.transaction.never-lock")
|
||||
@@ -65,7 +64,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
|
||||
private TransactionImpl() {
|
||||
_snapshot = snapshotManager.createSnapshot();
|
||||
_snapshot = writebackObjectPersistentStore.getSnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -95,15 +94,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||
var got = _readSet.get(key);
|
||||
|
||||
if (got == null) {
|
||||
var read = _snapshot.readObject(key);
|
||||
_readSet.put(key, new TransactionObjectNoLock<>(read));
|
||||
return read.map(JDataVersionedWrapper::data).map(type::cast);
|
||||
}
|
||||
|
||||
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
|
||||
return _readSet.computeIfAbsent(key, k -> {
|
||||
var read = _snapshot.readObject(k);
|
||||
return new TransactionObjectNoLock<>(read);
|
||||
})
|
||||
.data()
|
||||
.map(w -> type.cast(w.data()));
|
||||
}
|
||||
|
||||
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
|
||||
@@ -162,20 +158,25 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
|
||||
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key,
|
||||
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
|
||||
t -> switch (t) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
|
||||
new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
|
||||
case null, default -> null;
|
||||
}),
|
||||
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
|
||||
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
|
||||
d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(JData obj) {
|
||||
var read = _readSet.get(obj.key());
|
||||
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
}
|
||||
|
||||
@@ -17,30 +17,27 @@ public interface TransactionManager {
|
||||
return supplier.get();
|
||||
}
|
||||
|
||||
begin();
|
||||
T ret;
|
||||
try {
|
||||
ret = supplier.get();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
while (true) {
|
||||
begin();
|
||||
boolean commit = false;
|
||||
try {
|
||||
var ret = supplier.get();
|
||||
commit = true;
|
||||
commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
tries--;
|
||||
} catch (Throwable e) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
return runTries(supplier, tries - 1);
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
return runTries(supplier, tries - 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,29 +52,29 @@ public interface TransactionManager {
|
||||
};
|
||||
}
|
||||
|
||||
begin();
|
||||
try {
|
||||
fn.apply();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
while (true) {
|
||||
begin();
|
||||
boolean commit = false;
|
||||
try {
|
||||
fn.apply();
|
||||
commit = true;
|
||||
var ret = commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
tries--;
|
||||
} catch (Throwable e) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
return runTries(fn, tries - 1);
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
return commit();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
return runTries(fn, tries - 1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
default TransactionHandle run(VoidFn fn) {
|
||||
|
||||
@@ -35,13 +35,6 @@
|
||||
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>jitpack.io</id>
|
||||
<url>https://jitpack.io</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -151,6 +144,7 @@
|
||||
</native.image.path>
|
||||
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
|
||||
<buildDirectory>${project.build.directory}</buildDirectory>
|
||||
<lazyFsPath>${project.basedir}/../../thirdparty/lazyfs/lazyfs/lazyfs</lazyFsPath>
|
||||
<nativeLibsDirectory>${dhfs.native-libs-dir}</nativeLibsDirectory>
|
||||
<maven.home>${maven.home}</maven.home>
|
||||
</systemPropertyVariables>
|
||||
|
||||
@@ -72,26 +72,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
|
||||
@@ -6,8 +6,7 @@ import org.pcollections.*;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
JObjectKey key,
|
||||
public record RemoteObjectMeta(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
PMap<PeerId, Long> knownRemoteVersions,
|
||||
Class<? extends JDataRemote> knownType,
|
||||
PSet<PeerId> confirmedDeletes,
|
||||
@@ -16,22 +15,22 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
boolean hasLocalData) implements JDataRefcounted {
|
||||
// Self put
|
||||
public RemoteObjectMeta(JDataRemote data, PeerId initialPeer) {
|
||||
this(HashTreePSet.empty(), false,
|
||||
data.key(), HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false,
|
||||
this(data.key(), HashTreePSet.empty(), false,
|
||||
HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false,
|
||||
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L),
|
||||
true);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
|
||||
this(HashTreePSet.empty(), false,
|
||||
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
|
||||
this(key, HashTreePSet.empty(), false,
|
||||
HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
|
||||
remoteChangelog,
|
||||
false);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta(JObjectKey key) {
|
||||
this(HashTreePSet.empty(), false,
|
||||
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
|
||||
this(key, HashTreePSet.empty(), false,
|
||||
HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
|
||||
TreePMap.empty(),
|
||||
false);
|
||||
}
|
||||
@@ -44,47 +43,42 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
return JObjectKey.of(key.value() + "_data");
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return ofMetaKey(key);
|
||||
}
|
||||
|
||||
public JObjectKey dataKey() {
|
||||
return ofDataKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectMeta withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new RemoteObjectMeta(refs, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectMeta withFrozen(boolean frozen) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withKnownType(Class<? extends JDataRemote> knownType) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withSeen(boolean seen) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withChangelog(PMap<PeerId, Long> changelog) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
public RemoteObjectMeta withHaveLocal(boolean haveLocal) {
|
||||
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
|
||||
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
|
||||
}
|
||||
|
||||
public long versionSum() {
|
||||
|
||||
@@ -180,7 +180,7 @@ public class PeerManager {
|
||||
});
|
||||
}
|
||||
|
||||
private Optional<PeerAddress> selectBestAddress(PeerId host) {
|
||||
public Optional<PeerAddress> selectBestAddress(PeerId host) {
|
||||
return peerDiscoveryDirectory.getForPeer(host).stream().min(Comparator.comparing(PeerAddress::type));
|
||||
}
|
||||
|
||||
|
||||
@@ -18,10 +18,7 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -90,18 +87,24 @@ public class RemoteObjectServiceClient {
|
||||
}
|
||||
|
||||
public OpPushReply pushOps(PeerId target, List<Op> ops) {
|
||||
var barrier = new CountDownLatch(ops.size());
|
||||
for (Op op : ops) {
|
||||
txm.run(() -> {
|
||||
for (var ref : op.getEscapedRefs()) {
|
||||
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
|
||||
}
|
||||
});
|
||||
}).onFlush(barrier::countDown);
|
||||
}
|
||||
var builder = OpPushRequest.newBuilder();
|
||||
for (Op op : ops) {
|
||||
builder.addMsg(opProtoSerializer.serialize(op));
|
||||
}
|
||||
var built = builder.build();
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.usatiuk.dhfs.repository.invalidation.OpHandler;
|
||||
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionHandle;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
@@ -17,6 +18,9 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
// Note: RunOnVirtualThread hangs somehow
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectServiceServerImpl {
|
||||
@@ -101,19 +105,30 @@ public class RemoteObjectServiceServerImpl {
|
||||
}
|
||||
|
||||
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
|
||||
var handles = new ArrayList<TransactionHandle>();
|
||||
try {
|
||||
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
|
||||
for (var op : ops) {
|
||||
Log.infov("<-- opPush: {0} from {1}", op, from);
|
||||
txm.run(() -> {
|
||||
var handle = txm.run(() -> {
|
||||
opHandler.handleOp(from, op);
|
||||
});
|
||||
handles.add(handle);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.error("Error handling ops", e);
|
||||
throw e;
|
||||
}
|
||||
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
|
||||
return Uni.createFrom().emitter(e -> {
|
||||
var counter = new AtomicLong(handles.size());
|
||||
for (var handle : handles) {
|
||||
handle.onFlush(() -> {
|
||||
if (counter.decrementAndGet() == 0) {
|
||||
e.complete(OpPushReply.getDefaultInstance());
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public Uni<PingReply> ping(PeerId from, PingRequest request) {
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.repository;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -20,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
// TODO: Dedup this
|
||||
@ApplicationScoped
|
||||
public class RpcClientFactory {
|
||||
public class RpcClientFactory implements PeerDisconnectedEventListener {
|
||||
@ConfigProperty(name = "dhfs.objects.sync.timeout")
|
||||
long syncTimeout;
|
||||
|
||||
@@ -79,8 +80,20 @@ public class RpcClientFactory {
|
||||
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void dropCache() {
|
||||
_objSyncCache = new ConcurrentHashMap<>();
|
||||
@Override
|
||||
public void handlePeerDisconnected(PeerId peerId) {
|
||||
ArrayList<ObjSyncStubKey> toRemove = new ArrayList<>();
|
||||
for (var objSyncStubKey : _objSyncCache.keySet()) {
|
||||
if (objSyncStubKey.id().equals(peerId)) {
|
||||
toRemove.add(objSyncStubKey);
|
||||
}
|
||||
}
|
||||
for (var objSyncStubKey : toRemove) {
|
||||
var stub = _objSyncCache.remove(objSyncStubKey);
|
||||
if (stub != null) {
|
||||
((ManagedChannel) stub.getChannel()).shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
|
||||
@@ -36,7 +36,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading invalidation queue");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
try {
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
} catch (Exception e) {
|
||||
Log.error("Error reading invalidation queue", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -162,7 +162,7 @@ public class InvalidationQueueService {
|
||||
commits.get(p).forEach(Runnable::run);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.warnv("Failed to send invalidations, will retry", e);
|
||||
Log.warn("Failed to send invalidations, will retry", e);
|
||||
for (var inv : data) {
|
||||
pushInvalidationToOne(inv);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
package com.usatiuk.dhfs.repository.webapi;
|
||||
|
||||
public record KnownPeerInfo(String uuid) {
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
public record KnownPeerInfo(String uuid, @Nullable String knownAddress) {
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.usatiuk.dhfs.repository.webapi;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public record KnownPeers(List<KnownPeerInfo> peers, String selfUuid) {
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.repository.webapi;
|
||||
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.PeerManager;
|
||||
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.DELETE;
|
||||
@@ -11,6 +12,8 @@ import jakarta.ws.rs.Path;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
@Path("/peers-manage")
|
||||
public class PeerManagementApi {
|
||||
@@ -18,11 +21,15 @@ public class PeerManagementApi {
|
||||
PeerInfoService peerInfoService;
|
||||
@Inject
|
||||
PeerManager peerManager;
|
||||
@Inject
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
@Path("known-peers")
|
||||
@GET
|
||||
public List<KnownPeerInfo> knownPeers() {
|
||||
return peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString())).toList();
|
||||
public KnownPeers knownPeers() {
|
||||
return new KnownPeers(peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString(),
|
||||
Optional.ofNullable(peerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList(),
|
||||
persistentPeerDataService.getSelfUuid().toString());
|
||||
}
|
||||
|
||||
@Path("known-peers")
|
||||
@@ -40,7 +47,10 @@ public class PeerManagementApi {
|
||||
@Path("available-peers")
|
||||
@GET
|
||||
public Collection<KnownPeerInfo> availablePeers() {
|
||||
return peerManager.getSeenButNotAddedHosts().stream().map(p -> new KnownPeerInfo(p.toString())).toList();
|
||||
return peerManager.getSeenButNotAddedHosts().stream()
|
||||
.map(p -> new KnownPeerInfo(p.toString(),
|
||||
peerManager.selectBestAddress(p).map(Objects::toString).orElse(null)))
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Path("peer-state")
|
||||
|
||||
9
thirdparty/lazyfs/build.sh
vendored
Executable file
9
thirdparty/lazyfs/build.sh
vendored
Executable file
@@ -0,0 +1,9 @@
|
||||
# apt install g++ cmake libfuse3-dev libfuse3-3 fuse3
|
||||
|
||||
export CMAKE_BUILD_PARALLEL_LEVEL="$(nproc)"
|
||||
|
||||
cd lazyfs
|
||||
|
||||
cd libs/libpcache && ./build.sh && cd -
|
||||
|
||||
cd lazyfs && ./build.sh && cd -
|
||||
1
thirdparty/lazyfs/lazyfs
vendored
Submodule
1
thirdparty/lazyfs/lazyfs
vendored
Submodule
Submodule thirdparty/lazyfs/lazyfs added at 7b137f90ef
@@ -12,8 +12,10 @@ export function PeerAvailableCard({ peerInfo }: TPeerAvailableCardProps) {
|
||||
return (
|
||||
<div className="peerAvailableCard">
|
||||
<div className={"peerInfo"}>
|
||||
<span>UUID: </span>
|
||||
<span>{peerInfo.uuid}</span>
|
||||
<div>
|
||||
<span>UUID: </span>
|
||||
<span>{peerInfo.uuid}</span>
|
||||
</div>
|
||||
</div>
|
||||
<fetcher.Form
|
||||
className="actions"
|
||||
|
||||
@@ -21,15 +21,21 @@ export function PeerKnownCard({ peerInfo }: TPeerKnownCardProps) {
|
||||
<div className="peerKnownCard">
|
||||
<div className={"peerInfo"}>
|
||||
<div>
|
||||
<div>
|
||||
|
||||
<span>UUID: </span>
|
||||
<span>{peerInfo.uuid}</span>
|
||||
</div>
|
||||
<div>
|
||||
<span>{peerInfo.knownAddress ? "connected" : "not connected"}</span>
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<fetcher.Form
|
||||
className="actions"
|
||||
method="put"
|
||||
action={"/home/peers"}
|
||||
>
|
||||
<span>Manual address: </span>
|
||||
<input
|
||||
name="intent"
|
||||
hidden={true}
|
||||
@@ -43,6 +49,7 @@ export function PeerKnownCard({ peerInfo }: TPeerKnownCardProps) {
|
||||
<input
|
||||
name="address"
|
||||
defaultValue={addr?.address || ""}
|
||||
placeholder={"ip:port:secure port"}
|
||||
/>
|
||||
<button type="submit">save</button>
|
||||
</fetcher.Form>
|
||||
|
||||
@@ -8,9 +8,9 @@ import { PeerKnownCard } from "./PeerKnownCard";
|
||||
export function PeerState() {
|
||||
const loaderData = useLoaderData() as LoaderToType<typeof peerStateLoader>;
|
||||
|
||||
const knownPeers = loaderData.knownPeers.map((p) => (
|
||||
<PeerKnownCard peerInfo={p} key={p.uuid} />
|
||||
));
|
||||
const knownPeers = loaderData.knownPeers.peers
|
||||
.filter((p) => p.uuid !== loaderData.knownPeers.selfUuid)
|
||||
.map((p) => <PeerKnownCard peerInfo={p} key={p.uuid} />);
|
||||
|
||||
const availablePeers = loaderData.availablePeers.map((p) => (
|
||||
<PeerAvailableCard peerInfo={p} key={p.uuid} />
|
||||
@@ -18,6 +18,7 @@ export function PeerState() {
|
||||
|
||||
return (
|
||||
<div id={"PeerState"}>
|
||||
<div>Self UUID: {loaderData.knownPeers.selfUuid}</div>
|
||||
<div>
|
||||
<div>Known peers</div>
|
||||
<div>{knownPeers}</div>
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import { fetchJSON, fetchJSON_throws } from "./utils";
|
||||
import {
|
||||
AvailablePeerInfoToResp,
|
||||
KnownPeerInfoToResp,
|
||||
KnownPeersToResp,
|
||||
NoContentToResp,
|
||||
PeerAddressInfoToResp,
|
||||
TAvailablePeerInfoArrTo,
|
||||
TAvailablePeerInfoToResp,
|
||||
TKnownPeerInfoArrTo,
|
||||
TKnownPeerInfoToResp,
|
||||
TKnownPeerInfoArrTo, TKnownPeersTo,
|
||||
TKnownPeersToResp,
|
||||
TNoContentToResp,
|
||||
TPeerAddressInfoArrTo,
|
||||
TPeerAddressInfoToResp,
|
||||
@@ -20,11 +20,11 @@ export async function getAvailablePeers(): Promise<TAvailablePeerInfoArrTo> {
|
||||
>("/peers-manage/available-peers", "GET", AvailablePeerInfoToResp);
|
||||
}
|
||||
|
||||
export async function getKnownPeers(): Promise<TKnownPeerInfoArrTo> {
|
||||
return fetchJSON_throws<TKnownPeerInfoToResp, typeof KnownPeerInfoToResp>(
|
||||
export async function getKnownPeers(): Promise<TKnownPeersTo> {
|
||||
return fetchJSON_throws<TKnownPeersToResp, typeof KnownPeersToResp>(
|
||||
"/peers-manage/known-peers",
|
||||
"GET",
|
||||
KnownPeerInfoToResp,
|
||||
KnownPeersToResp,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ export type TTokenToResp = z.infer<typeof TokenToResp>;
|
||||
// AvailablePeerInfo
|
||||
export const AvailablePeerInfoTo = z.object({
|
||||
uuid: z.string(),
|
||||
knownAddress: z.string().optional(),
|
||||
// addr: z.string(),
|
||||
// port: z.number(),
|
||||
});
|
||||
@@ -55,14 +56,21 @@ export type TAvailablePeerInfoToResp = z.infer<typeof AvailablePeerInfoToResp>;
|
||||
// KnownPeerInfo
|
||||
export const KnownPeerInfoTo = z.object({
|
||||
uuid: z.string(),
|
||||
knownAddress: z.string().optional(),
|
||||
});
|
||||
export type TKnownPeerInfoTo = z.infer<typeof KnownPeerInfoTo>;
|
||||
|
||||
export const KnownPeerInfoArrTo = z.array(KnownPeerInfoTo);
|
||||
export type TKnownPeerInfoArrTo = z.infer<typeof KnownPeerInfoArrTo>;
|
||||
|
||||
export const KnownPeerInfoToResp = CreateAPIResponse(KnownPeerInfoArrTo);
|
||||
export type TKnownPeerInfoToResp = z.infer<typeof KnownPeerInfoToResp>;
|
||||
export const KnownPeersTo = z.object({
|
||||
selfUuid: z.string(),
|
||||
peers: KnownPeerInfoArrTo,
|
||||
});
|
||||
export type TKnownPeersTo = z.infer<typeof KnownPeersTo>;
|
||||
|
||||
export const KnownPeersToResp = CreateAPIResponse(KnownPeersTo);
|
||||
export type TKnownPeersToResp = z.infer<typeof KnownPeersToResp>;
|
||||
|
||||
// PeerAddressInfo
|
||||
export const PeerAddressInfoTo = z.object({
|
||||
|
||||
@@ -40,6 +40,8 @@
|
||||
color: inherit;
|
||||
font-size: inherit;
|
||||
|
||||
padding: 0.5rem;
|
||||
|
||||
width: 100%;
|
||||
height: 100%;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user