29 Commits

Author SHA1 Message Date
34db870fc6 Objects: simplify TombstoneMergingKvIterator 2025-04-22 23:53:41 +02:00
0e62a29ce0 Objects: cache peeked key in LmdbKvIterator 2025-04-22 23:53:18 +02:00
7de5f91fd2 Dhfs-app: lazyfs crash test 2025-04-22 23:25:26 +02:00
ac68208b1a Sync-base: don't crash if invalidation queue is corrupted 2025-04-22 21:38:15 +02:00
4e0675940e Dhfs-app: better fileConflictTest 2025-04-22 21:37:51 +02:00
4f5f347b3c Use stable jnr-fuse version 2025-04-21 11:30:14 +02:00
bd5395e03f Dhfs-fs: mtime fix 2025-04-21 11:29:35 +02:00
f56f564e8b Objects: simplify TransactionManager 2025-04-21 11:15:48 +02:00
eaa413e200 Objects: interfacify MaybeTombstone Data 2025-04-19 17:25:06 +02:00
f3e4d99fcb Objects: seal JDataVersionedWrapper 2025-04-19 12:07:36 +02:00
1c71b26ed8 Objects: 1 less field in JDataVersionedWrapperLazy 2025-04-19 12:06:33 +02:00
e6f95ef028 Remove supportlib
nice idea, but ram usage explosion seems to cancel out the benefits
2025-04-19 11:32:35 +02:00
59e8f6a6b4 Objects: one less copy when serializing
only cache what was really read, otherwise its lifetime is the same as transaction
2025-04-19 11:03:26 +02:00
0292df7f0e Objects: faster JObjectKey 2025-04-19 11:02:30 +02:00
a6a4101bb0 Objects: use bytebuffer to read
a little less GC pressure
2025-04-18 13:21:04 +02:00
59fa5dcf28 Fixie for HashSetDelayedBlockingQueueTest 2025-04-18 13:08:55 +02:00
0f5fb8b8b6 Objects: PBT MergingIterator test 2025-04-18 13:08:40 +02:00
c087dd8971 More microoptimizations 3! 2025-04-18 12:13:22 +02:00
14ddddd0ff Sync-base: use serialized certificate in self data
makes it easier to switch serialization
2025-04-18 11:06:40 +02:00
9859378627 Sync-base: move "_data" to suffix
makes cache much less bad
2025-04-18 11:06:15 +02:00
e167c21d40 More microoptimizations 2! 2025-04-17 11:48:43 +02:00
7dc8f486ea More microoptimizations! 2025-04-17 10:02:26 +02:00
da1a996e6f Support: un-simplify allocateUninitialized 2025-04-17 09:20:56 +02:00
bb52a3af0e Objects: waste less cpu in transaction commit 2025-04-17 00:26:58 +02:00
de0b868349 Objects: one less sorted tree traversal in advanceIterator
totally overengineering
2025-04-17 00:14:56 +02:00
d4d4e150c1 Objects: use LATIN1 strings for keys
should be a bit faster to match the internal string representation
2025-04-17 00:12:37 +02:00
c9b0400d50 Objects: faster MergingKvIterator 2025-04-16 23:41:30 +02:00
94218330b1 Simplify allocateUninitialized 2025-04-16 16:26:58 +02:00
dbe2a72f7c Objects: don't create a db_ver_obj bytebuffer every time 2025-04-16 15:39:52 +02:00
86 changed files with 1173 additions and 1398 deletions

View File

