mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
24 Commits
bb-keys
...
4f5f347b3c
| Author | SHA1 | Date | |
|---|---|---|---|
| 4f5f347b3c | |||
| bd5395e03f | |||
| f56f564e8b | |||
| eaa413e200 | |||
| f3e4d99fcb | |||
| 1c71b26ed8 | |||
| e6f95ef028 | |||
| 59e8f6a6b4 | |||
| 0292df7f0e | |||
| a6a4101bb0 | |||
| 59fa5dcf28 | |||
| 0f5fb8b8b6 | |||
| c087dd8971 | |||
| 14ddddd0ff | |||
| 9859378627 | |||
| e167c21d40 | |||
| 7dc8f486ea | |||
| da1a996e6f | |||
| bb52a3af0e | |||
| de0b868349 | |||
| d4d4e150c1 | |||
| c9b0400d50 | |||
| 94218330b1 | |||
| dbe2a72f7c |
114
.github/workflows/server.yml
vendored
114
.github/workflows/server.yml
vendored
@@ -89,102 +89,6 @@ jobs:
|
||||
name: Webui
|
||||
path: webui/dist
|
||||
|
||||
build-native-libs:
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
- os: ubuntu-latest
|
||||
cross: "linux/amd64"
|
||||
- os: ubuntu-latest
|
||||
cross: "linux/arm64"
|
||||
- os: macos-latest
|
||||
|
||||
runs-on: ${{ matrix.os }}
|
||||
env:
|
||||
DO_LOCAL_BUILD: ${{ matrix.os == 'macos-latest' }}
|
||||
DOCKER_PLATFORM: ${{ matrix.cross || 'NATIVE' }}
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set SANITIZED_DOCKER_PLATFORM
|
||||
run: echo "SANITIZED_DOCKER_PLATFORM=$(echo $DOCKER_PLATFORM | tr / _ )" >> $GITHUB_ENV
|
||||
|
||||
- name: Set DOCKER_BUILDER_IMAGE
|
||||
run: echo "DOCKER_BUILDER_IMAGE=dhfs_lib_builder-${{matrix.os}}-$SANITIZED_DOCKER_PLATFORM" >> $GITHUB_ENV
|
||||
|
||||
- name: Build config
|
||||
run: |
|
||||
echo DO_LOCAL_BUILD: $DO_LOCAL_BUILD
|
||||
echo DOCKER_PLATFORM: $DOCKER_PLATFORM
|
||||
echo SANITIZED_DOCKER_PLATFORM: $SANITIZED_DOCKER_PLATFORM
|
||||
echo DOCKER_BUILDER_IMAGE: $DOCKER_BUILDER_IMAGE
|
||||
|
||||
- name: Set up JDK 21
|
||||
if: ${{ env.DO_LOCAL_BUILD == 'TRUE' }}
|
||||
uses: actions/setup-java@v4
|
||||
with:
|
||||
java-version: "21"
|
||||
distribution: "zulu"
|
||||
cache: maven
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Set up QEMU
|
||||
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
|
||||
uses: docker/setup-qemu-action@v3
|
||||
|
||||
- name: Build Docker builder image
|
||||
if: ${{ env.DO_LOCAL_BUILD != 'TRUE' }}
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
context: ./libdhfs_support/builder
|
||||
file: ./libdhfs_support/builder/Dockerfile
|
||||
push: false
|
||||
platforms: ${{ env.DOCKER_PLATFORM }}
|
||||
tags: ${{ env.DOCKER_BUILDER_IMAGE }}
|
||||
cache-from: type=gha,scope=build-${{ env.DOCKER_BUILDER_IMAGE }}
|
||||
cache-to: type=gha,mode=max,scope=build-${{ env.DOCKER_BUILDER_IMAGE }}
|
||||
load: true
|
||||
|
||||
- name: Build the library
|
||||
run: |
|
||||
CMAKE_ARGS="-DCMAKE_BUILD_TYPE=Release" libdhfs_support/builder/cross-build.sh both build "$(pwd)/result"
|
||||
|
||||
- name: Upload build
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: NativeLib-${{ matrix.os }}-${{ env.SANITIZED_DOCKER_PLATFORM }}
|
||||
path: result
|
||||
|
||||
merge-native-libs:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [build-native-libs]
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Download artifacts
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
path: downloaded-libs
|
||||
|
||||
- name: Merge all
|
||||
run: rsync -av downloaded-libs/NativeLib*/* result/
|
||||
|
||||
- name: Check that libs exists
|
||||
run: |
|
||||
test -f "result/Linux-x86_64/libdhfs_support.so" || exit 1
|
||||
|
||||
- name: Upload
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: NativeLibs
|
||||
path: result
|
||||
|
||||
publish-docker:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
@@ -194,7 +98,7 @@ jobs:
|
||||
# with sigstore/fulcio when running outside of PRs.
|
||||
id-token: write
|
||||
|
||||
needs: [build-webui, merge-native-libs, build-dhfs]
|
||||
needs: [build-webui, build-dhfs]
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
@@ -212,12 +116,6 @@ jobs:
|
||||
name: Webui
|
||||
path: webui-dist-downloaded
|
||||
|
||||
- name: Download native libs
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: NativeLibs
|
||||
path: dhfs-native-downloaded
|
||||
|
||||
- name: Show all the files
|
||||
run: find .
|
||||
|
||||
@@ -293,7 +191,7 @@ jobs:
|
||||
# with sigstore/fulcio when running outside of PRs.
|
||||
id-token: write
|
||||
|
||||
needs: [build-webui, merge-native-libs, build-dhfs]
|
||||
needs: [build-webui, build-dhfs]
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
@@ -309,11 +207,6 @@ jobs:
|
||||
name: Webui
|
||||
path: webui-dist-downloaded
|
||||
|
||||
- uses: actions/download-artifact@v4
|
||||
with:
|
||||
name: NativeLibs
|
||||
path: dhfs-native-downloaded
|
||||
|
||||
- name: Show all the files
|
||||
run: find .
|
||||
|
||||
@@ -326,9 +219,6 @@ jobs:
|
||||
- name: Copy Webui
|
||||
run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui"
|
||||
|
||||
- name: Copy Webui
|
||||
run: cp -r ./dhfs-native-downloaded "run-wrapper-out/dhfs/app/NativeLibs"
|
||||
|
||||
- name: Copy run wrapper
|
||||
run: cp -r ./run-wrapper/* "run-wrapper-out/dhfs/app/"
|
||||
|
||||
|
||||
@@ -9,8 +9,6 @@ COPY ./dhfs-package-downloaded/*.jar .
|
||||
COPY ./dhfs-package-downloaded/app .
|
||||
COPY ./dhfs-package-downloaded/quarkus .
|
||||
|
||||
WORKDIR /usr/src/app/native-libs
|
||||
COPY ./dhfs-native-downloaded/. .
|
||||
WORKDIR /usr/src/app/webui
|
||||
COPY ./webui-dist-downloaded/. .
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
|
||||
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
|
||||
<module name="dhfs-app" />
|
||||
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
|
||||
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:8080:9011" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
|
||||
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
|
||||
<module name="dhfs-app" />
|
||||
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
|
||||
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions --enable-preview -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
|
||||
|
||||
@@ -72,26 +72,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
@@ -127,11 +107,6 @@
|
||||
<artifactId>kleppmanntree</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>supportlib</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>objects</artifactId>
|
||||
|
||||
@@ -79,7 +79,6 @@ public class DhfsImage implements Future<String> {
|
||||
"-Ddhfs.objects.sync.timeout=10",
|
||||
"-Ddhfs.objects.sync.ping.timeout=5",
|
||||
"-Ddhfs.objects.reconnect_interval=1s",
|
||||
"-Dcom.usatiuk.dhfs.supportlib.native-path=/libs",
|
||||
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
|
||||
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
|
||||
"-Ddhfs.objects.periodic-push-op-interval=5s",
|
||||
|
||||
@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
|
||||
dhfs.objects.deletion.delay=0
|
||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
|
||||
quarkus.http.test-port=0
|
||||
quarkus.http.test-ssl-port=0
|
||||
dhfs.local-discovery=false
|
||||
|
||||
@@ -72,26 +72,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
@@ -127,11 +107,6 @@
|
||||
<artifactId>kleppmanntree</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>supportlib</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>objects</artifactId>
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.JDataRemote;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.jmap.JMapHolder;
|
||||
import com.usatiuk.dhfs.jmap.JMapLongKey;
|
||||
import com.usatiuk.dhfs.repository.JDataRemoteDto;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
@@ -28,6 +28,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
|
||||
return new File(key, mode, cTime, mTime, symlink);
|
||||
}
|
||||
|
||||
public File withCurrentMTime() {
|
||||
return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return Set.of();
|
||||
|
||||
@@ -33,6 +33,7 @@ import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
@@ -246,7 +247,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
} else if (dent instanceof RemoteObjectMeta) {
|
||||
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
|
||||
if (remote instanceof File f) {
|
||||
remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis()));
|
||||
remoteTx.putData(f.withMode(mode).withCurrentMTime());
|
||||
return true;
|
||||
} else {
|
||||
throw new IllegalArgumentException(uuid + " is not a file");
|
||||
@@ -410,7 +411,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (existingEnd < offset) {
|
||||
if (!pendingPrefix.isEmpty()) {
|
||||
int diff = Math.toIntExact(offset - existingEnd);
|
||||
pendingPrefix = pendingPrefix.concat(ByteString.copyFrom(new byte[diff]));
|
||||
pendingPrefix = pendingPrefix.concat(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(diff)));
|
||||
} else {
|
||||
fillZeros(existingEnd, offset, newChunks);
|
||||
start = offset;
|
||||
@@ -452,7 +453,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||
}
|
||||
|
||||
remoteTx.putData(file);
|
||||
remoteTx.putData(file.withCurrentMTime());
|
||||
|
||||
return (long) data.size();
|
||||
});
|
||||
@@ -543,7 +544,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||
}
|
||||
|
||||
remoteTx.putData(file);
|
||||
remoteTx.putData(file.withCurrentMTime());
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@@ -572,7 +573,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
if (!zeroCache.containsKey(end - cur))
|
||||
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)])));
|
||||
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(Math.toIntExact(end - cur)))));
|
||||
|
||||
ChunkData newChunkData = zeroCache.get(end - cur);
|
||||
newChunks.put(start, newChunkData.key());
|
||||
|
||||
@@ -111,6 +111,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
|
||||
|
||||
var uuid = ret.get();
|
||||
|
||||
var curMtime = fileService.getattr(uuid).get().mtime();
|
||||
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
|
||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
|
||||
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
|
||||
@@ -123,6 +124,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
|
||||
fileService.write(uuid, 3, new byte[]{17, 18});
|
||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
|
||||
|
||||
var newMtime = fileService.getattr(uuid).get().mtime();
|
||||
Assertions.assertTrue(newMtime > curMtime);
|
||||
|
||||
fileService.unlink("/writeTest");
|
||||
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
|
||||
dhfs.objects.deletion.delay=0
|
||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
|
||||
quarkus.http.test-port=0
|
||||
quarkus.http.test-ssl-port=0
|
||||
dhfs.local-discovery=false
|
||||
|
||||
@@ -73,24 +73,9 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<groupId>com.github.serceman</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
<version>0.5.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
@@ -127,11 +112,6 @@
|
||||
<artifactId>kleppmanntree</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>supportlib</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>objects</artifactId>
|
||||
|
||||
@@ -5,7 +5,6 @@ import com.sun.security.auth.module.UnixSystem;
|
||||
import com.usatiuk.dhfs.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
|
||||
import com.usatiuk.dhfs.files.service.GetattrRes;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import io.grpc.Status;
|
||||
@@ -28,6 +27,7 @@ import ru.serce.jnrfuse.struct.FuseFileInfo;
|
||||
import ru.serce.jnrfuse.struct.Statvfs;
|
||||
import ru.serce.jnrfuse.struct.Timespec;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
@@ -237,7 +237,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
if (offset < 0) return -ErrorCodes.EINVAL();
|
||||
try {
|
||||
var fileKey = getFromHandle(fi.fh.get());
|
||||
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
|
||||
var buffer = ByteBuffer.allocateDirect((int) size);
|
||||
|
||||
if (buffer.isDirect()) {
|
||||
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
|
||||
|
||||
@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
|
||||
dhfs.objects.deletion.delay=0
|
||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
|
||||
quarkus.http.test-port=0
|
||||
quarkus.http.test-ssl-port=0
|
||||
dhfs.local-discovery=false
|
||||
|
||||
@@ -18,6 +18,11 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>net.jqwik</groupId>
|
||||
<artifactId>jqwik</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
@@ -54,11 +59,6 @@
|
||||
<artifactId>utils</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>supportlib</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-junit5-mockito</artifactId>
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
package com.usatiuk.objects;
|
||||
|
||||
public interface JDataVersionedWrapper {
|
||||
import com.usatiuk.objects.iterators.Data;
|
||||
|
||||
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
|
||||
@Override
|
||||
default JDataVersionedWrapper value() {
|
||||
return this;
|
||||
}
|
||||
|
||||
JData data();
|
||||
|
||||
long version();
|
||||
|
||||
@@ -2,11 +2,11 @@ package com.usatiuk.objects;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
private JData _data;
|
||||
private final long _version;
|
||||
private final int _estimatedSize;
|
||||
private Supplier<JData> _producer;
|
||||
private JData _data;
|
||||
|
||||
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
|
||||
_version = version;
|
||||
@@ -14,6 +14,18 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
_producer = producer;
|
||||
}
|
||||
|
||||
public void setCacheCallback(Runnable cacheCallback) {
|
||||
if (_data != null) {
|
||||
throw new IllegalStateException("Cache callback can be set only before data is loaded");
|
||||
}
|
||||
var oldProducer = _producer;
|
||||
_producer = () -> {
|
||||
var ret = oldProducer.get();
|
||||
cacheCallback.run();
|
||||
return ret;
|
||||
};
|
||||
}
|
||||
|
||||
public JData data() {
|
||||
if (_data != null)
|
||||
return _data;
|
||||
|
||||
@@ -2,12 +2,12 @@ package com.usatiuk.objects;
|
||||
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
|
||||
@Inject
|
||||
ObjectSerializer<JData> dataSerializer;
|
||||
|
||||
@@ -26,11 +26,13 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
}
|
||||
|
||||
static JObjectKey fromBytes(byte[] bytes) {
|
||||
return new JObjectKeyImpl(new String(bytes, StandardCharsets.UTF_8));
|
||||
return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1));
|
||||
}
|
||||
|
||||
static JObjectKey fromByteBuffer(ByteBuffer buff) {
|
||||
return new JObjectKeyImpl(StandardCharsets.UTF_8.decode(buff).toString());
|
||||
byte[] bytes = new byte[buff.remaining()];
|
||||
buff.get(bytes);
|
||||
return new JObjectKeyImpl(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -39,8 +41,6 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
@Override
|
||||
String toString();
|
||||
|
||||
byte[] bytes();
|
||||
|
||||
ByteBuffer toByteBuffer();
|
||||
|
||||
String value();
|
||||
|
||||
@@ -1,11 +1,24 @@
|
||||
package com.usatiuk.objects;
|
||||
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class JObjectKeyImpl implements JObjectKey {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 0L;
|
||||
private final String value;
|
||||
private transient ByteBuffer _bb = null;
|
||||
|
||||
public JObjectKeyImpl(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public JObjectKeyImpl(byte[] bytes) {
|
||||
this.value = new String(bytes, StandardCharsets.ISO_8859_1);
|
||||
}
|
||||
|
||||
public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
@Override
|
||||
public int compareTo(JObjectKey o) {
|
||||
switch (o) {
|
||||
@@ -27,17 +40,36 @@ public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
return value.getBytes(StandardCharsets.UTF_8);
|
||||
public ByteBuffer toByteBuffer() {
|
||||
if (_bb != null) return _bb;
|
||||
|
||||
synchronized (this) {
|
||||
if (_bb != null) return _bb;
|
||||
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
|
||||
var directBb = ByteBuffer.allocateDirect(bytes.length);
|
||||
directBb.put(bytes);
|
||||
directBb.flip();
|
||||
_bb = directBb;
|
||||
return directBb;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
var heapBb = StandardCharsets.UTF_8.encode(value);
|
||||
if (heapBb.isDirect()) return heapBb;
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
|
||||
directBb.put(heapBb);
|
||||
directBb.flip();
|
||||
return directBb;
|
||||
public String value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) return true;
|
||||
if (obj == null || obj.getClass() != this.getClass()) return false;
|
||||
var that = (JObjectKeyImpl) obj;
|
||||
return Objects.equals(this.value, that.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(value);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -18,11 +18,6 @@ public record JObjectKeyMax() implements JObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -18,11 +18,6 @@ public record JObjectKeyMin() implements JObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -2,9 +2,6 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record Data<V>(V value) implements MaybeTombstone<V> {
|
||||
@Override
|
||||
public Optional<V> opt() {
|
||||
return Optional.of(value);
|
||||
}
|
||||
public interface Data<V> extends MaybeTombstone<V> {
|
||||
V value();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record DataWrapper<V>(V value) implements Data<V> {
|
||||
}
|
||||
@@ -3,5 +3,4 @@ package com.usatiuk.objects.iterators;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface MaybeTombstone<T> {
|
||||
Optional<T> opt();
|
||||
}
|
||||
|
||||
@@ -1,25 +1,37 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
|
||||
public IteratorEntry<K, V> reversed() {
|
||||
return new IteratorEntry<>(priority, iterator.reversed());
|
||||
}
|
||||
}
|
||||
|
||||
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private final String _name;
|
||||
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
|
||||
private final List<IteratorEntry<K, V>> _iterators;
|
||||
|
||||
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
|
||||
_goingForward = true;
|
||||
_name = name;
|
||||
|
||||
_iterators = Map.ofEntries(
|
||||
IntStream.range(0, iterators.size())
|
||||
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
|
||||
.toArray(Pair[]::new)
|
||||
);
|
||||
// Why streams are so slow?
|
||||
{
|
||||
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
|
||||
for (int i = 0; i < iterators.size(); i++) {
|
||||
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
|
||||
}
|
||||
_iterators = List.of(iteratorEntries);
|
||||
}
|
||||
|
||||
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
|
||||
// Starting at a greatest key less than/less or equal than:
|
||||
@@ -30,7 +42,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
K greatestLess = null;
|
||||
K smallestMore = null;
|
||||
|
||||
for (var it : _iterators.keySet()) {
|
||||
for (var ite : _iterators) {
|
||||
var it = ite.iterator();
|
||||
if (it.hasNext()) {
|
||||
var peeked = it.peekNextKey();
|
||||
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
|
||||
@@ -55,14 +68,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
// Empty iterators
|
||||
}
|
||||
|
||||
for (var iterator : _iterators.keySet()) {
|
||||
for (var ite : _iterators) {
|
||||
var iterator = ite.iterator();
|
||||
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
|
||||
iterator.skip();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
advanceIterator(iterator);
|
||||
}
|
||||
|
||||
@@ -88,29 +102,39 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
this(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
|
||||
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
|
||||
if (!iterator.hasNext()) {
|
||||
return;
|
||||
}
|
||||
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
|
||||
while (iteratorEntry.iterator().hasNext()) {
|
||||
K key = iteratorEntry.iterator().peekNextKey();
|
||||
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
|
||||
|
||||
K key = iterator.peekNextKey();
|
||||
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
|
||||
if (!_sortedIterators.containsKey(key)) {
|
||||
_sortedIterators.put(key, iterator);
|
||||
return;
|
||||
}
|
||||
MutableObject<IteratorEntry<K, V>> mutableBoolean = new MutableObject<>(null);
|
||||
|
||||
// Expects that reversed iterator returns itself when reversed again
|
||||
var oursPrio = _iterators.get(_goingForward ? iterator : iterator.reversed());
|
||||
var them = _sortedIterators.get(key);
|
||||
var theirsPrio = _iterators.get(_goingForward ? them : them.reversed());
|
||||
if (oursPrio < theirsPrio) {
|
||||
_sortedIterators.put(key, iterator);
|
||||
advanceIterator(them);
|
||||
} else {
|
||||
Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
|
||||
iterator.skip();
|
||||
advanceIterator(iterator);
|
||||
var newVal = _sortedIterators.merge(key, iteratorEntry, (theirsEntry, oldValOurs) -> {
|
||||
var oursPrio = oldValOurs.priority();
|
||||
var theirsPrio = theirsEntry.priority();
|
||||
|
||||
if (oursPrio < theirsPrio) {
|
||||
mutableBoolean.setValue(theirsEntry);
|
||||
return oldValOurs;
|
||||
// advance them
|
||||
// return
|
||||
} else {
|
||||
return theirsEntry;
|
||||
// skip, continue
|
||||
}
|
||||
});
|
||||
|
||||
if (newVal != iteratorEntry) {
|
||||
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
|
||||
iteratorEntry.iterator().skip();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mutableBoolean.getValue() != null) {
|
||||
advanceIterator(mutableBoolean.getValue());
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +144,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
Log.tracev("{0} Reversing from {1}", _name, cur);
|
||||
_goingForward = !_goingForward;
|
||||
_sortedIterators.clear();
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
// _goingForward inverted already
|
||||
advanceIterator(!_goingForward ? iterator.reversed() : iterator);
|
||||
}
|
||||
@@ -150,7 +174,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
cur.getValue().skip();
|
||||
cur.getValue().iterator().skip();
|
||||
advanceIterator(cur.getValue());
|
||||
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
|
||||
}
|
||||
@@ -166,7 +190,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
var curVal = cur.getValue().next();
|
||||
var curVal = cur.getValue().iterator().next();
|
||||
advanceIterator(cur.getValue());
|
||||
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
|
||||
return curVal;
|
||||
@@ -174,8 +198,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
iterator.close();
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
iterator.iterator().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,22 +9,22 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> extends Reversib
|
||||
private Iterator<Map.Entry<K, V>> _iterator;
|
||||
private Map.Entry<K, V> _next;
|
||||
|
||||
public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
|
||||
_map = map;
|
||||
public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
|
||||
_map = (NavigableMap<K, V>) map;
|
||||
SortedMap<K, V> _view;
|
||||
_goingForward = true;
|
||||
switch (start) {
|
||||
case GE -> _view = map.tailMap(key, true);
|
||||
case GT -> _view = map.tailMap(key, false);
|
||||
case GE -> _view = _map.tailMap(key, true);
|
||||
case GT -> _view = _map.tailMap(key, false);
|
||||
case LE -> {
|
||||
var floorKey = map.floorKey(key);
|
||||
var floorKey = _map.floorKey(key);
|
||||
if (floorKey == null) _view = _map;
|
||||
else _view = map.tailMap(floorKey, true);
|
||||
else _view = _map.tailMap(floorKey, true);
|
||||
}
|
||||
case LT -> {
|
||||
var lowerKey = map.lowerKey(key);
|
||||
if (lowerKey == null) _view = _map;
|
||||
else _view = map.tailMap(lowerKey, true);
|
||||
else _view = _map.tailMap(lowerKey, true);
|
||||
}
|
||||
default -> throw new IllegalArgumentException("Unknown start type");
|
||||
}
|
||||
|
||||
@@ -2,9 +2,5 @@ package com.usatiuk.objects.iterators;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record Tombstone<V>() implements MaybeTombstone<V> {
|
||||
@Override
|
||||
public Optional<V> opt() {
|
||||
return Optional.empty();
|
||||
}
|
||||
public interface Tombstone<V> extends MaybeTombstone<V> {
|
||||
}
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
public record TombstoneImpl<V>() implements Tombstone<V> {
|
||||
}
|
||||
@@ -16,7 +16,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
|
||||
startType, startKey,
|
||||
pair -> {
|
||||
Log.tracev("{0} - Processing pair {1}", _name, pair);
|
||||
if (pair instanceof Tombstone) {
|
||||
if (pair instanceof Tombstone<V>) {
|
||||
return null;
|
||||
}
|
||||
return ((Data<V>) pair).value();
|
||||
|
||||
@@ -6,13 +6,14 @@ import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import com.usatiuk.objects.transaction.TxRecord;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class SnapshotManager {
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackStore;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JDataVersionedWrapperLazy;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
@@ -33,10 +34,9 @@ public class CachingObjectPersistentStore {
|
||||
long version,
|
||||
int sizeLimit) {
|
||||
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
|
||||
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
|
||||
|
||||
int newSize = size() + objSize;
|
||||
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
|
||||
int newSize = size() + entry.size();
|
||||
|
||||
var old = map.get(key);
|
||||
if (old != null)
|
||||
@@ -141,10 +141,11 @@ public class CachingObjectPersistentStore {
|
||||
Cache finalCurCache = curCache;
|
||||
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
|
||||
private boolean _invalid = false;
|
||||
private boolean _closed = false;
|
||||
private final Cache _curCache = finalCurCache;
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
_cacheTries.incrementAndGet();
|
||||
if (_invalid)
|
||||
return;
|
||||
@@ -160,18 +161,34 @@ public class CachingObjectPersistentStore {
|
||||
_cached.incrementAndGet();
|
||||
}
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
if (obj.isEmpty()) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
var wrapper = obj.get();
|
||||
|
||||
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
lazy.setCacheCallback(() -> {
|
||||
if (_closed) {
|
||||
Log.error("Cache callback called after close");
|
||||
System.exit(-1);
|
||||
}
|
||||
doCache(key, obj);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("cache", start, key,
|
||||
(mS, mK)
|
||||
-> new MappingKvIterator<>(
|
||||
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
|
||||
e -> {
|
||||
// Log.tracev("Taken from cache: {0}", e);
|
||||
return e.object();
|
||||
}
|
||||
),
|
||||
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
|
||||
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
|
||||
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -179,12 +196,12 @@ public class CachingObjectPersistentStore {
|
||||
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
|
||||
var cached = _curCache.map().get(name);
|
||||
if (cached != null) {
|
||||
return switch (cached.object()) {
|
||||
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
|
||||
case Tombstone<JDataVersionedWrapper> tombstone -> {
|
||||
return switch (cached) {
|
||||
case CacheEntryPresent data -> Optional.of(data.value());
|
||||
case CacheEntryMiss tombstone -> {
|
||||
yield Optional.empty();
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
|
||||
default -> throw new IllegalStateException("Unexpected value: " + cached);
|
||||
};
|
||||
}
|
||||
var read = _backing.readObject(name);
|
||||
@@ -199,10 +216,11 @@ public class CachingObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_closed = true;
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
|
||||
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
|
||||
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
|
||||
|
||||
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
|
||||
@@ -235,10 +253,10 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
|
||||
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
|
||||
var prev = _delegate.prev();
|
||||
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
|
||||
return prev;
|
||||
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -252,10 +270,10 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JDataVersionedWrapper> next() {
|
||||
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
|
||||
var next = _delegate.next();
|
||||
maybeCache(next.getKey(), Optional.of(next.getValue()));
|
||||
return next;
|
||||
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -268,6 +286,18 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) {
|
||||
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
|
||||
int size();
|
||||
}
|
||||
|
||||
private record CacheEntryPresent(JDataVersionedWrapper value,
|
||||
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
|
||||
@Override
|
||||
public int size() {
|
||||
return 64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
import com.usatiuk.objects.JObjectKeyMin;
|
||||
@@ -10,8 +11,6 @@ import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
|
||||
import com.usatiuk.objects.iterators.ReversibleKvIterator;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import io.quarkus.arc.properties.IfBuildProperty;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -30,7 +29,6 @@ import java.lang.ref.Cleaner;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -41,7 +39,17 @@ import static org.lmdbjava.Env.create;
|
||||
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
|
||||
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
private static final String DB_NAME = "objects";
|
||||
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
|
||||
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
|
||||
private static final ByteBuffer DB_VER_OBJ_NAME;
|
||||
|
||||
static {
|
||||
byte[] tmp = DB_VER_OBJ_NAME_STR.getBytes(StandardCharsets.ISO_8859_1);
|
||||
var bb = ByteBuffer.allocateDirect(tmp.length);
|
||||
bb.put(tmp);
|
||||
bb.flip();
|
||||
DB_VER_OBJ_NAME = bb.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
private final Path _root;
|
||||
private Env<ByteBuffer> _env;
|
||||
private Dbi<ByteBuffer> _db;
|
||||
@@ -67,13 +75,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (read.isPresent()) {
|
||||
Log.infov("Read tx id {0}", read.get());
|
||||
} else {
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var bbData = ByteBuffer.allocateDirect(8);
|
||||
bbData.putLong(0);
|
||||
bbData.flip();
|
||||
_db.put(txn, bb, bbData);
|
||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
@@ -82,10 +87,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var value = _db.get(txn, bb);
|
||||
var value = _db.get(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer());
|
||||
return Optional.ofNullable(value).map(ByteBuffer::getLong);
|
||||
}
|
||||
|
||||
@@ -121,7 +123,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
||||
assert !_closed;
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -129,12 +131,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
assert !_closed;
|
||||
var got = _db.get(_txn.get(), name.toByteBuffer());
|
||||
var ret = Optional.ofNullable(got).map(read -> {
|
||||
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
|
||||
uninitBb.put(got);
|
||||
uninitBb.flip();
|
||||
return UnsafeByteOperations.unsafeWrap(uninitBb);
|
||||
});
|
||||
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -168,13 +165,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
assert txId > readTxId(txn).orElseThrow();
|
||||
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var bbData = ByteBuffer.allocateDirect(8);
|
||||
bbData.putLong(txId);
|
||||
bbData.flip();
|
||||
_db.put(txn, bb, bbData);
|
||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||
} catch (Throwable t) {
|
||||
txn.close();
|
||||
throw t;
|
||||
@@ -360,13 +354,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
throw new NoSuchElementException("No more elements");
|
||||
}
|
||||
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
|
||||
// var val = _cursor.val();
|
||||
// var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
|
||||
// bbDirect.put(val);
|
||||
// bbDirect.flip();
|
||||
// var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
|
||||
// var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), ByteString.copyFrom(_cursor.val()));
|
||||
var val = _cursor.val();
|
||||
var bs = UnsafeByteOperations.unsafeWrap(val);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
|
||||
if (_goingForward)
|
||||
_hasNext = _cursor.next();
|
||||
else
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.Tombstone;
|
||||
|
||||
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
|
||||
public record PendingDelete(JObjectKey key,
|
||||
long bundleId) implements PendingWriteEntry, Tombstone<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.iterators.Data;
|
||||
|
||||
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
|
||||
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
public interface PendingWriteEntry {
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.iterators.MaybeTombstone;
|
||||
|
||||
public interface PendingWriteEntry extends MaybeTombstone<JDataVersionedWrapper> {
|
||||
long bundleId();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import com.usatiuk.objects.ObjectSerializer;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
|
||||
@@ -350,15 +350,9 @@ public class WritebackObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
|
||||
(tS, tK) -> new MappingKvIterator<>(
|
||||
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
|
||||
e -> switch (e) {
|
||||
case PendingWrite pw -> new Data<>(pw.data());
|
||||
case PendingDelete d -> new Tombstone<>();
|
||||
default -> throw new IllegalStateException("Unexpected value: " + e);
|
||||
}),
|
||||
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
|
||||
return new TombstoneMergingKvIterator<JObjectKey, JDataVersionedWrapper>("writeback-ps", start, key,
|
||||
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
|
||||
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -367,7 +361,7 @@ public class WritebackObjectPersistentStore {
|
||||
var cached = _pendingWrites.get(name);
|
||||
if (cached != null) {
|
||||
return switch (cached) {
|
||||
case PendingWrite c -> Optional.of(c.data());
|
||||
case PendingWrite c -> Optional.of(c.value());
|
||||
case PendingDelete d -> {
|
||||
yield Optional.empty();
|
||||
}
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -13,20 +14,22 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
// Manages all access to com.usatiuk.objects.JData objects.
|
||||
// In particular, it serves as a source of truth for what is committed to the backing storage.
|
||||
// All data goes through it, it is responsible for transaction atomicity
|
||||
// TODO: persistent tx id
|
||||
@ApplicationScoped
|
||||
public class JObjectManager {
|
||||
private final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
|
||||
private record CommitHookIterationData(PreCommitTxHook hook,
|
||||
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
|
||||
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
|
||||
}
|
||||
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
@Inject
|
||||
@@ -57,25 +60,27 @@ public class JObjectManager {
|
||||
|
||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
||||
verifyReady();
|
||||
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
|
||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||
Map<JObjectKey, TransactionObject<?>> readSet;
|
||||
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
|
||||
Map<JObjectKey, TransactionObject<?>> readSet = null;
|
||||
Collection<AutoCloseableNoThrow> toUnlock = null;
|
||||
|
||||
try {
|
||||
try {
|
||||
long pendingCount = 0;
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> pendingWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> lastWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
List<CommitHookIterationData> hookIterationData;
|
||||
{
|
||||
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
|
||||
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
|
||||
var hook = _preCommitTxHooks.get(i);
|
||||
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
hookIterationData = List.of(hookIterationDataArray);
|
||||
}
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
pendingWrites.get(hookPut).put(n.key(), n);
|
||||
for (var hookPut : hookIterationData) {
|
||||
hookPut.pendingWrites().put(n.key(), n);
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
@@ -87,9 +92,10 @@ public class JObjectManager {
|
||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||
// on the next iteration, the first hook should receive the version of the object it had created
|
||||
// as the "old" version, and the new version with all the changes after it.
|
||||
do {
|
||||
for (var hook : _preCommitTxHooks) {
|
||||
var lastCurHookSeen = lastWrites.get(hook);
|
||||
while (pendingCount > 0) {
|
||||
for (var hookId : hookIterationData) {
|
||||
var hook = hookId.hook();
|
||||
var lastCurHookSeen = hookId.lastWrites();
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (lastCurHookSeen.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
@@ -100,7 +106,7 @@ public class JObjectManager {
|
||||
}
|
||||
};
|
||||
|
||||
var curIteration = pendingWrites.get(hook);
|
||||
var curIteration = hookId.pendingWrites();
|
||||
|
||||
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
|
||||
|
||||
@@ -127,23 +133,23 @@ public class JObjectManager {
|
||||
curIteration.clear();
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
if (hookPut == hook) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
if (hookPut == hookId) {
|
||||
lastCurHookSeen.put(n.key(), n);
|
||||
continue;
|
||||
}
|
||||
var before = pendingWrites.get(hookPut).put(n.key(), n);
|
||||
var before = hookPut.pendingWrites().put(n.key(), n);
|
||||
if (before == null)
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
}
|
||||
} while (pendingCount > 0);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
for (var read : tx.reads().entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
@@ -152,25 +158,34 @@ public class JObjectManager {
|
||||
readSet = tx.reads();
|
||||
|
||||
if (!writes.isEmpty()) {
|
||||
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
|
||||
.sorted(Comparator.comparing(JObjectKey::toString))
|
||||
.forEach(k -> {
|
||||
var lock = lockManager.lockObject(k);
|
||||
toUnlock.add(lock);
|
||||
});
|
||||
toUnlock = new ArrayList<>(readSet.size() + writes.size());
|
||||
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
|
||||
for (var read : readSet.entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
} else {
|
||||
toLock.add(read.getKey());
|
||||
}
|
||||
}
|
||||
for (var write : writes.entrySet()) {
|
||||
toLock.add(write.getKey());
|
||||
}
|
||||
Collections.sort(toLock);
|
||||
for (var key : toLock) {
|
||||
var lock = lockManager.lockObject(key);
|
||||
toUnlock.add(lock);
|
||||
}
|
||||
|
||||
commitSnapshot = snapshotManager.createSnapshot();
|
||||
}
|
||||
|
||||
for (var read : readSet.entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
}
|
||||
}
|
||||
|
||||
if (writes.isEmpty()) {
|
||||
} else {
|
||||
Log.trace("Committing transaction - no changes");
|
||||
|
||||
for (var read : readSet.values()) {
|
||||
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
return Pair.of(
|
||||
Stream.concat(
|
||||
tx.getOnCommit().stream(),
|
||||
@@ -189,24 +204,23 @@ public class JObjectManager {
|
||||
|
||||
if (snapshotId != commitSnapshot.id()) {
|
||||
for (var read : readSet.entrySet()) {
|
||||
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
|
||||
var dep = dependenciesLocked.get(read.getKey());
|
||||
var current = commitSnapshot.readObject(read.getKey());
|
||||
|
||||
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
|
||||
if (current.isEmpty() != read.getValue().data().isEmpty()) {
|
||||
Log.tracev("Checking read dependency {0} - not found", read.getKey());
|
||||
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
}
|
||||
|
||||
if (dep.isEmpty()) {
|
||||
if (current.isEmpty()) {
|
||||
// TODO: Every write gets a dependency due to hooks
|
||||
continue;
|
||||
// assert false;
|
||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
|
||||
}
|
||||
|
||||
if (dep.get().version() > snapshotId) {
|
||||
if (current.get().version() > snapshotId) {
|
||||
Log.tracev("Checking dependency {0} - newer than", read.getKey());
|
||||
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
|
||||
throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
|
||||
}
|
||||
|
||||
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
|
||||
@@ -215,21 +229,7 @@ public class JObjectManager {
|
||||
Log.tracev("Skipped dependency checks: no changes");
|
||||
}
|
||||
|
||||
boolean same = snapshotId == commitSnapshot.id();
|
||||
|
||||
var addFlushCallback = snapshotManager.commitTx(
|
||||
writes.values().stream()
|
||||
.filter(r -> {
|
||||
if (!same)
|
||||
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
|
||||
var dep = dependenciesLocked.get(data.key());
|
||||
if (dep.isPresent() && dep.get().version() > snapshotId) {
|
||||
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}).toList());
|
||||
var addFlushCallback = snapshotManager.commitTx(writes.values());
|
||||
|
||||
for (var callback : tx.getOnFlush()) {
|
||||
addFlushCallback.accept(callback);
|
||||
@@ -247,9 +247,10 @@ public class JObjectManager {
|
||||
Log.trace("Error when committing transaction", t);
|
||||
throw new TxCommitException(t.getMessage(), t);
|
||||
} finally {
|
||||
for (var unlock : toUnlock) {
|
||||
unlock.close();
|
||||
}
|
||||
if (toUnlock != null)
|
||||
for (var unlock : toUnlock) {
|
||||
unlock.close();
|
||||
}
|
||||
if (commitSnapshot != null)
|
||||
commitSnapshot.close();
|
||||
tx.close();
|
||||
@@ -265,31 +266,4 @@ public class JObjectManager {
|
||||
});
|
||||
tx.close();
|
||||
}
|
||||
|
||||
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
|
||||
// private final long _txId;
|
||||
//
|
||||
// private TransactionObjectSourceImpl(long txId) {
|
||||
// _txId = txId;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
|
||||
// var got = getObj(type, key);
|
||||
// if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
|
||||
// var got = getObjLock(type, key);
|
||||
// if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
// got.lock().close();
|
||||
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// }
|
||||
// }
|
||||
}
|
||||
@@ -6,8 +6,9 @@ import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class LockManager {
|
||||
private final DataLocker _objLocker = new DataLocker();
|
||||
|
||||
|
||||
@@ -7,14 +7,14 @@ import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
@@ -165,12 +165,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
|
||||
t -> switch (t) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
|
||||
new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
|
||||
case null, default -> null;
|
||||
}),
|
||||
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
|
||||
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
|
||||
d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,30 +17,27 @@ public interface TransactionManager {
|
||||
return supplier.get();
|
||||
}
|
||||
|
||||
begin();
|
||||
T ret;
|
||||
try {
|
||||
ret = supplier.get();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
while (true) {
|
||||
begin();
|
||||
boolean commit = false;
|
||||
try {
|
||||
var ret = supplier.get();
|
||||
commit = true;
|
||||
commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
tries--;
|
||||
} catch (Throwable e) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
return runTries(supplier, tries - 1);
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
return runTries(supplier, tries - 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,29 +52,29 @@ public interface TransactionManager {
|
||||
};
|
||||
}
|
||||
|
||||
begin();
|
||||
try {
|
||||
fn.apply();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
while (true) {
|
||||
begin();
|
||||
boolean commit = false;
|
||||
try {
|
||||
fn.apply();
|
||||
commit = true;
|
||||
var ret = commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
tries--;
|
||||
} catch (Throwable e) {
|
||||
if (!commit)
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
return runTries(fn, tries - 1);
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
return commit();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
return runTries(fn, tries - 1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
default TransactionHandle run(VoidFn fn) {
|
||||
|
||||
@@ -0,0 +1,262 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import net.jqwik.api.*;
|
||||
import net.jqwik.api.state.Action;
|
||||
import net.jqwik.api.state.ActionChain;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
public class MergingKvIteratorPbtTest {
|
||||
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
|
||||
private final CloseableKvIterator<Integer, Integer> mergedIterator;
|
||||
private final CloseableKvIterator<Integer, Integer> mergingIterator;
|
||||
|
||||
private MergingIteratorModel(List<List<Map.Entry<Integer, Integer>>> pairs, IteratorStart startType, Integer startKey) {
|
||||
TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
|
||||
for (List<Map.Entry<Integer, Integer>> list : pairs) {
|
||||
for (Map.Entry<Integer, Integer> pair : list) {
|
||||
perfectMerged.putIfAbsent(pair.getKey(), pair.getValue());
|
||||
}
|
||||
}
|
||||
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
|
||||
mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
|
||||
list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
|
||||
).toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer peekNextKey() {
|
||||
var mergedKey = mergedIterator.peekNextKey();
|
||||
var mergingKey = mergingIterator.peekNextKey();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
mergedIterator.skip();
|
||||
mergingIterator.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer peekPrevKey() {
|
||||
var mergedKey = mergedIterator.peekPrevKey();
|
||||
var mergingKey = mergingIterator.peekPrevKey();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<Integer, Integer> prev() {
|
||||
var mergedKey = mergedIterator.prev();
|
||||
var mergingKey = mergingIterator.prev();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPrev() {
|
||||
var mergedKey = mergedIterator.hasPrev();
|
||||
var mergingKey = mergingIterator.hasPrev();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skipPrev() {
|
||||
mergedIterator.skipPrev();
|
||||
mergingIterator.skipPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
mergedIterator.close();
|
||||
mergingIterator.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
var mergedKey = mergedIterator.hasNext();
|
||||
var mergingKey = mergingIterator.hasNext();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<Integer, Integer> next() {
|
||||
var mergedKey = mergedIterator.next();
|
||||
var mergingKey = mergingIterator.next();
|
||||
Assertions.assertEquals(mergedKey, mergingKey);
|
||||
return mergedKey;
|
||||
}
|
||||
}
|
||||
|
||||
static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Peek next key";
|
||||
}
|
||||
}
|
||||
|
||||
static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Skip next key";
|
||||
}
|
||||
}
|
||||
|
||||
static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.peekPrevKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Peek prev key";
|
||||
}
|
||||
}
|
||||
|
||||
static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.skipPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Skip prev key";
|
||||
}
|
||||
}
|
||||
|
||||
static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.prev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Prev key";
|
||||
}
|
||||
}
|
||||
|
||||
static class NextAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return state.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Next key";
|
||||
}
|
||||
}
|
||||
|
||||
static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Has next key";
|
||||
}
|
||||
}
|
||||
|
||||
static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||
@Override
|
||||
public void mutate(MergingIteratorModel state) {
|
||||
state.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean precondition(MergingIteratorModel state) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Has prev key";
|
||||
}
|
||||
}
|
||||
|
||||
@Property
|
||||
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
|
||||
actions.run();
|
||||
}
|
||||
|
||||
@Provide
|
||||
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
|
||||
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
|
||||
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
|
||||
.withAction(new NextAction())
|
||||
.withAction(new PeekNextKeyAction())
|
||||
.withAction(new SkipAction())
|
||||
.withAction(new PeekPrevKeyAction())
|
||||
.withAction(new SkipPrevAction())
|
||||
.withAction(new PrevAction())
|
||||
.withAction(new HasNextAction())
|
||||
.withAction(new HasPrevAction());
|
||||
}
|
||||
|
||||
@Provide
|
||||
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
|
||||
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
|
||||
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
|
||||
.list().ofMinSize(1).ofMaxSize(5);
|
||||
}
|
||||
|
||||
@Provide
|
||||
Arbitrary<Integer> startKey() {
|
||||
return Arbitraries.integers().between(-51, 51);
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,6 @@
|
||||
<module>dhfs-fuse</module>
|
||||
<module>dhfs-app</module>
|
||||
<module>kleppmanntree</module>
|
||||
<module>supportlib</module>
|
||||
<module>objects</module>
|
||||
<module>utils</module>
|
||||
</modules>
|
||||
@@ -36,13 +35,6 @@
|
||||
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>jitpack.io</id>
|
||||
<url>https://jitpack.io</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
@@ -52,6 +44,12 @@
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.jqwik</groupId>
|
||||
<artifactId>jqwik</artifactId>
|
||||
<version>1.9.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
@@ -116,9 +114,6 @@
|
||||
<systemPropertyVariables>
|
||||
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
|
||||
<maven.home>${maven.home}</maven.home>
|
||||
<com.usatiuk.dhfs.supportlib.native-path>
|
||||
${dhfs.native-libs-dir}
|
||||
</com.usatiuk.dhfs.supportlib.native-path>
|
||||
</systemPropertyVariables>
|
||||
<argLine>
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
|
||||
@@ -186,7 +181,6 @@
|
||||
--initialize-at-run-time=com.usatiuk.dhfs.utils.DataLocker$Lock,
|
||||
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore$LmdbKvIterator,
|
||||
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore,
|
||||
--initialize-at-run-time=com.usatiuk.dhfs.supportlib.UninitializedByteBuffer,
|
||||
--initialize-at-run-time=com.google.protobuf.UnsafeUtil
|
||||
</quarkus.native.additional-build-args>
|
||||
</properties>
|
||||
|
||||
@@ -1,114 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>parent</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>supportlib</artifactId>
|
||||
|
||||
<properties>
|
||||
<cmake.download>false</cmake.download>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>exec-maven-plugin</artifactId>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<version>3.4.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>CMake Configure</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>exec</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<executable>
|
||||
${project.parent.basedir}/../libdhfs_support/builder/cross-build.sh
|
||||
</executable>
|
||||
<arguments>
|
||||
<argument>configure</argument>
|
||||
<argument>${project.build.outputDirectory}/native-build</argument>
|
||||
<argument>${dhfs.native-libs-dir}</argument>
|
||||
</arguments>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>CMake Build</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>exec</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<executable>
|
||||
${project.parent.basedir}/../libdhfs_support/builder/cross-build.sh
|
||||
</executable>
|
||||
<arguments>
|
||||
<argument>build</argument>
|
||||
<argument>${project.build.outputDirectory}/native-build</argument>
|
||||
<argument>${dhfs.native-libs-dir}</argument>
|
||||
</arguments>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>com.googlecode.cmake-maven-project</groupId>
|
||||
<artifactId>cmake-maven-plugin</artifactId>
|
||||
<version>3.30.2-b1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>cmake-generate</id>
|
||||
<goals>
|
||||
<goal>generate</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<sourcePath>
|
||||
${project.parent.basedir}/../libdhfs_support
|
||||
</sourcePath>
|
||||
<targetPath>
|
||||
${project.build.outputDirectory}/native-build-local
|
||||
</targetPath>
|
||||
<options>
|
||||
<option>
|
||||
-DJAVA_HOME=${java.home}
|
||||
</option>
|
||||
<option>
|
||||
-DDHFS_LIB_INSTALL=${dhfs.native-libs-dir}
|
||||
</option>
|
||||
</options>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>cmake-compile</id>
|
||||
<goals>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<target>
|
||||
install
|
||||
</target>
|
||||
<projectDirectory>
|
||||
${project.build.outputDirectory}/native-build-local
|
||||
</projectDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -1,15 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.file.Path;
|
||||
|
||||
class DhfsNativeLibFinder {
|
||||
static private final String LibName = "libdhfs_support";
|
||||
|
||||
static Path getLibPath() {
|
||||
var override = System.getProperty("com.usatiuk.dhfs.supportlib.native-path-override");
|
||||
if (override != null)
|
||||
return Path.of(override);
|
||||
return Path.of(System.getProperty("com.usatiuk.dhfs.supportlib.native-path"))
|
||||
.resolve(SysUtils.getLibPlatform() + "-" + SysUtils.getLibArch()).resolve(LibName + "." + SysUtils.getLibExtension());
|
||||
}
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class DhfsSupport {
|
||||
public static final int PAGE_SIZE;
|
||||
private static final Logger LOGGER = Logger.getLogger(DhfsSupport.class.getName());
|
||||
private static final DhfsSupportImpl IMPLEMENTATION;
|
||||
|
||||
static {
|
||||
DhfsSupportImpl tmp;
|
||||
try {
|
||||
System.load(DhfsNativeLibFinder.getLibPath().toAbsolutePath().toString());
|
||||
tmp = new DhfsSupportImplNative();
|
||||
} catch (Throwable e) {
|
||||
LOGGER.warning("Failed to load native libraries, using fallback: \n" + e);
|
||||
tmp = new DhfsSupportImplFallback();
|
||||
}
|
||||
IMPLEMENTATION = tmp;
|
||||
PAGE_SIZE = getPageSizeInternal();
|
||||
}
|
||||
|
||||
static long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
|
||||
return IMPLEMENTATION.allocateUninitializedByteBuffer(bb, size);
|
||||
}
|
||||
|
||||
static void releaseByteBuffer(long token) {
|
||||
IMPLEMENTATION.releaseByteBuffer(token);
|
||||
}
|
||||
|
||||
private static int getPageSizeInternal() {
|
||||
return IMPLEMENTATION.getPageSizeInternal();
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
interface DhfsSupportImpl {
|
||||
long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size);
|
||||
|
||||
void releaseByteBuffer(long token);
|
||||
|
||||
int getPageSizeInternal();
|
||||
}
|
||||
@@ -1,21 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
class DhfsSupportImplFallback implements DhfsSupportImpl {
|
||||
@Override
|
||||
public long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
|
||||
bb[0] = ByteBuffer.allocateDirect(size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseByteBuffer(long token) {
|
||||
// GC
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPageSizeInternal() {
|
||||
return 4096; // FIXME:?
|
||||
}
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
class DhfsSupportImplNative implements DhfsSupportImpl {
|
||||
@Override
|
||||
public long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size) {
|
||||
return DhfsSupportNative.allocateUninitializedByteBuffer(bb, size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseByteBuffer(long token) {
|
||||
DhfsSupportNative.releaseByteBuffer(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPageSizeInternal() {
|
||||
return DhfsSupportNative.PAGE_SIZE;
|
||||
}
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
||||
class DhfsSupportNative {
|
||||
static public final int PAGE_SIZE;
|
||||
|
||||
static {
|
||||
System.load(DhfsNativeLibFinder.getLibPath().toAbsolutePath().toString());
|
||||
PAGE_SIZE = getPageSizeInternal();
|
||||
}
|
||||
|
||||
static native long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size);
|
||||
|
||||
static native void releaseByteBuffer(long token);
|
||||
|
||||
private static native int getPageSizeInternal();
|
||||
}
|
||||
@@ -1,43 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import org.apache.commons.lang3.SystemUtils;
|
||||
|
||||
class SysUtils {
|
||||
static String getLibPlatform() {
|
||||
if (SystemUtils.IS_OS_MAC) {
|
||||
return "Darwin";
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
return "Linux";
|
||||
} else {
|
||||
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
static String getLibExtension() {
|
||||
if (SystemUtils.IS_OS_MAC) {
|
||||
return "dylib";
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
return "so";
|
||||
} else {
|
||||
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
static String getLibArch() {
|
||||
if (SystemUtils.IS_OS_MAC) {
|
||||
return switch (SystemUtils.OS_ARCH) {
|
||||
case "aarch64" -> "arm64";
|
||||
default -> throw new IllegalStateException("Unsupported architecture: " + SystemUtils.OS_ARCH);
|
||||
};
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
return switch (SystemUtils.OS_ARCH) {
|
||||
case "aarch64" -> "aarch64";
|
||||
case "amd64" -> "x86_64";
|
||||
default -> throw new IllegalStateException("Unsupported architecture: " + SystemUtils.OS_ARCH);
|
||||
};
|
||||
} else {
|
||||
throw new IllegalStateException("Unsupported OS: " + SystemUtils.OS_NAME);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class UninitializedByteBuffer {
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
|
||||
|
||||
public static ByteBuffer allocateUninitialized(int size) {
|
||||
try {
|
||||
if (size < DhfsSupport.PAGE_SIZE)
|
||||
return ByteBuffer.allocateDirect(size);
|
||||
|
||||
var bb = new ByteBuffer[1];
|
||||
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
|
||||
var ret = bb[0];
|
||||
CLEANER.register(ret, () -> {
|
||||
try {
|
||||
DhfsSupport.releaseByteBuffer(token);
|
||||
} catch (Throwable e) {
|
||||
LOGGER.severe("Error releasing buffer: " + e);
|
||||
System.exit(-1);
|
||||
}
|
||||
});
|
||||
return ret;
|
||||
} catch (OutOfMemoryError e) {
|
||||
return ByteBuffer.allocate(size);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,26 +72,6 @@
|
||||
<artifactId>quarkus-junit5</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.SerCeMan</groupId>
|
||||
<artifactId>jnr-fuse</artifactId>
|
||||
<version>44ed40f8ce</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-posix</artifactId>
|
||||
<version>3.1.19</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.jnr</groupId>
|
||||
<artifactId>jnr-constants</artifactId>
|
||||
<version>0.10.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
@@ -127,11 +107,6 @@
|
||||
<artifactId>kleppmanntree</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>supportlib</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>objects</artifactId>
|
||||
|
||||
@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
|
||||
boolean frozen,
|
||||
T data) implements JDataRefcounted {
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(
|
||||
JObjectKey key,
|
||||
PCollection<JDataRef> refsFrom,
|
||||
boolean frozen,
|
||||
T data) implements JDataRefcounted {
|
||||
public RemoteObjectDataWrapper(T data) {
|
||||
this(HashTreePSet.empty(), false, data);
|
||||
this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new RemoteObjectDataWrapper<>(refs, frozen, data);
|
||||
return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
|
||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
||||
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
public RemoteObjectDataWrapper<T> withData(T data) {
|
||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return RemoteObjectMeta.ofDataKey(data.key());
|
||||
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -41,7 +41,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
}
|
||||
|
||||
public static JObjectKey ofDataKey(JObjectKey key) {
|
||||
return JObjectKey.of("data_" + key.value());
|
||||
return JObjectKey.of(key.value() + "_data");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -9,12 +9,13 @@ import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
import org.pcollections.HashTreePSet;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class RemoteTransaction {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
|
||||
@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
|
||||
public record JMapEntry<K extends JMapKey>(JObjectKey key,
|
||||
JObjectKey holder,
|
||||
K selfKey,
|
||||
JObjectKey ref) implements JData {
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return JMapHelper.makeKey(holder, selfKey);
|
||||
public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
|
||||
this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,11 +7,12 @@ import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JMapHelper {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
package com.usatiuk.dhfs.repository;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType;
|
||||
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.dhfs.repository.peertrust.PeerTrustManager;
|
||||
import com.usatiuk.dhfs.utils.SerializationHelper;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -64,8 +66,8 @@ public class PersistentPeerDataService {
|
||||
var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
|
||||
if (selfData != null) {
|
||||
_selfUuid = selfData.selfUuid();
|
||||
_selfCertificate = selfData.selfCertificate();
|
||||
_selfKeyPair = selfData.selfKeyPair();
|
||||
_selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
|
||||
_selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
|
||||
return;
|
||||
} else {
|
||||
try {
|
||||
@@ -74,7 +76,7 @@ public class PersistentPeerDataService {
|
||||
_selfKeyPair = CertificateTools.generateKeyPair();
|
||||
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
|
||||
|
||||
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair, HashTreePSet.empty(), HashTreePMap.empty()));
|
||||
curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
|
||||
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.repository;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
||||
import com.usatiuk.objects.JData;
|
||||
@@ -12,8 +13,8 @@ import java.security.KeyPair;
|
||||
import java.security.cert.X509Certificate;
|
||||
|
||||
public record PersistentRemoteHostsData(PeerId selfUuid,
|
||||
X509Certificate selfCertificate,
|
||||
KeyPair selfKeyPair,
|
||||
ByteString selfCertificate,
|
||||
ByteString selfKeyPair,
|
||||
PSet<PeerId> initialSyncDone,
|
||||
PMap<PeerId, IpPeerAddress> persistentPeerAddress) implements JData, Serializable {
|
||||
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");
|
||||
|
||||
@@ -5,7 +5,6 @@ dhfs.objects.ref_verification=true
|
||||
dhfs.objects.deletion.delay=0
|
||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
|
||||
quarkus.http.test-port=0
|
||||
quarkus.http.test-ssl-port=0
|
||||
dhfs.local-discovery=false
|
||||
|
||||
@@ -15,57 +15,47 @@ public class DataLocker {
|
||||
@Nonnull
|
||||
public AutoCloseableNoThrow lock(Object data) {
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
try {
|
||||
var tag = _locks.get(data);
|
||||
if (tag != null) {
|
||||
synchronized (tag) {
|
||||
while (!tag.released) {
|
||||
if (tag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
tag.wait();
|
||||
synchronized (oldTag) {
|
||||
while (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
oldTag.wait();
|
||||
// tag.wait(4000L);
|
||||
// if (!tag.released) {
|
||||
// System.out.println("Timeout waiting for lock: " + data);
|
||||
// System.exit(1);
|
||||
// throw new InterruptedException();
|
||||
// }
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AutoCloseableNoThrow tryLock(Object data) {
|
||||
while (true) {
|
||||
var tag = _locks.get(data);
|
||||
if (tag != null) {
|
||||
synchronized (tag) {
|
||||
if (!tag.released) {
|
||||
if (tag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
synchronized (oldTag) {
|
||||
if (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,11 +73,11 @@ public class DataLocker {
|
||||
public Lock(Object key, LockTag tag) {
|
||||
_key = key;
|
||||
_tag = tag;
|
||||
CLEANER.register(this, () -> {
|
||||
if (!tag.released) {
|
||||
Log.error("Lock collected without release: " + key);
|
||||
}
|
||||
});
|
||||
// CLEANER.register(this, () -> {
|
||||
// if (!tag.released) {
|
||||
// Log.error("Lock collected without release: " + key);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -45,7 +45,7 @@ public class HashSetDelayedBlockingQueueTest {
|
||||
queue.add("hello!");
|
||||
Assertions.assertEquals("hello!", queue.get());
|
||||
var gotTime = System.currentTimeMillis();
|
||||
Assertions.assertTrue((gotTime - curTime) <= 10);
|
||||
Assertions.assertTrue((gotTime - curTime) <= 50);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -14,7 +14,6 @@ exec java \
|
||||
-Ddhfs.fuse.root=/dhfs_root_fuse \
|
||||
-Dquarkus.http.host=0.0.0.0 \
|
||||
-Ddhfs.objects.ref_verification=false \
|
||||
-Dcom.usatiuk.dhfs.supportlib.native-path=/usr/src/app/native-libs \
|
||||
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=$DHFS_LOGLEVEL \
|
||||
"$@" \
|
||||
-jar quarkus-run.jar
|
||||
|
||||
83
libdhfs_support/.gitignore
vendored
83
libdhfs_support/.gitignore
vendored
@@ -1,83 +0,0 @@
|
||||
.DS_Store
|
||||
/toolchain
|
||||
/cmake-build-debug
|
||||
/sysroot
|
||||
/mvn-build
|
||||
|
||||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
|
||||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
||||
|
||||
# User-specific stuff
|
||||
.idea/**/workspace.xml
|
||||
.idea/**/tasks.xml
|
||||
.idea/**/usage.statistics.xml
|
||||
.idea/**/dictionaries
|
||||
.idea/**/shelf
|
||||
|
||||
# AWS User-specific
|
||||
.idea/**/aws.xml
|
||||
|
||||
# Generated files
|
||||
.idea/**/contentModel.xml
|
||||
|
||||
# Sensitive or high-churn files
|
||||
.idea/**/dataSources/
|
||||
.idea/**/dataSources.ids
|
||||
.idea/**/dataSources.local.xml
|
||||
.idea/**/sqlDataSources.xml
|
||||
.idea/**/dynamic.xml
|
||||
.idea/**/uiDesigner.xml
|
||||
.idea/**/dbnavigator.xml
|
||||
|
||||
# Gradle
|
||||
.idea/**/gradle.xml
|
||||
.idea/**/libraries
|
||||
|
||||
# Gradle and Maven with auto-import
|
||||
# When using Gradle or Maven with auto-import, you should exclude module files,
|
||||
# since they will be recreated, and may cause churn. Uncomment if using
|
||||
# auto-import.
|
||||
# .idea/artifacts
|
||||
# .idea/compiler.xml
|
||||
# .idea/jarRepositories.xml
|
||||
# .idea/modules.xml
|
||||
# .idea/*.iml
|
||||
# .idea/modules
|
||||
# *.iml
|
||||
# *.ipr
|
||||
|
||||
# CMake
|
||||
cmake-build-*/
|
||||
|
||||
# Mongo Explorer plugin
|
||||
.idea/**/mongoSettings.xml
|
||||
|
||||
# File-based project format
|
||||
*.iws
|
||||
|
||||
# IntelliJ
|
||||
out/
|
||||
|
||||
# mpeltonen/sbt-idea plugin
|
||||
.idea_modules/
|
||||
|
||||
# JIRA plugin
|
||||
atlassian-ide-plugin.xml
|
||||
|
||||
# Cursive Clojure plugin
|
||||
.idea/replstate.xml
|
||||
|
||||
# SonarLint plugin
|
||||
.idea/sonarlint/
|
||||
|
||||
# Crashlytics plugin (for Android Studio and IntelliJ)
|
||||
com_crashlytics_export_strings.xml
|
||||
crashlytics.properties
|
||||
crashlytics-build.properties
|
||||
fabric.properties
|
||||
|
||||
# Editor-based Rest Client
|
||||
.idea/httpRequests
|
||||
|
||||
# Android studio 3.1+ serialized cache file
|
||||
.idea/caches/build_file_checksums.ser
|
||||
@@ -1,39 +0,0 @@
|
||||
cmake_minimum_required(VERSION 3.24)
|
||||
project(libdhfs_support CXX)
|
||||
|
||||
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
|
||||
if (NOT SANITIZE)
|
||||
set(SANITIZE YES)
|
||||
endif ()
|
||||
endif ()
|
||||
|
||||
include(CheckCXXCompilerFlag)
|
||||
|
||||
if (SANITIZE STREQUAL "YES")
|
||||
message(STATUS "Enabling sanitizers!")
|
||||
add_compile_options(-Werror -Wall -Wextra -pedantic -Wshadow -Wformat=2 -Wfloat-equal -D_GLIBCXX_DEBUG -Wconversion)
|
||||
check_cxx_compiler_flag(-fsanitize-trap=all CAN_TRAP)
|
||||
if (CAN_TRAP)
|
||||
add_compile_options(-fsanitize=undefined -fsanitize-trap=all -fno-sanitize-recover)
|
||||
add_link_options(-fsanitize=undefined -fsanitize-trap=all -fno-sanitize-recover)
|
||||
else ()
|
||||
message(WARNING "Sanitizers not supported!")
|
||||
endif ()
|
||||
endif ()
|
||||
|
||||
if (CMAKE_BUILD_TYPE STREQUAL "Release")
|
||||
add_compile_options(-flto)
|
||||
add_link_options(-flto)
|
||||
endif ()
|
||||
|
||||
if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
|
||||
add_compile_options(-O3)
|
||||
add_link_options(-O3)
|
||||
endif ()
|
||||
|
||||
message(STATUS "Build type: ${CMAKE_BUILD_TYPE}")
|
||||
|
||||
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
|
||||
|
||||
add_subdirectory(helpers)
|
||||
add_subdirectory(DhfsSupportNative)
|
||||
@@ -1,26 +0,0 @@
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
|
||||
find_package(JNI REQUIRED COMPONENTS JVM)
|
||||
find_package(Java REQUIRED)
|
||||
include(UseJava)
|
||||
|
||||
add_jar(DhfsSupportNative
|
||||
"${PROJECT_SOURCE_DIR}/../dhfs-parent/supportlib/src/main/java/com/usatiuk/dhfs/supportlib/DhfsSupportNative.java"
|
||||
"LibPathDummy.java"
|
||||
GENERATE_NATIVE_HEADERS DhfsSupportNative-native
|
||||
)
|
||||
|
||||
add_library(dhfs_support SHARED
|
||||
src/DhfsSupportNative.cpp
|
||||
)
|
||||
|
||||
target_compile_options(dhfs_support PRIVATE
|
||||
-Wno-unused-parameter
|
||||
)
|
||||
|
||||
target_link_libraries(dhfs_support PRIVATE
|
||||
helpers
|
||||
DhfsSupportNative-native
|
||||
)
|
||||
|
||||
install(TARGETS dhfs_support LIBRARY DESTINATION "${DHFS_LIB_INSTALL}/${CMAKE_SYSTEM_NAME}-${CMAKE_SYSTEM_PROCESSOR}")
|
||||
@@ -1,9 +0,0 @@
|
||||
package com.usatiuk.dhfs.supportlib;
|
||||
|
||||
import java.nio.file.Path;
|
||||
|
||||
class DhfsNativeLibFinder {
|
||||
static Path getLibPath() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstdint>
|
||||
#include <cassert>
|
||||
|
||||
#include "com_usatiuk_dhfs_supportlib_DhfsSupportNative.h"
|
||||
|
||||
#include "Utils.h"
|
||||
#include "MemoryHelpers.h"
|
||||
|
||||
extern "C" {
|
||||
JNIEXPORT jlong JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_allocateUninitializedByteBuffer
|
||||
(JNIEnv* env, jclass klass, jobjectArray bb, jint size) {
|
||||
if (size < 0) {
|
||||
env->ThrowNew(env->FindClass("java/lang/IllegalArgumentException"), "Size less than 0?");
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t checked_size = checked_cast<size_t>(size);
|
||||
|
||||
void* buf;
|
||||
if (checked_size < MemoryHelpers::get_page_size())
|
||||
buf = malloc(checked_size);
|
||||
else
|
||||
buf = std::aligned_alloc(MemoryHelpers::get_page_size(),
|
||||
align_up(checked_size, MemoryHelpers::get_page_size()));
|
||||
|
||||
if (buf == nullptr) {
|
||||
env->ThrowNew(env->FindClass("java/lang/OutOfMemoryError"), "Buffer memory allocation failed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
env->SetObjectArrayElement(bb, 0, env->NewDirectByteBuffer(buf, checked_cast<jlong>(checked_size)));
|
||||
|
||||
jlong token = checked_cast<jlong>((uintptr_t) buf);
|
||||
return token;
|
||||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_releaseByteBuffer
|
||||
(JNIEnv* env, jclass klass, jlong token) {
|
||||
const auto addr = checked_cast<uintptr_t>(token);
|
||||
|
||||
if (addr == 0) {
|
||||
env->ThrowNew(env->FindClass("java/lang/IllegalArgumentException"), "Trying to free null pointer");
|
||||
return;
|
||||
}
|
||||
|
||||
free((void*) addr);
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_getPageSizeInternal
|
||||
(JNIEnv*, jclass) {
|
||||
return checked_cast<jint>(MemoryHelpers::get_page_size());
|
||||
}
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
FROM rockylinux:8
|
||||
|
||||
RUN dnf install -y java-21-openjdk-headless java-21-openjdk-devel cmake gcc gcc-c++
|
||||
@@ -1,56 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CMAKE_ARGS="${CMAKE_ARGS:--DCMAKE_BUILD_TYPE=Debug}"
|
||||
|
||||
export SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
|
||||
cd "$SCRIPT_DIR"
|
||||
|
||||
if [[ "${DO_LOCAL_BUILD^^}" != "TRUE" ]]; then
|
||||
if [[ "$(uname)" == "Linux" ]]; then
|
||||
if [[ -z "${DOCKER_PLATFORM}" ]]; then
|
||||
echo "Already on linux"
|
||||
exit 0
|
||||
fi
|
||||
fi
|
||||
exec "$SCRIPT_DIR"/docker-launch.sh "$@"
|
||||
fi
|
||||
|
||||
set -euxo pipefail
|
||||
|
||||
if [ $# -lt 3 ]; then
|
||||
echo "Not enough arguments supplied: (build/configure) (build dir) (output dir)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
PROJECT_DIR="$SCRIPT_DIR/.."
|
||||
|
||||
CONFIGURE_DIR="$2"
|
||||
INSTALL_DIR="$3"
|
||||
|
||||
function configure() {
|
||||
cmake -B"$CONFIGURE_DIR" -S"$PROJECT_DIR" -DDHFS_LIB_INSTALL="$INSTALL_DIR" $CMAKE_ARGS
|
||||
}
|
||||
|
||||
function build() {
|
||||
cmake --build "$CONFIGURE_DIR" --target install
|
||||
}
|
||||
|
||||
mkdir -p "$2"
|
||||
mkdir -p "$3"
|
||||
|
||||
case "$1" in
|
||||
"configure")
|
||||
configure
|
||||
;;
|
||||
"build")
|
||||
build
|
||||
;;
|
||||
"both")
|
||||
configure
|
||||
build
|
||||
;;
|
||||
*)
|
||||
echo "Unknown command"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -1,21 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -exo pipefail
|
||||
export SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
|
||||
cd "$SCRIPT_DIR"
|
||||
|
||||
PLATFORM_ARG=""
|
||||
if [[ ! -z "${DOCKER_PLATFORM}" ]]; then
|
||||
PLATFORM_ARG="--platform $DOCKER_PLATFORM"
|
||||
fi
|
||||
|
||||
if [[ -z "${DOCKER_BUILDER_IMAGE}" ]]; then
|
||||
DOCKER_IMG_FILE=$(mktemp)
|
||||
docker build $PLATFORM_ARG --iidfile "$DOCKER_IMG_FILE" .
|
||||
DOCKER_BUILDER_IMAGE="$(cat "$DOCKER_IMG_FILE")"
|
||||
fi
|
||||
|
||||
ROOT_DIR="$(dirname "$(dirname "$SCRIPT_DIR")")"
|
||||
|
||||
docker run $PLATFORM_ARG --rm -v "$ROOT_DIR:$ROOT_DIR" -e DO_LOCAL_BUILD=TRUE "$DOCKER_BUILDER_IMAGE" \
|
||||
"$SCRIPT_DIR/cross-build.sh" "$@"
|
||||
@@ -1,9 +0,0 @@
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
|
||||
add_library(helpers
|
||||
include/MemoryHelpers.h
|
||||
src/MemoryHelpers.cpp
|
||||
include/Utils.h
|
||||
)
|
||||
|
||||
target_include_directories(helpers PUBLIC include)
|
||||
@@ -1,12 +0,0 @@
|
||||
//
|
||||
// Created by stepus53 on 24.8.24.
|
||||
//
|
||||
|
||||
#ifndef MEMORYHELPERS_H
|
||||
#define MEMORYHELPERS_H
|
||||
|
||||
namespace MemoryHelpers {
|
||||
unsigned int get_page_size();
|
||||
}
|
||||
|
||||
#endif //MEMORYHELPERS_H
|
||||
@@ -1,41 +0,0 @@
|
||||
//
|
||||
// Created by stepus53 on 24.8.24.
|
||||
//
|
||||
|
||||
#ifndef UTILS_H
|
||||
#define UTILS_H
|
||||
|
||||
#include <cassert>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wsign-conversion"
|
||||
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||
|
||||
template<typename To, typename From>
|
||||
constexpr To checked_cast(const From& f) {
|
||||
To result = static_cast<To>(f);
|
||||
assert(f == result);
|
||||
return result;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
||||
template<typename T, typename A>
|
||||
T align_up(T what, A alignment) {
|
||||
assert(__builtin_popcount(alignment) == 1);
|
||||
|
||||
const T mask = checked_cast<T>(alignment - 1);
|
||||
|
||||
T ret;
|
||||
|
||||
if (what & mask)
|
||||
ret = (what + mask) & ~mask;
|
||||
else
|
||||
ret = what;
|
||||
|
||||
assert((ret & mask) == 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#endif //UTILS_H
|
||||
@@ -1,14 +0,0 @@
|
||||
//
|
||||
// Created by stepus53 on 24.8.24.
|
||||
//
|
||||
|
||||
#include "MemoryHelpers.h"
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "Utils.h"
|
||||
|
||||
unsigned int MemoryHelpers::get_page_size() {
|
||||
static const auto PAGE_SIZE = checked_cast<unsigned int>(sysconf(_SC_PAGESIZE));
|
||||
return PAGE_SIZE;
|
||||
}
|
||||
@@ -36,7 +36,6 @@ java \
|
||||
-Dquarkus.http.host=0.0.0.0 \
|
||||
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \
|
||||
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
|
||||
-Dcom.usatiuk.dhfs.supportlib.native-path="$SCRIPT_DIR"/NativeLibs \
|
||||
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
|
||||
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &
|
||||
|
||||
|
||||
Reference in New Issue
Block a user