@@ -20,26 +20,21 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: 'recursive'
- name: Install sudo for ACT
run: apt-get update && apt-get install -y sudo
if: env.ACT=='true'
- name: Install fuse and maven
run: sudo apt-get update && sudo apt-get install -y libfuse2
- name: Install FUSE
run: sudo apt-get update && sudo apt-get install -y libfuse2 libfuse3-dev libfuse3-3 fuse3
- name: Download maven
run: |
cd "$HOME"
mkdir maven-bin
curl -s -L https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz | tar xvz --strip-components=1 -C maven-bin
echo "$HOME"/maven-bin/bin >> $GITHUB_PATH
- name: Maven info
run: |
echo $GITHUB_PATH
echo $PATH
mvn -v
- name: User allow other for fuse
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
- name: Dump fuse.conf
run: cat /etc/fuse.conf
- name: Set up JDK 21
uses: actions/setup-java@v4
@@ -48,6 +43,9 @@ jobs:
distribution: "zulu"
cache: maven
- name: Build LazyFS
run: cd thirdparty/lazyfs/ && ./build.sh
- name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify
@@ -89,102 +87,6 @@ jobs:
name: Webui
path: webui/dist
build-native-libs:
strategy:
matrix:
include:
- os: ubuntu-latest
cross: "linux/amd64"
- os: ubuntu-latest
cross: "linux/arm64"
- os: macos-latest
runs-on: ${{ matrix.os }}
env:
DO_LOCAL_BUILD: ${{ matrix.os == 'macos-latest' }}
DOCKER_PLATFORM: ${{ matrix.cross || 'NATIVE' }}
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set SANITIZED_DOCKER_PLATFORM
run: echo "SANITIZED_DOCKER_PLATFORM=$(echo $DOCKER_PLATFORM | tr / _ )" >> $GITHUB_ENV
- name: Set DOCKER_BUILDER_IMAGE
run: echo "DOCKER_BUILDER_IMAGE=dhfs_lib_builder-${{matrix.os}}-$SANITIZED_DOCKER_PLATFORM" >> $GITHUB_ENV
- name: Build config
run: |
echo DO_LOCAL_BUILD: $DO_LOCAL_BUILD
echo DOCKER_PLATFORM: $DOCKER_PLATFORM
echo SANITIZED_DOCKER_PLATFORM: $SANITIZED_DOCKER_PLATFORM
echo DOCKER_BUILDER_IMAGE: $DOCKER_BUILDER_IMAGE
- name: Set up JDK 21
if: ${{ env.DO_LOCAL_BUILD == 'TRUE' }}
uses: actions/setup-java@v4
with:
java-version: "21"
distribution: "zulu"
cache: maven
- name: Set up Docker Buildx
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
uses: docker/setup-buildx-action@v3
- name: Set up QEMU
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
uses: docker/setup-qemu-action@v3
- name: Build Docker builder image
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
uses: docker/build-push-action@v5
with:
context: ./libdhfs_support/builder
file: ./libdhfs_support/builder/Dockerfile
push: false
platforms: ${{ env.DOCKER_PLATFORM }}
tags: ${{ env.DOCKER_BUILDER_IMAGE }}
cache-from: type=gha,scope=build-${{ env.DOCKER_BUILDER_IMAGE }}
cache-to: type=gha,mode=max,scope=build-${{ env.DOCKER_BUILDER_IMAGE }}
load: true
- name: Build the library
run: |
CMAKE_ARGS="-DCMAKE_BUILD_TYPE=Release" libdhfs_support/builder/cross-build.sh both build "$(pwd)/result"
- name: Upload build
uses: actions/upload-artifact@v4
with:
name: NativeLib-${{ matrix.os }}-${{ env.SANITIZED_DOCKER_PLATFORM }}
path: result
merge-native-libs:
runs-on: ubuntu-latest
needs: [build-native-libs]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Download artifacts
uses: actions/download-artifact@v4
with:
path: downloaded-libs
- name: Merge all
run: rsync -av downloaded-libs/NativeLib*/* result/
- name: Check that libs exists
run: |
test -f "result/Linux-x86_64/libdhfs_support.so" || exit 1
- name: Upload
uses: actions/upload-artifact@v4
with:
name: NativeLibs
path: result
publish-docker:
runs-on: ubuntu-latest
permissions:
@@ -194,7 +96,7 @@ jobs:
# with sigstore/fulcio when running outside of PRs.
id-token: write
needs: [build-webui, merge-native-libs, build-dhfs]
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
@@ -212,12 +114,6 @@ jobs:
name: Webui
path: webui-dist-downloaded
- name: Download native libs
uses: actions/download-artifact@v4
with:
name: NativeLibs
path: dhfs-native-downloaded
- name: Show all the files
run: find .
@@ -293,7 +189,7 @@ jobs:
# with sigstore/fulcio when running outside of PRs.
id-token: write
needs: [build-webui, merge-native-libs, build-dhfs]
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
@@ -309,11 +205,6 @@ jobs:
name: Webui
path: webui-dist-downloaded
- uses: actions/download-artifact@v4
with:
name: NativeLibs
path: dhfs-native-downloaded
- name: Show all the files
run: find .
@@ -326,9 +217,6 @@ jobs:
- name: Copy Webui
run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui"
- name: Copy Webui
run: cp -r ./dhfs-native-downloaded "run-wrapper-out/dhfs/app/NativeLibs"
- name: Copy run wrapper
run: cp -r ./run-wrapper/* "run-wrapper-out/dhfs/app/"

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "thirdparty/lazyfs/lazyfs"]
path = thirdparty/lazyfs/lazyfs
url = git@github.com:dsrhaslab/lazyfs.git

View File

@@ -9,8 +9,6 @@ COPY ./dhfs-package-downloaded/*.jar .
COPY ./dhfs-package-downloaded/app .
COPY ./dhfs-package-downloaded/quarkus .
WORKDIR /usr/src/app/native-libs
COPY ./dhfs-native-downloaded/. .
WORKDIR /usr/src/app/webui
COPY ./webui-dist-downloaded/. .

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:8080:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions --enable-preview -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -127,11 +107,6 @@
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>

View File

@@ -35,13 +35,15 @@ public class DhfsFusex3IT {
String c2uuid;
String c3uuid;
Network network;
// This calculation is somewhat racy, so keep it hardcoded for now
long emptyFileCount = 9;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
// TODO: Dedup
Network network = Network.newNetwork();
network = Network.newNetwork();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
@@ -252,21 +254,22 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
Log.warn("Waiting for connections");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);

View File

@@ -79,7 +79,6 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.sync.timeout=10",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",
"-Dcom.usatiuk.dhfs.supportlib.native-path=/libs",
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-Ddhfs.objects.periodic-push-op-interval=5s",

View File

@@ -0,0 +1,177 @@
package com.usatiuk.dhfs.integration;
import io.quarkus.logging.Log;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class LazyFs {
private static final String lazyFsPath;
private final String dataRoot;
private final String lazyFsDataPath;
private final String name;
private final File tmpConfigFile;
private final File fifoPath;
static {
lazyFsPath = System.getProperty("lazyFsPath");
System.out.println("LazyFs Path: " + lazyFsPath);
}
public LazyFs(String name, String dataRoot, String lazyFsDataPath) {
this.name = name;
this.dataRoot = dataRoot;
this.lazyFsDataPath = lazyFsDataPath;
try {
tmpConfigFile = File.createTempFile("lazyfs", ".conf");
tmpConfigFile.deleteOnExit();
fifoPath = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
fifoPath.deleteOnExit();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Thread outputPiper;
private Process fs;
private String fifoPath() {
return fifoPath.getAbsolutePath();
}
public void start() {
var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs");
if (!lfsPath.toFile().isFile())
throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath());
if (!lfsPath.toFile().canExecute())
throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath());
try (var rwFile = new RandomAccessFile(tmpConfigFile, "rw");
var channel = rwFile.getChannel()) {
channel.truncate(0);
var config = "[faults]\n" +
"fifo_path=\"" + fifoPath() + "\"\n" +
"[cache]\n" +
"apply_eviction=false\n" +
"[cache.simple]\n" +
"custom_size=\"1gb\"\n" +
"blocks_per_page=1\n" +
"[filesystem]\n" +
"log_all_operations=false\n" +
"logfile=\"\"";
rwFile.write(config.getBytes());
Log.info("LazyFs config: \n" + config);
} catch (Exception e) {
throw new RuntimeException(e);
}
var argList = new ArrayList<String>();
argList.add(lfsPath.toString());
argList.add(Path.of(dataRoot).toString());
argList.add("--config-path");
argList.add(tmpConfigFile.getAbsolutePath());
argList.add("-o");
argList.add("allow_other");
argList.add("-o");
argList.add("modules=subdir");
argList.add("-o");
argList.add("subdir=" + Path.of(lazyFsDataPath).toAbsolutePath().toString());
try {
Log.info("Starting LazyFs " + argList);
fs = Runtime.getRuntime().exec(argList.toArray(String[]::new));
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
outputPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) {
String line;
while ((line = input.readLine()) != null) {
System.out.println(line);
}
}
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
});
outputPiper.start();
outputPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) {
String line;
while ((line = input.readLine()) != null) {
System.out.println(line);
}
}
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
});
outputPiper.start();
}
public void crash() {
try {
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
System.out.println("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void stop() {
try {
if (fs == null) {
return;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
if (!fs.waitFor(1, TimeUnit.SECONDS))
throw new RuntimeException("LazyFs process did not stop in time");
fs = null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Doesn't actually work?
//
// public void crashop() {
// try {
// var cmd = "echo \"lazyfs::torn-op::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,3::parts=3::occurrence=5\" > /tmp/faults.fifo";
// System.out.println("Running command: " + cmd);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
// Thread.sleep(1000);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
// Thread.sleep(1000);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
//
// public void crashseq() {
// try {
// var cmd = "echo \"lazyfs::torn-seq::op=write::file=" + Path.of(lazyFsDataPath).toAbsolutePath().toString() + "/objects/data.mdb::persist=1,4::occurrence=2\" > /tmp/faults.fifo";
// System.out.println("Running command: " + cmd);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
// Thread.sleep(1000);
// Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
// Thread.sleep(1000);
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// }
}

View File

@@ -0,0 +1,199 @@
package com.usatiuk.dhfs.integration;
import com.github.dockerjava.api.model.Device;
import com.usatiuk.dhfs.TestDataCleaner;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
public class LazyFsIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
WaitingConsumer waitingConsumer1;
WaitingConsumer waitingConsumer2;
String c1uuid;
String c2uuid;
File data1;
File data2;
File data1Lazy;
LazyFs lazyFs1;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
data1 = Files.createTempDirectory("dhfsdata").toFile();
data2 = Files.createTempDirectory("dhfsdata").toFile();
data1Lazy = Files.createTempDirectory("lazyfsroot").toFile();
Network network = Network.newNetwork();
lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString());
lazyFs1.start();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
}
@AfterEach
void stop() {
lazyFs1.stop();
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
TestDataCleaner.purgeDirectory(data1);
TestDataCleaner.purgeDirectory(data1Lazy);
TestDataCleaner.purgeDirectory(data2);
}
@Test
void killTest(TestInfo testInfo) throws Exception {
var executor = Executors.newFixedThreadPool(2);
var barrier = new CyclicBarrier(2);
var ret1 = executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.await();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(1000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
Thread.sleep(1000);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
// @Test
// void killTestDirs(TestInfo testInfo) throws Exception {
// var executor = Executors.newFixedThreadPool(2);
// var barrier = new CyclicBarrier(2);
// var ret1 = executor.submit(() -> {
// try {
// Log.info("Writing to container 1");
// barrier.await();
// container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
// barrier.await();
// Thread.sleep(10000);
// var client = DockerClientFactory.instance().client();
// client.killContainerCmd(container1.getContainerId()).exec();
// container1.stop();
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
// container1.start();
// waitingConsumer1 = new WaitingConsumer();
// var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
// container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
// waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
//
// await().atMost(45, TimeUnit.SECONDS).until(() -> {
// Log.info("Listing consistency");
// var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// Log.info(ls1);
// Log.info(cat1);
// Log.info(ls2);
// Log.info(cat2);
//
// return ls1.equals(ls2) && cat1.equals(cat2);
// });
// }
}

View File

@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.local-discovery=false

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -127,11 +107,6 @@
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.jmap.JMapHolder;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.Set;
@@ -28,6 +28,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
return new File(key, mode, cTime, mTime, symlink);
}
public File withCurrentMTime() {
return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return Set.of();

View File

@@ -33,6 +33,7 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;
@@ -246,7 +247,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis()));
remoteTx.putData(f.withMode(mode).withCurrentMTime());
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
@@ -410,7 +411,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (existingEnd < offset) {
if (!pendingPrefix.isEmpty()) {
int diff = Math.toIntExact(offset - existingEnd);
pendingPrefix = pendingPrefix.concat(ByteString.copyFrom(new byte[diff]));
pendingPrefix = pendingPrefix.concat(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(diff)));
} else {
fillZeros(existingEnd, offset, newChunks);
start = offset;
@@ -452,7 +453,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
remoteTx.putData(file);
remoteTx.putData(file.withCurrentMTime());
return (long) data.size();
});
@@ -543,7 +544,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
remoteTx.putData(file);
remoteTx.putData(file.withCurrentMTime());
return true;
});
}
@@ -572,7 +573,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (!zeroCache.containsKey(end - cur))
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)])));
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(Math.toIntExact(end - cur)))));
ChunkData newChunkData = zeroCache.get(end - cur);
newChunks.put(start, newChunkData.key());

View File

@@ -111,6 +111,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
var curMtime = fileService.getattr(uuid).get().mtime();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
@@ -123,6 +124,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
fileService.write(uuid, 3, new byte[]{17, 18});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
var newMtime = fileService.getattr(uuid).get().mtime();
Assertions.assertTrue(newMtime > curMtime);
fileService.unlink("/writeTest");
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
}

View File

@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.local-discovery=false

View File

@@ -73,24 +73,9 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<groupId>com.github.serceman</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
<version>0.5.8</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -127,11 +112,6 @@
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>

View File

@@ -5,7 +5,6 @@ import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import io.grpc.Status;
@@ -28,6 +27,7 @@ import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
@@ -237,7 +237,7 @@ public class DhfsFuse extends FuseStubFS {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileKey = getFromHandle(fi.fh.get());
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
var buffer = ByteBuffer.allocateDirect((int) size);
if (buffer.isDirect()) {
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(

View File

@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.local-discovery=false

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
@@ -54,11 +59,6 @@
<artifactId>utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>

View File

@@ -1,6 +1,13 @@
package com.usatiuk.objects;
public interface JDataVersionedWrapper {
import com.usatiuk.objects.iterators.Data;
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
@Override
default JDataVersionedWrapper value() {
return this;
}
JData data();
long version();

View File

@@ -2,11 +2,11 @@ package com.usatiuk.objects;
import java.util.function.Supplier;
public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private JData _data;
private final long _version;
private final int _estimatedSize;
private Supplier<JData> _producer;
private JData _data;
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
_version = version;
@@ -14,6 +14,18 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
_producer = producer;
}
public void setCacheCallback(Runnable cacheCallback) {
if (_data != null) {
throw new IllegalStateException("Cache callback can be set only before data is loaded");
}
var oldProducer = _producer;
_producer = () -> {
var ret = oldProducer.get();
cacheCallback.run();
return ret;
};
}
public JData data() {
if (_data != null)
return _data;

View File

@@ -2,12 +2,12 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.nio.ByteBuffer;
@ApplicationScoped
@Singleton
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
@Inject
ObjectSerializer<JData> dataSerializer;
@@ -24,6 +24,8 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
public JDataVersionedWrapper deserialize(ByteString data) {
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
var rawData = data.substring(Long.BYTES);
return new JDataVersionedWrapperLazy(version, rawData.size(), () -> dataSerializer.deserialize(rawData));
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
);
}
}

View File

@@ -26,11 +26,13 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
}
static JObjectKey fromBytes(byte[] bytes) {
return new JObjectKeyImpl(new String(bytes, StandardCharsets.UTF_8));
return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1));
}
static JObjectKey fromByteBuffer(ByteBuffer buff) {
return new JObjectKeyImpl(StandardCharsets.UTF_8.decode(buff).toString());
byte[] bytes = new byte[buff.remaining()];
buff.get(bytes);
return new JObjectKeyImpl(bytes);
}
@Override
@@ -39,8 +41,6 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
@Override
String toString();
byte[] bytes();
ByteBuffer toByteBuffer();
String value();

View File

@@ -1,11 +1,24 @@
package com.usatiuk.objects;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import java.io.Serial;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public final class JObjectKeyImpl implements JObjectKey {
@Serial
private static final long serialVersionUID = 0L;
private final String value;
private transient ByteBuffer _bb = null;
public JObjectKeyImpl(String value) {
this.value = value;
}
public JObjectKeyImpl(byte[] bytes) {
this.value = new String(bytes, StandardCharsets.ISO_8859_1);
}
public record JObjectKeyImpl(String value) implements JObjectKey {
@Override
public int compareTo(JObjectKey o) {
switch (o) {
@@ -27,17 +40,36 @@ public record JObjectKeyImpl(String value) implements JObjectKey {
}
@Override
public byte[] bytes() {
return value.getBytes(StandardCharsets.UTF_8);
public ByteBuffer toByteBuffer() {
if (_bb != null) return _bb;
synchronized (this) {
if (_bb != null) return _bb;
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
var directBb = ByteBuffer.allocateDirect(bytes.length);
directBb.put(bytes);
directBb.flip();
_bb = directBb;
return directBb;
}
}
@Override
public ByteBuffer toByteBuffer() {
var heapBb = StandardCharsets.UTF_8.encode(value);
if (heapBb.isDirect()) return heapBb;
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
directBb.put(heapBb);
directBb.flip();
return directBb;
public String value() {
return value;
}
@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (JObjectKeyImpl) obj;
return Objects.equals(this.value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}

View File

@@ -18,11 +18,6 @@ public record JObjectKeyMax() implements JObjectKey {
}
}
@Override
public byte[] bytes() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuffer toByteBuffer() {
throw new UnsupportedOperationException();

View File

@@ -18,11 +18,6 @@ public record JObjectKeyMin() implements JObjectKey {
}
}
@Override
public byte[] bytes() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuffer toByteBuffer() {
throw new UnsupportedOperationException();

View File

@@ -2,9 +2,6 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public record Data<V>(V value) implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.of(value);
}
public interface Data<V> extends MaybeTombstone<V> {
V value();
}

View File

@@ -0,0 +1,6 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public record DataWrapper<V>(V value) implements Data<V> {
}

View File

@@ -3,5 +3,4 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface MaybeTombstone<T> {
Optional<T> opt();
}

View File

@@ -1,25 +1,37 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.stream.IntStream;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
_iterators = Map.ofEntries(
IntStream.range(0, iterators.size())
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
.toArray(Pair[]::new)
);
// Why streams are so slow?
{
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
for (int i = 0; i < iterators.size(); i++) {
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
}
_iterators = List.of(iteratorEntries);
}
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
@@ -30,7 +42,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
K greatestLess = null;
K smallestMore = null;
for (var it : _iterators.keySet()) {
for (var ite : _iterators) {
var it = ite.iterator();
if (it.hasNext()) {
var peeked = it.peekNextKey();
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
@@ -55,14 +68,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// Empty iterators
}
for (var iterator : _iterators.keySet()) {
for (var ite : _iterators) {
var iterator = ite.iterator();
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
iterator.skip();
}
}
}
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
for (IteratorEntry<K, V> iterator : _iterators) {
advanceIterator(iterator);
}
@@ -88,29 +102,39 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
if (!iterator.hasNext()) {
return;
}
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
while (iteratorEntry.iterator().hasNext()) {
K key = iteratorEntry.iterator().peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
K key = iterator.peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
if (!_sortedIterators.containsKey(key)) {
_sortedIterators.put(key, iterator);
return;
}
MutableObject<IteratorEntry<K, V>> mutableBoolean = new MutableObject<>(null);
// Expects that reversed iterator returns itself when reversed again
var oursPrio = _iterators.get(_goingForward ? iterator : iterator.reversed());
var them = _sortedIterators.get(key);
var theirsPrio = _iterators.get(_goingForward ? them : them.reversed());
if (oursPrio < theirsPrio) {
_sortedIterators.put(key, iterator);
advanceIterator(them);
} else {
Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
iterator.skip();
advanceIterator(iterator);
var newVal = _sortedIterators.merge(key, iteratorEntry, (theirsEntry, oldValOurs) -> {
var oursPrio = oldValOurs.priority();
var theirsPrio = theirsEntry.priority();
if (oursPrio < theirsPrio) {
mutableBoolean.setValue(theirsEntry);
return oldValOurs;
// advance them
// return
} else {
return theirsEntry;
// skip, continue
}
});
if (newVal != iteratorEntry) {
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
iteratorEntry.iterator().skip();
continue;
}
if (mutableBoolean.getValue() != null) {
advanceIterator(mutableBoolean.getValue());
return;
}
return;
}
}
@@ -120,7 +144,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
_sortedIterators.clear();
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
for (IteratorEntry<K, V> iterator : _iterators) {
// _goingForward inverted already
advanceIterator(!_goingForward ? iterator.reversed() : iterator);
}
@@ -150,7 +174,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
if (cur == null) {
throw new NoSuchElementException();
}
cur.getValue().skip();
cur.getValue().iterator().skip();
advanceIterator(cur.getValue());
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
}
@@ -166,7 +190,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
if (cur == null) {
throw new NoSuchElementException();
}
var curVal = cur.getValue().next();
var curVal = cur.getValue().iterator().next();
advanceIterator(cur.getValue());
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
return curVal;
@@ -174,8 +198,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public void close() {
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
for (IteratorEntry<K, V> iterator : _iterators) {
iterator.iterator().close();
}
}

View File

@@ -9,22 +9,22 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> extends Reversib
private Iterator<Map.Entry<K, V>> _iterator;
private Map.Entry<K, V> _next;
public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
_map = map;
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
_map = (NavigableMap<K, V>) map;
SortedMap<K, V> _view;
_goingForward = true;
switch (start) {
case GE -> _view = map.tailMap(key, true);
case GT -> _view = map.tailMap(key, false);
case GE -> _view = _map.tailMap(key, true);
case GT -> _view = _map.tailMap(key, false);
case LE -> {
var floorKey = map.floorKey(key);
var floorKey = _map.floorKey(key);
if (floorKey == null) _view = _map;
else _view = map.tailMap(floorKey, true);
else _view = _map.tailMap(floorKey, true);
}
case LT -> {
var lowerKey = map.lowerKey(key);
if (lowerKey == null) _view = _map;
else _view = map.tailMap(lowerKey, true);
else _view = _map.tailMap(lowerKey, true);
}
default -> throw new IllegalArgumentException("Unknown start type");
}

View File

@@ -2,9 +2,5 @@ package com.usatiuk.objects.iterators;
import java.util.Optional;
public record Tombstone<V>() implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.empty();
}
public interface Tombstone<V> extends MaybeTombstone<V> {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.objects.iterators;
public record TombstoneImpl<V>() implements Tombstone<V> {
}

View File

@@ -1,83 +1,24 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final String _name;
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
_name = name;
_backing = new PredicateKvIterator<>(
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
startType, startKey,
pair -> {
Log.tracev("{0} - Processing pair {1}", _name, pair);
if (pair instanceof Tombstone) {
Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
return ((Data<V>) pair).value();
});
}
@SafeVarargs
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
@Override
public K peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public K peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<K, V> prev() {
return _backing.prev();
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<K, V> next() {
return _backing.next();
}
@Override
public String toString() {
return "TombstoneMergingKvIterator{" +
"_backing=" + _backing +
", _name='" + _name + '\'' +
'}';
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
}

View File

@@ -6,13 +6,14 @@ import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.objects.transaction.TxRecord;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@ApplicationScoped
@Singleton
public class SnapshotManager {
@Inject
WritebackObjectPersistentStore writebackStore;

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
@@ -33,10 +34,9 @@ public class CachingObjectPersistentStore {
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + objSize;
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
@@ -141,10 +141,11 @@ public class CachingObjectPersistentStore {
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private boolean _invalid = false;
private boolean _closed = false;
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
@@ -160,18 +161,34 @@ public class CachingObjectPersistentStore {
_cached.incrementAndGet();
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
lazy.setCacheCallback(() -> {
if (_closed) {
Log.error("Cache callback called after close");
System.exit(-1);
}
doCache(key, obj);
});
return;
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("cache", start, key,
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
}
@Nonnull
@@ -179,12 +196,12 @@ public class CachingObjectPersistentStore {
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
@@ -199,10 +216,11 @@ public class CachingObjectPersistentStore {
@Override
public void close() {
_closed = true;
_backing.close();
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
@@ -235,10 +253,10 @@ public class CachingObjectPersistentStore {
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
}
@Override
@@ -252,10 +270,10 @@ public class CachingObjectPersistentStore {
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
}
}
};
@@ -268,6 +286,18 @@ public class CachingObjectPersistentStore {
}
}
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) {
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
@@ -10,8 +11,6 @@ import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
import com.usatiuk.objects.iterators.ReversibleKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -30,7 +29,6 @@ import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Optional;
@@ -41,7 +39,17 @@ import static org.lmdbjava.Env.create;
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_NAME = "objects";
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
private static final ByteBuffer DB_VER_OBJ_NAME;
static {
byte[] tmp = DB_VER_OBJ_NAME_STR.getBytes(StandardCharsets.ISO_8859_1);
var bb = ByteBuffer.allocateDirect(tmp.length);
bb.put(tmp);
bb.flip();
DB_VER_OBJ_NAME = bb.asReadOnlyBuffer();
}
private final Path _root;
private Env<ByteBuffer> _env;
private Dbi<ByteBuffer> _db;
@@ -67,13 +75,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (read.isPresent()) {
Log.infov("Read tx id {0}", read.get());
} else {
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
bbData.putLong(0);
bbData.flip();
_db.put(txn, bb, bbData);
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
txn.commit();
}
}
@@ -82,10 +87,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var value = _db.get(txn, bb);
var value = _db.get(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer());
return Optional.ofNullable(value).map(ByteBuffer::getLong);
}
@@ -121,7 +123,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
}
@Nonnull
@@ -129,12 +131,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(read -> {
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
uninitBb.put(got);
uninitBb.flip();
return UnsafeByteOperations.unsafeWrap(uninitBb);
});
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
return ret;
}
@@ -168,13 +165,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
assert txId > readTxId(txn).orElseThrow();
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
bbData.putLong(txId);
bbData.flip();
_db.put(txn, bb, bbData);
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) {
txn.close();
throw t;
@@ -214,6 +208,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
private boolean _hasNext = false;
private JObjectKey _peekedNextKey = null;
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
_txn = txn;
@@ -283,24 +278,24 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
var realGot = JObjectKey.fromByteBuffer(_cursor.key());
_cursor.key().flip();
switch (start) {
case LT -> {
// assert !_hasNext || realGot.compareTo(key) < 0;
}
case LE -> {
// assert !_hasNext || realGot.compareTo(key) <= 0;
}
case GT -> {
assert !_hasNext || realGot.compareTo(key) > 0;
}
case GE -> {
assert !_hasNext || realGot.compareTo(key) >= 0;
}
}
Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
// var realGot = JObjectKey.fromByteBuffer(_cursor.key());
// _cursor.key().flip();
//
// switch (start) {
// case LT -> {
//// assert !_hasNext || realGot.compareTo(key) < 0;
// }
// case LE -> {
//// assert !_hasNext || realGot.compareTo(key) <= 0;
// }
// case GT -> {
// assert !_hasNext || realGot.compareTo(key) > 0;
// }
// case GE -> {
// assert !_hasNext || realGot.compareTo(key) >= 0;
// }
// }
// Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
}
@Override
@@ -329,6 +324,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
_goingForward = !_goingForward;
_peekedNextKey = null;
}
@Override
@@ -336,8 +332,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
if (_peekedNextKey != null) {
return _peekedNextKey;
}
var ret = JObjectKey.fromByteBuffer(_cursor.key());
_cursor.key().flip();
_peekedNextKey = ret;
return ret;
}
@@ -347,6 +347,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
_hasNext = _cursor.next();
else
_hasNext = _cursor.prev();
_peekedNextKey = null;
}
@Override
@@ -360,18 +361,15 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
// var val = _cursor.val();
// var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
// bbDirect.put(val);
// bbDirect.flip();
// var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
// var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), ByteString.copyFrom(_cursor.val()));
var val = _cursor.val();
var bs = UnsafeByteOperations.unsafeWrap(val);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
if (_goingForward)
_hasNext = _cursor.next();
else
_hasNext = _cursor.prev();
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
_peekedNextKey = null;
return ret;
}
}

View File

@@ -1,6 +1,9 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.Tombstone;
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
public record PendingDelete(JObjectKey key,
long bundleId) implements PendingWriteEntry, Tombstone<JDataVersionedWrapper> {
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.Data;
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
}

View File

@@ -1,5 +1,8 @@
package com.usatiuk.objects.stores;
public interface PendingWriteEntry {
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.MaybeTombstone;
public interface PendingWriteEntry extends MaybeTombstone<JDataVersionedWrapper> {
long bundleId();
}

View File

@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;

View File

@@ -350,15 +350,9 @@ public class WritebackObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
}
@Nonnull
@@ -367,7 +361,7 @@ public class WritebackObjectPersistentStore {
var cached = _pendingWrites.get(name);
if (cached != null) {
return switch (cached) {
case PendingWrite c -> Optional.of(c.data());
case PendingWrite c -> Optional.of(c.value());
case PendingDelete d -> {
yield Optional.empty();
}

View File

@@ -1,11 +1,12 @@
package com.usatiuk.objects.transaction;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -13,20 +14,22 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
// Manages all access to com.usatiuk.objects.JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage.
// All data goes through it, it is responsible for transaction atomicity
// TODO: persistent tx id
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
@Inject
SnapshotManager snapshotManager;
@Inject
@@ -57,25 +60,27 @@ public class JObjectManager {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
verifyReady();
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet;
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
Map<JObjectKey, TransactionObject<?>> readSet = null;
Collection<AutoCloseableNoThrow> toUnlock = null;
try {
try {
long pendingCount = 0;
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> pendingWrites = Map.ofEntries(
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
);
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> lastWrites = Map.ofEntries(
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
);
List<CommitHookIterationData> hookIterationData;
{
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
for (var n : tx.drainNewWrites()) {
for (var hookPut : _preCommitTxHooks) {
pendingWrites.get(hookPut).put(n.key(), n);
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
}
writes.put(n.key(), n);
@@ -87,9 +92,10 @@ public class JObjectManager {
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
do {
for (var hook : _preCommitTxHooks) {
var lastCurHookSeen = lastWrites.get(hook);
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
@@ -100,7 +106,7 @@ public class JObjectManager {
}
};
var curIteration = pendingWrites.get(hook);
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
@@ -127,23 +133,23 @@ public class JObjectManager {
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : _preCommitTxHooks) {
if (hookPut == hook) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = pendingWrites.get(hookPut).put(n.key(), n);
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
}
}
} while (pendingCount > 0);
}
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
locked.lock().close();
}
}
throw e;
@@ -152,25 +158,34 @@ public class JObjectManager {
readSet = tx.reads();
if (!writes.isEmpty()) {
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(k -> {
var lock = lockManager.lockObject(k);
toUnlock.add(lock);
});
toUnlock = new ArrayList<>(readSet.size() + writes.size());
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
} else {
toLock.add(read.getKey());
}
}
for (var write : writes.entrySet()) {
toLock.add(write.getKey());
}
Collections.sort(toLock);
for (var key : toLock) {
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
}
commitSnapshot = snapshotManager.createSnapshot();
}
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
}
}
if (writes.isEmpty()) {
} else {
Log.trace("Committing transaction - no changes");
for (var read : readSet.values()) {
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
@@ -189,24 +204,23 @@ public class JObjectManager {
if (snapshotId != commitSnapshot.id()) {
for (var read : readSet.entrySet()) {
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
var dep = dependenciesLocked.get(read.getKey());
var current = commitSnapshot.readObject(read.getKey());
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
if (current.isEmpty() != read.getValue().data().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.isEmpty()) {
if (current.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
}
if (dep.get().version() > snapshotId) {
if (current.get().version() > snapshotId) {
Log.tracev("Checking dependency {0} - newer than", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
}
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
@@ -215,21 +229,7 @@ public class JObjectManager {
Log.tracev("Skipped dependency checks: no changes");
}
boolean same = snapshotId == commitSnapshot.id();
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
.filter(r -> {
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
}
}
return true;
}).toList());
var addFlushCallback = snapshotManager.commitTx(writes.values());
for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback);
@@ -247,9 +247,10 @@ public class JObjectManager {
Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t);
} finally {
for (var unlock : toUnlock) {
unlock.close();
}
if (toUnlock != null)
for (var unlock : toUnlock) {
unlock.close();
}
if (commitSnapshot != null)
commitSnapshot.close();
tx.close();
@@ -265,31 +266,4 @@ public class JObjectManager {
});
tx.close();
}
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
// private final long _txId;
//
// private TransactionObjectSourceImpl(long txId) {
// _txId = txId;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
// var got = getObj(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
// var got = getObjLock(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// got.lock().close();
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
// }
}

View File

@@ -6,8 +6,9 @@ import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class LockManager {
private final DataLocker _objLocker = new DataLocker();

View File

@@ -7,14 +7,14 @@ import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@ApplicationScoped
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
SnapshotManager snapshotManager;
@@ -161,20 +161,25 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
case null, default -> null;
}),
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
}
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}

View File

@@ -17,30 +17,27 @@ public interface TransactionManager {
return supplier.get();
}
begin();
T ret;
try {
ret = supplier.get();
} catch (TxCommitException txCommitException) {
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
while (true) {
begin();
boolean commit = false;
try {
var ret = supplier.get();
commit = true;
commit();
return ret;
} catch (TxCommitException txCommitException) {
if (!commit)
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
tries--;
} catch (Throwable e) {
if (!commit)
rollback();
throw e;
}
return runTries(supplier, tries - 1);
} catch (Throwable e) {
rollback();
throw e;
}
try {
commit();
return ret;
} catch (TxCommitException txCommitException) {
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
return runTries(supplier, tries - 1);
}
}
@@ -55,29 +52,29 @@ public interface TransactionManager {
};
}
begin();
try {
fn.apply();
} catch (TxCommitException txCommitException) {
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
while (true) {
begin();
boolean commit = false;
try {
fn.apply();
commit = true;
var ret = commit();
return ret;
} catch (TxCommitException txCommitException) {
if (!commit)
rollback();
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
tries--;
} catch (Throwable e) {
if (!commit)
rollback();
throw e;
}
return runTries(fn, tries - 1);
} catch (Throwable e) {
rollback();
throw e;
}
try {
return commit();
} catch (TxCommitException txCommitException) {
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
return runTries(fn, tries - 1);
}
}
default TransactionHandle run(VoidFn fn) {

View File

@@ -0,0 +1,262 @@
package com.usatiuk.objects.iterators;
import net.jqwik.api.*;
import net.jqwik.api.state.Action;
import net.jqwik.api.state.ActionChain;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import java.util.*;
public class MergingKvIteratorPbtTest {
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
private final CloseableKvIterator<Integer, Integer> mergedIterator;
private final CloseableKvIterator<Integer, Integer> mergingIterator;
private MergingIteratorModel(List<List<Map.Entry<Integer, Integer>>> pairs, IteratorStart startType, Integer startKey) {
TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
for (List<Map.Entry<Integer, Integer>> list : pairs) {
for (Map.Entry<Integer, Integer> pair : list) {
perfectMerged.putIfAbsent(pair.getKey(), pair.getValue());
}
}
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
).toList());
}
@Override
public Integer peekNextKey() {
var mergedKey = mergedIterator.peekNextKey();
var mergingKey = mergingIterator.peekNextKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skip() {
mergedIterator.skip();
mergingIterator.skip();
}
@Override
public Integer peekPrevKey() {
var mergedKey = mergedIterator.peekPrevKey();
var mergingKey = mergingIterator.peekPrevKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> prev() {
var mergedKey = mergedIterator.prev();
var mergingKey = mergingIterator.prev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public boolean hasPrev() {
var mergedKey = mergedIterator.hasPrev();
var mergingKey = mergingIterator.hasPrev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skipPrev() {
mergedIterator.skipPrev();
mergingIterator.skipPrev();
}
@Override
public void close() {
mergedIterator.close();
mergingIterator.close();
}
@Override
public boolean hasNext() {
var mergedKey = mergedIterator.hasNext();
var mergingKey = mergingIterator.hasNext();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> next() {
var mergedKey = mergedIterator.next();
var mergingKey = mergingIterator.next();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
}
static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekNextKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Peek next key";
}
}
static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skip();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Skip next key";
}
}
static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekPrevKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Peek prev key";
}
}
static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skipPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Skip prev key";
}
}
static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.prev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Prev key";
}
}
static class NextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.next();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Next key";
}
}
static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasNext();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has next key";
}
}
static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has prev key";
}
}
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
}

View File

@@ -16,7 +16,6 @@
<module>dhfs-fuse</module>
<module>dhfs-app</module>
<module>kleppmanntree</module>
<module>supportlib</module>
<module>objects</module>
<module>utils</module>
</modules>
@@ -36,13 +35,6 @@
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
</properties>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
@@ -52,6 +44,12 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
<version>1.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -116,9 +114,6 @@
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
<com.usatiuk.dhfs.supportlib.native-path>
${dhfs.native-libs-dir}
</com.usatiuk.dhfs.supportlib.native-path>
</systemPropertyVariables>
<argLine>
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
@@ -149,6 +144,7 @@
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<buildDirectory>${project.build.directory}</buildDirectory>
<lazyFsPath>${project.basedir}/../../thirdparty/lazyfs/lazyfs/lazyfs</lazyFsPath>
<nativeLibsDirectory>${dhfs.native-libs-dir}</nativeLibsDirectory>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
@@ -186,7 +182,6 @@
--initialize-at-run-time=com.usatiuk.dhfs.utils.DataLocker$Lock,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore$LmdbKvIterator,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore,
--initialize-at-run-time=com.usatiuk.dhfs.supportlib.UninitializedByteBuffer,
--initialize-at-run-time=com.google.protobuf.UnsafeUtil
</quarkus.native.additional-build-args>
</properties>

View File

@@ -1,114 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>supportlib</artifactId>
<properties>
<cmake.download>false</cmake.download>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>exec-maven-plugin</artifactId>
<groupId>org.codehaus.mojo</groupId>
<version>3.4.1</version>
<executions>
<execution>
<id>CMake Configure</id>
<phase>generate-sources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>
${project.parent.basedir}/../libdhfs_support/builder/cross-build.sh
</executable>
<arguments>
<argument>configure</argument>
<argument>${project.build.outputDirectory}/native-build</argument>
<argument>${dhfs.native-libs-dir}</argument>
</arguments>
</configuration>
</execution>
<execution>
<id>CMake Build</id>
<phase>compile</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>
${project.parent.basedir}/../libdhfs_support/builder/cross-build.sh
</executable>
<arguments>
<argument>build</argument>
<argument>${project.build.outputDirectory}/native-build</argument>
<argument>${dhfs.native-libs-dir}</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.googlecode.cmake-maven-project</groupId>
<artifactId>cmake-maven-plugin</artifactId>
<version>3.30.2-b1</version>
<executions>
<execution>
<id>cmake-generate</id>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<sourcePath>
${project.parent.basedir}/../libdhfs_support
</sourcePath>
<targetPath>
${project.build.outputDirectory}/native-build-local
</targetPath>
<options>
<option>
-DJAVA_HOME=${java.home}
</option>
<option>
-DDHFS_LIB_INSTALL=${dhfs.native-libs-dir}
</option>
</options>
</configuration>
</execution>
<execution>
<id>cmake-compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<target>
install
</target>
<projectDirectory>
${project.build.outputDirectory}/native-build-local
</projectDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,15 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.file.Path;
class DhfsNativeLibFinder {
static private final String LibName = "libdhfs_support";
static Path getLibPath() {
var override = System.getProperty("com.usatiuk.dhfs.supportlib.native-path-override");
if (override != null)
return Path.of(override);
return Path.of(System.getProperty("com.usatiuk.dhfs.supportlib.native-path"))
.resolve(SysUtils.getLibPlatform() + "-" + SysUtils.getLibArch()).resolve(LibName + "." + SysUtils.getLibExtension());
}
}

View File

@@ -1,35 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
public class DhfsSupport {
public static final int PAGE_SIZE;
private static final Logger LOGGER = Logger.getLogger(DhfsSupport.class.getName());
private static final DhfsSupportImpl IMPLEMENTATION;
static {
DhfsSupportImpl tmp;
try {
System.load(DhfsNativeLibFinder.getLibPath().toAbsolutePath().toString());
tmp = new DhfsSupportImplNative();
} catch (Throwable e) {
LOGGER.warning("Failed to load native libraries, using fallback: \n" + e);
tmp = new DhfsSupportImplFallback();
}
IMPLEMENTATION = tmp;
PAGE_SIZE = getPageSizeInternal();
}
static long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
return IMPLEMENTATION.allocateUninitializedByteBuffer(bb, size);
}
static void releaseByteBuffer(long token) {
IMPLEMENTATION.releaseByteBuffer(token);
}
private static int getPageSizeInternal() {
return IMPLEMENTATION.getPageSizeInternal();
}
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
interface DhfsSupportImpl {
long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size);
void releaseByteBuffer(long token);
int getPageSizeInternal();
}

View File

@@ -1,21 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
class DhfsSupportImplFallback implements DhfsSupportImpl {
@Override
public long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
bb[0] = ByteBuffer.allocateDirect(size);
return -1;
}
@Override
public void releaseByteBuffer(long token) {
// GC
}
@Override
public int getPageSizeInternal() {
return 4096; // FIXME:?
}
}

View File

@@ -1,20 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
class DhfsSupportImplNative implements DhfsSupportImpl {
@Override
public long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
return DhfsSupportNative.allocateUninitializedByteBuffer(bb, size);
}
@Override
public void releaseByteBuffer(long token) {
DhfsSupportNative.releaseByteBuffer(token);
}
@Override
public int getPageSizeInternal() {
return DhfsSupportNative.PAGE_SIZE;
}
}

View File

@@ -1,20 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
class DhfsSupportNative {
static public final int PAGE_SIZE;
static {
System.load(DhfsNativeLibFinder.getLibPath().toAbsolutePath().toString());
PAGE_SIZE = getPageSizeInternal();
}
static native long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size);
static native void releaseByteBuffer(long token);
private static native int getPageSizeInternal();
}

View File

@@ -1,43 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import org.apache.commons.lang3.SystemUtils;
class SysUtils {
static String getLibPlatform() {
if (SystemUtils.IS_OS_MAC) {
return "Darwin";
} else if (SystemUtils.IS_OS_LINUX) {
return "Linux";
} else {
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
}
}
static String getLibExtension() {
if (SystemUtils.IS_OS_MAC) {
return "dylib";
} else if (SystemUtils.IS_OS_LINUX) {
return "so";
} else {
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
}
}
static String getLibArch() {
if (SystemUtils.IS_OS_MAC) {
return switch (SystemUtils.OS_ARCH) {
case "aarch64" -> "arm64";
default -> throw new IllegalStateException("Unsupported architecture: " + SystemUtils.OS_ARCH);
};
} else if (SystemUtils.IS_OS_LINUX) {
return switch (SystemUtils.OS_ARCH) {
case "aarch64" -> "aarch64";
case "amd64" -> "x86_64";
default -> throw new IllegalStateException("Unsupported architecture: " + SystemUtils.OS_ARCH);
};
} else {
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
}
}
}

View File

@@ -1,32 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.util.logging.Logger;
public class UninitializedByteBuffer {
private static final Cleaner CLEANER = Cleaner.create();
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
public static ByteBuffer allocateUninitialized(int size) {
try {
if (size < DhfsSupport.PAGE_SIZE)
return ByteBuffer.allocateDirect(size);
var bb = new ByteBuffer[1];
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0];
CLEANER.register(ret, () -> {
try {
DhfsSupport.releaseByteBuffer(token);
} catch (Throwable e) {
LOGGER.severe("Error releasing buffer: " + e);
System.exit(-1);
}
});
return ret;
} catch (OutOfMemoryError e) {
return ByteBuffer.allocate(size);
}
}
}

View File

@@ -72,26 +72,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -127,11 +107,6 @@
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>

View File

@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
import java.util.Collection;
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public record RemoteObjectDataWrapper<T extends JDataRemote>(
JObjectKey key,
PCollection<JDataRef> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public RemoteObjectDataWrapper(T data) {
this(HashTreePSet.empty(), false, data);
this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
}
@Override
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
return new RemoteObjectDataWrapper<>(refs, frozen, data);
return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
}
@Override
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
}
public RemoteObjectDataWrapper<T> withData(T data) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
}
@Override
public JObjectKey key() {
return RemoteObjectMeta.ofDataKey(data.key());
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
}
@Override

View File

@@ -41,7 +41,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
}
public static JObjectKey ofDataKey(JObjectKey key) {
return JObjectKey.of("data_" + key.value());
return JObjectKey.of(key.value() + "_data");
}
@Override

View File

@@ -9,12 +9,13 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.mutable.MutableObject;
import org.pcollections.HashTreePSet;
import java.util.Optional;
@ApplicationScoped
@Singleton
public class RemoteTransaction {
@Inject
Transaction curTx;

View File

@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
public record JMapEntry<K extends JMapKey>(JObjectKey key,
JObjectKey holder,
K selfKey,
JObjectKey ref) implements JData {
@Override
public JObjectKey key() {
return JMapHelper.makeKey(holder, selfKey);
public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
}
}

View File

@@ -7,11 +7,12 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Optional;
@ApplicationScoped
@Singleton
public class JMapHelper {
@Inject
Transaction curTx;

View File

@@ -1,11 +1,13 @@
package com.usatiuk.dhfs.repository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -64,8 +66,8 @@ public class PersistentPeerDataService {
var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (selfData != null) {
_selfUuid = selfData.selfUuid();
_selfCertificate = selfData.selfCertificate();
_selfKeyPair = selfData.selfKeyPair();
_selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
_selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
return;
} else {
try {
@@ -74,7 +76,7 @@ public class PersistentPeerDataService {
_selfKeyPair = CertificateTools.generateKeyPair();
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair, HashTreePSet.empty(), HashTreePMap.empty()));
curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.repository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.objects.JData;
@@ -12,8 +13,8 @@ import java.security.KeyPair;
import java.security.cert.X509Certificate;
public record PersistentRemoteHostsData(PeerId selfUuid,
X509Certificate selfCertificate,
KeyPair selfKeyPair,
ByteString selfCertificate,
ByteString selfKeyPair,
PSet<PeerId> initialSyncDone,
PMap<PeerId, IpPeerAddress> persistentPeerAddress) implements JData, Serializable {
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");

View File

@@ -36,7 +36,11 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
try {
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} catch (Exception e) {
Log.error("Error reading invalidation queue", e);
}
}
}

View File

@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.local-discovery=false

View File

@@ -15,57 +15,47 @@ public class DataLocker {
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
while (true) {
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
try {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
while (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
tag.wait();
synchronized (oldTag) {
while (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
oldTag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
continue;
}
}
} catch (InterruptedException ignored) {
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
}
}
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
while (true) {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
continue;
}
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
synchronized (oldTag) {
if (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
}
}
}
@@ -83,11 +73,11 @@ public class DataLocker {
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
CLEANER.register(this, () -> {
if (!tag.released) {
Log.error("Lock collected without release: " + key);
}
});
// CLEANER.register(this, () -> {
// if (!tag.released) {
// Log.error("Lock collected without release: " + key);
// }
// });
}
@Override

View File

@@ -45,7 +45,7 @@ public class HashSetDelayedBlockingQueueTest {
queue.add("hello!");
Assertions.assertEquals("hello!", queue.get());
var gotTime = System.currentTimeMillis();
Assertions.assertTrue((gotTime - curTime) <= 10);
Assertions.assertTrue((gotTime - curTime) <= 50);
}
@Test

View File

@@ -14,7 +14,6 @@ exec java \
-Ddhfs.fuse.root=/dhfs_root_fuse \
-Dquarkus.http.host=0.0.0.0 \
-Ddhfs.objects.ref_verification=false \
-Dcom.usatiuk.dhfs.supportlib.native-path=/usr/src/app/native-libs \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=$DHFS_LOGLEVEL \
"$@" \
-jar quarkus-run.jar

View File

@@ -1,83 +0,0 @@
.DS_Store
/toolchain
/cmake-build-debug
/sysroot
/mvn-build
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser

View File

@@ -1,39 +0,0 @@
cmake_minimum_required(VERSION 3.24)
project(libdhfs_support CXX)
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
if (NOT SANITIZE)
set(SANITIZE YES)
endif ()
endif ()
include(CheckCXXCompilerFlag)
if (SANITIZE STREQUAL "YES")
message(STATUS "Enabling sanitizers!")
add_compile_options(-Werror -Wall -Wextra -pedantic -Wshadow -Wformat=2 -Wfloat-equal -D_GLIBCXX_DEBUG -Wconversion)
check_cxx_compiler_flag(-fsanitize-trap=all CAN_TRAP)
if (CAN_TRAP)
add_compile_options(-fsanitize=undefined -fsanitize-trap=all -fno-sanitize-recover)
add_link_options(-fsanitize=undefined -fsanitize-trap=all -fno-sanitize-recover)
else ()
message(WARNING "Sanitizers not supported!")
endif ()
endif ()
if (CMAKE_BUILD_TYPE STREQUAL "Release")
add_compile_options(-flto)
add_link_options(-flto)
endif ()
if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
add_compile_options(-O3)
add_link_options(-O3)
endif ()
message(STATUS "Build type: ${CMAKE_BUILD_TYPE}")
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
add_subdirectory(helpers)
add_subdirectory(DhfsSupportNative)

View File

@@ -1,26 +0,0 @@
set(CMAKE_CXX_STANDARD 17)
find_package(JNI REQUIRED COMPONENTS JVM)
find_package(Java REQUIRED)
include(UseJava)
add_jar(DhfsSupportNative
"${PROJECT_SOURCE_DIR}/../dhfs-parent/supportlib/src/main/java/com/usatiuk/dhfs/supportlib/DhfsSupportNative.java"
"LibPathDummy.java"
GENERATE_NATIVE_HEADERS DhfsSupportNative-native
)
add_library(dhfs_support SHARED
src/DhfsSupportNative.cpp
)
target_compile_options(dhfs_support PRIVATE
-Wno-unused-parameter
)
target_link_libraries(dhfs_support PRIVATE
helpers
DhfsSupportNative-native
)
install(TARGETS dhfs_support LIBRARY DESTINATION "${DHFS_LIB_INSTALL}/${CMAKE_SYSTEM_NAME}-${CMAKE_SYSTEM_PROCESSOR}")

View File

@@ -1,9 +0,0 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.file.Path;
class DhfsNativeLibFinder {
static Path getLibPath() {
return null;
}
}

View File

@@ -1,55 +0,0 @@
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cassert>
#include "com_usatiuk_dhfs_supportlib_DhfsSupportNative.h"
#include "Utils.h"
#include "MemoryHelpers.h"
extern "C" {
JNIEXPORT jlong JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_allocateUninitializedByteBuffer
(JNIEnv* env, jclass klass, jobjectArray bb, jint size) {
if (size < 0) {
env->ThrowNew(env->FindClass("java/lang/IllegalArgumentException"), "Size less than 0?");
return 0;
}
size_t checked_size = checked_cast<size_t>(size);
void* buf;
if (checked_size < MemoryHelpers::get_page_size())
buf = malloc(checked_size);
else
buf = std::aligned_alloc(MemoryHelpers::get_page_size(),
align_up(checked_size, MemoryHelpers::get_page_size()));
if (buf == nullptr) {
env->ThrowNew(env->FindClass("java/lang/OutOfMemoryError"), "Buffer memory allocation failed");
return 0;
}
env->SetObjectArrayElement(bb, 0, env->NewDirectByteBuffer(buf, checked_cast<jlong>(checked_size)));
jlong token = checked_cast<jlong>((uintptr_t) buf);
return token;
}
JNIEXPORT void JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_releaseByteBuffer
(JNIEnv* env, jclass klass, jlong token) {
const auto addr = checked_cast<uintptr_t>(token);
if (addr == 0) {
env->ThrowNew(env->FindClass("java/lang/IllegalArgumentException"), "Trying to free null pointer");
return;
}
free((void*) addr);
}
JNIEXPORT jint JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_getPageSizeInternal
(JNIEnv*, jclass) {
return checked_cast<jint>(MemoryHelpers::get_page_size());
}
}

View File

@@ -1,3 +0,0 @@
FROM rockylinux:8
RUN dnf install -y java-21-openjdk-headless java-21-openjdk-devel cmake gcc gcc-c++

View File

@@ -1,56 +0,0 @@
#!/usr/bin/env bash
CMAKE_ARGS="${CMAKE_ARGS:--DCMAKE_BUILD_TYPE=Debug}"
export SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"
if [[ "${DO_LOCAL_BUILD^^}" != "TRUE" ]]; then
if [[ "$(uname)" == "Linux" ]]; then
if [[ -z "${DOCKER_PLATFORM}" ]]; then
echo "Already on linux"
exit 0
fi
fi
exec "$SCRIPT_DIR"/docker-launch.sh "$@"
fi
set -euxo pipefail
if [ $# -lt 3 ]; then
echo "Not enough arguments supplied: (build/configure) (build dir) (output dir)"
exit 1
fi
PROJECT_DIR="$SCRIPT_DIR/.."
CONFIGURE_DIR="$2"
INSTALL_DIR="$3"
function configure() {
cmake -B"$CONFIGURE_DIR" -S"$PROJECT_DIR" -DDHFS_LIB_INSTALL="$INSTALL_DIR" $CMAKE_ARGS
}
function build() {
cmake --build "$CONFIGURE_DIR" --target install
}
mkdir -p "$2"
mkdir -p "$3"
case "$1" in
"configure")
configure
;;
"build")
build
;;
"both")
configure
build
;;
*)
echo "Unknown command"
exit 1
;;
esac

View File

@@ -1,21 +0,0 @@
#!/usr/bin/env bash
set -exo pipefail
export SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"
PLATFORM_ARG=""
if [[ ! -z "${DOCKER_PLATFORM}" ]]; then
PLATFORM_ARG="--platform $DOCKER_PLATFORM"
fi
if [[ -z "${DOCKER_BUILDER_IMAGE}" ]]; then
DOCKER_IMG_FILE=$(mktemp)
docker build $PLATFORM_ARG --iidfile "$DOCKER_IMG_FILE" .
DOCKER_BUILDER_IMAGE="$(cat "$DOCKER_IMG_FILE")"
fi
ROOT_DIR="$(dirname "$(dirname "$SCRIPT_DIR")")"
docker run $PLATFORM_ARG --rm -v "$ROOT_DIR:$ROOT_DIR" -e DO_LOCAL_BUILD=TRUE "$DOCKER_BUILDER_IMAGE" \
"$SCRIPT_DIR/cross-build.sh" "$@"

View File

@@ -1,9 +0,0 @@
set(CMAKE_CXX_STANDARD 17)
add_library(helpers
include/MemoryHelpers.h
src/MemoryHelpers.cpp
include/Utils.h
)
target_include_directories(helpers PUBLIC include)

View File

@@ -1,12 +0,0 @@
//
// Created by stepus53 on 24.8.24.
//
#ifndef MEMORYHELPERS_H
#define MEMORYHELPERS_H
namespace MemoryHelpers {
unsigned int get_page_size();
}
#endif //MEMORYHELPERS_H

View File

@@ -1,41 +0,0 @@
//
// Created by stepus53 on 24.8.24.
//
#ifndef UTILS_H
#define UTILS_H
#include <cassert>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-conversion"
#pragma GCC diagnostic ignored "-Wsign-compare"
template<typename To, typename From>
constexpr To checked_cast(const From& f) {
To result = static_cast<To>(f);
assert(f == result);
return result;
}
#pragma GCC diagnostic pop
template<typename T, typename A>
T align_up(T what, A alignment) {
assert(__builtin_popcount(alignment) == 1);
const T mask = checked_cast<T>(alignment - 1);
T ret;
if (what & mask)
ret = (what + mask) & ~mask;
else
ret = what;
assert((ret & mask) == 0);
return ret;
}
#endif //UTILS_H

View File

@@ -1,14 +0,0 @@
//
// Created by stepus53 on 24.8.24.
//
#include "MemoryHelpers.h"
#include <unistd.h>
#include "Utils.h"
unsigned int MemoryHelpers::get_page_size() {
static const auto PAGE_SIZE = checked_cast<unsigned int>(sysconf(_SC_PAGESIZE));
return PAGE_SIZE;
}

View File

@@ -36,7 +36,6 @@ java \
-Dquarkus.http.host=0.0.0.0 \
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
-Dcom.usatiuk.dhfs.supportlib.native-path="$SCRIPT_DIR"/NativeLibs \
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &

9
thirdparty/lazyfs/build.sh vendored Executable file
View File

@@ -0,0 +1,9 @@
# apt install g++ cmake libfuse3-dev libfuse3-3 fuse3
export CMAKE_BUILD_PARALLEL_LEVEL="$(nproc)"
cd lazyfs
cd libs/libpcache && ./build.sh && cd -
cd lazyfs && ./build.sh && cd -

1
thirdparty/lazyfs/lazyfs vendored Submodule