29 Commits

SHA1 Message Date
c1dfe88164 quarkus update 2025-10-12 10:54:00 +02:00
acf2ae2cef less commitSnapshots created 2025-08-22 23:19:39 +02:00
19016d5e46 less bad read cache 2025-08-22 18:47:43 +02:00
c4945e7354 more fuse logs 2025-07-19 14:26:27 +02:00
60ffc12c61 disable lazyfs tests by default (still some fuse issues there) 2025-06-25 18:50:19 +02:00
e3e62467e4 cleanup data path config 2025-06-25 18:02:20 +02:00
2434b0464f fit.cvut.cz link fix 2025-06-12 09:52:24 +02:00
ab4e06177e add a notice 2025-06-12 09:49:30 +02:00
86f240f439 cleanup dependencies a little 2025-05-14 21:45:20 +02:00
59447aa286 fix windows map size 2025-05-14 20:54:59 +02:00
98b7fe81ae another note 2025-05-14 20:43:32 +02:00
0713d20d95 windows note 2 2025-05-14 20:39:14 +02:00
13390ba301 windows note 2025-05-14 20:24:42 +02:00
b040c64da7 grammar fix 2025-05-14 20:21:19 +02:00
fa9b77dc34 update java version 2025-05-14 20:20:53 +02:00
508df91c0a readme fixie 2025-05-14 19:54:31 +02:00
20eb436c4b remove readme trailing space space 2025-05-14 19:53:19 +02:00
59d5b22266 improve readme 2 2025-05-14 19:52:19 +02:00
4167f661e8 copy readme to run wrapper 2025-05-14 19:49:28 +02:00
2cc5a703ef Improve readme 2025-05-14 19:48:38 +02:00
a5490047b8 Slight fixes 2025-05-14 19:21:00 +02:00
2cd210dfd1 Slight test cleanup 2025-05-14 19:16:22 +02:00
6e37c26845 Slight config cleanup 2025-05-14 19:00:51 +02:00
4f7c7927f3 JAVA_HOME in run wrapper 2025-05-14 18:33:27 +02:00
723a94ce0e update readme 2025-05-14 18:11:53 +02:00
57b57397b6 Dhfs-fuse: remove TieredStopAtLevel from test 2025-05-14 18:05:25 +02:00
2a6656cd1a Sync-base: increase default getSelfInfo timeout 2025-05-14 18:04:50 +02:00
de5338a813 more cleanup 2025-05-14 16:52:45 +02:00
8b4430fa73 Some config cleanup 2025-05-14 14:16:06 +02:00
65 changed files with 441 additions and 1241 deletions

View File

@@ -1,4 +0,0 @@
**/.parcel-cache
**/dist
**/node_modules
**/target

View File

@@ -7,12 +7,6 @@ on:
pull_request:
branches: ["main"]
env:
# Use docker.io for Docker Hub if empty
REGISTRY: ghcr.io
# github.repository as <account>/<repo>
IMAGE_NAME: ${{ github.repository }}
jobs:
build-dhfs:
runs-on: ubuntu-latest
@@ -49,9 +43,6 @@ jobs:
- name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify javadoc:aggregate
# - name: Build with Maven
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
- uses: actions/upload-artifact@v4
with:
name: DHFS Server Package
@@ -92,107 +83,10 @@ jobs:
name: Webui
path: webui/dist
publish-docker:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Download server package
uses: actions/download-artifact@v4
with:
name: DHFS Server Package
path: dhfs-package-downloaded
- name: Download webui
uses: actions/download-artifact@v4
with:
name: Webui
path: webui-dist-downloaded
- name: Show all the files
run: find .
# Install the cosign tool except on PR
# https://github.com/sigstore/cosign-installer
- name: Install cosign
if: github.event_name != 'pull_request'
uses: sigstore/cosign-installer@v3.5.0
with:
cosign-release: "v2.2.4"
# Set up BuildKit Docker container builder to be able to build
# multi-platform images and export cache
# https://github.com/docker/setup-buildx-action
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
# Login against a Docker registry except on PR
# https://github.com/docker/login-action
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Extract metadata (tags, labels) for Docker
# https://github.com/docker/metadata-action
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Build and push Docker image with Buildx (don't push on PR)
# https://github.com/docker/build-push-action
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile.ci
push: ${{ github.event_name != 'pull_request' }}
platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
# Sign the resulting Docker image digest except on PRs.
# This will only write to the public Rekor transparency log when the Docker
# repository is public to avoid leaking data. If you would like to publish
# transparency data even for private images, pass --force to cosign below.
# https://github.com/sigstore/cosign
- name: Sign the published Docker image
if: ${{ github.event_name != 'pull_request' }}
env:
# https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable
TAGS: ${{ steps.meta.outputs.tags }}
DIGEST: ${{ steps.build-and-push.outputs.digest }}
# This step uses the identity token to provision an ephemeral certificate
# against the sigstore community Fulcio instance.
run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}
publish-run-wrapper:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
needs: [build-webui, build-dhfs]
@@ -225,6 +119,10 @@ jobs:
- name: Copy run wrapper
run: cp -r ./run-wrapper/* "run-wrapper-out/dhfs/app/"
- name: Copy README
run: |
cp README.md "run-wrapper-out/dhfs/"
- name: Add version to run wrapper
run: echo $GITHUB_RUN_ID > "run-wrapper-out/dhfs/app/"version
@@ -260,13 +158,12 @@ jobs:
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
-path: 'dhfs-javadocs-downloaded'
+path: "dhfs-javadocs-downloaded"
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

.vscode/ltex.dictionary.en-US.txt (vendored, new file: +1 line)
View File

@@ -0,0 +1 @@
Syncthing

View File

@@ -1,35 +0,0 @@
FROM node:20-bullseye as webui-build
WORKDIR /usr/src/app/webui-build
COPY ./webui/package*.json ./
RUN npm i
COPY ./webui/. .
RUN npm run build
FROM azul/zulu-openjdk:21 as server-build
WORKDIR /usr/src/app/server-build
COPY ./server/.mvn .mvn
COPY ./server/mvnw ./server/pom.xml ./
RUN ./mvnw quarkus:go-offline
# The previous thing still doesn't download 100% everything
RUN ./mvnw -Dmaven.test.skip=true -Dskip.unit=true package --fail-never
COPY ./server/. .
RUN ./mvnw -Dmaven.test.skip=true -Dskip.unit=true clean package
FROM azul/zulu-openjdk-alpine:21-jre-headless
RUN apk update && apk add fuse && rm -rf /var/cache/apk/*
WORKDIR /usr/src/app
COPY --from=server-build /usr/src/app/server-build/target/quarkus-app/. .
RUN mkdir -p webui
COPY --from=webui-build /usr/src/app/webui-build/dist/. ./webui
ENV dhfs_webui_root=/usr/src/app/webui
COPY ./dockerentry.sh .
RUN ["chmod", "+x", "./dockerentry.sh"]
CMD [ "./dockerentry.sh" ]

View File

@@ -1,22 +0,0 @@
FROM azul/zulu-openjdk:21-jre-headless
RUN apt update && apt install -y libfuse2 && apt-get clean
WORKDIR /usr/src/app
COPY ./dhfs-package-downloaded/lib .
COPY ./dhfs-package-downloaded/*.jar .
COPY ./dhfs-package-downloaded/app .
COPY ./dhfs-package-downloaded/quarkus .
WORKDIR /usr/src/app/webui
COPY ./webui-dist-downloaded/. .
ENV dhfs_webui_root=/usr/src/app/webui
WORKDIR /usr/src/app
COPY ./dockerentry.sh .
RUN ["chmod", "+x", "./dockerentry.sh"]
CMD [ "./dockerentry.sh" ]

View File

@@ -1,4 +1,6 @@
-# Distributed Home File System 🚧
+# Distributed Home File System
[Javadocs](https://usatiuk.github.io/dhfs/)
## What is this?
@@ -11,41 +13,78 @@ Syncthing and allowing you to stream your files like Google Drive File Stream
[Download latest build](https://nightly.link/usatiuk/dhfs/workflows/server/main/Run%20wrapper.zip)
-This is a simple wrapper around the jar/web ui distribution that allows you to run/stop
-the DHFS server in the background, and update itself (hopefully!)
+This is a simple set of scripts that allows you to run/stop
+the DHFS server in the background, and update it.
Once unpacked, in the root folder (`dhfs`), there will be 3 folders:
- `app` contains the application
- `data` contains the filesystem data storage
- `fuse` is the default filesystem mount point (not on Windows, the default mount drive letter is `Z`)
Note that on Windows, the path to the root cannot contain spaces.
## How to use it?
### General prerequisites
-Java should be available as `java` in path, and Java 21+ is required.
+Java should be available as `java` in `PATH`, or with a correctly set `JAVA_HOME` (ignored on Windows), and Java 21 is required.
A FUSE 2 userspace library should also be available:
-- On Ubuntu `libfuse2` package can be installed.
+- On Ubuntu, the `libfuse2` package can be installed, or an analogous package for other distributions.
- On Windows, [WinFsp](https://winfsp.dev/) should be installed.
+- On macOS, [macFUSE](https://macfuse.github.io/) should be installed.
-In the run-wrapper, 3 scripts are available.
+### How to run it?
+In the run-wrapper `app` folder, 3 scripts are available.
- `run` script starts the filesystem
- `stop` script stops it
- `update` script will update the filesystem to the newest available CI build
-On Windows, Powershell alternatives should be used. For them to work, it might be required to allow execution of unsigned scripts using `set-executionpolicy remotesigned`.
+On Windows, PowerShell versions of the scripts should be used. For them to work, it might be necessary to allow execution of unsigned scripts using `set-executionpolicy unrestricted`.
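For illustration, a typical session with the run wrapper might look like the following sketch (script names as listed above; the exact file extensions depend on the platform):

```sh
cd dhfs/app
./run      # start the DHFS server in the background
./stop     # stop the background server
./update   # replace the app with the newest CI build
```

On Windows, the same flow uses the PowerShell versions of these scripts.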
### Additional options
Additional options for the filesystem can be specified in the `extra-opts` file in the same directory as the run scripts.
One line in the `extra-opts` file corresponds to one option passed to the JVM when starting the filesystem.
Some extra possible configuration options are:
-- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`.
-- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for gabgage collection and resynchronized after being reconnected. The default is 43200 (30 days), if set to `-1`, this feature is disabled.
+- `-Ddhfs.fuse.root=` specifies the root where the filesystem should be mounted. By default, it is the `fuse` path under the `run-wrapper` root. For Windows, it should be a disk root, by default `Z:\`.
+- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for garbage collection and resynchronized after being reconnected. The default is 43200 (30 days); if set to `-1`, this feature is disabled.
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`.
- `-Ddhfs.objects.peerdiscovery.port=` port to broadcast on and listen to for LAN peer discovery (default is `42262`)
- `-Ddhfs.objects.peerdiscovery.broadcast=` whether to enable local peer discovery or not (default is `true`)
- `-Dquarkus.http.port=` HTTP port to listen on (default is `8080`)
- `-Dquarkus.http.ssl-port=` HTTPS port to listen on (default is `8443`)
- `-Dquarkus.http.host=` IP address to listen on (default is `0.0.0.0`)
- `-Ddhfs.peerdiscovery.static-peers=` allows manually specifying a peer's address in the format `peer id:address:http port:https port`, for example `-Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011`
-On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100GB.
+On Windows, the entire space for the filesystem should also be preallocated; the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size in bytes (the Windows default is 100 GB).
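As an illustration, a hypothetical `extra-opts` file combining several of the options above might look like this (one JVM option per line; all values are examples, not recommendations):

```
-Ddhfs.fuse.root=Z:\
-Ddhfs.objects.autosync.download-all=true
-Dquarkus.http.port=8080
-Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011
-Ddhfs.objects.persistence.lmdb.size=107374182400
```

Here `107374182400` bytes is 100 GiB; the static peer entry reuses the example peer ID and ports from the list above.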
-Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers.
In case of errors, the standard output is redirected to `quarkus.log` in the `app` folder; on Windows, the error output is separate.
+### How to connect to other peers?
+Then, a web interface will be available at `localhost:8080` (or whatever the HTTP port is), which can be used to connect with other peers. Peers on the local network should be discovered and available to connect to automatically.
## Other notes
### Running tests
To run the LazyFS tests, LazyFS needs to be built first: clone the git submodules and run the `./thirdparty/lazyfs/build.sh` script, as sketched below.
The LazyFS tests have only been verified on Linux.
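A minimal sketch of that setup, assuming a fresh checkout (repository URL inferred from the project links above):

```sh
git clone https://github.com/usatiuk/dhfs.git
cd dhfs
git submodule update --init --recursive
./thirdparty/lazyfs/build.sh
```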
### Notice
This software was developed with the support of the Faculty of Information Technology, Czech Technical University in Prague, [fit.cvut.cz](https://fit.cvut.cz)
<img src="./docs/logo-fit-en-cerna.svg" height="64">

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsfuse.Main" />
<module name="dhfs-fuse" />
-<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx512M -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
+<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx512M -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsfuse.Main" />
<module name="dhfs-fuse" />
-<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseZGC -XX:+ZGenerational --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx1G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
+<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseZGC -XX:+ZGenerational --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx1G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />

View File

@@ -1,43 +0,0 @@
version: "3.2"
services:
dhfs1:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs1:/dhfs_root
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs2:/dhfs_root
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -23,22 +23,10 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
@@ -47,22 +35,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
@@ -85,10 +57,6 @@
<artifactId>slf4j-jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
@@ -97,11 +65,6 @@
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>sync-base</artifactId>

View File

@@ -45,20 +45,13 @@ import java.util.stream.StreamSupport;
*/
@ApplicationScoped
public class DhfsFileService {
-@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
+@ConfigProperty(name = "dhfs.files.target_chunk_alignment", defaultValue = "17")
int targetChunkAlignment;
-@ConfigProperty(name = "dhfs.files.target_chunk_size")
-int targetChunkSize;
+@ConfigProperty(name = "dhfs.files.max_chunk_size", defaultValue = "524288")
+int maxChunkSize;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@ConfigProperty(name = "dhfs.files.allow_recursive_delete")
boolean allowRecursiveDelete;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@ConfigProperty(name = "dhfs.objects.write_log")
boolean writeLogging;
@Inject
Transaction curTx;
@@ -89,6 +82,10 @@ public class DhfsFileService {
return newChunk;
}
int targetChunkSize() {
return 1 << targetChunkAlignment;
}
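With the default alignment of 17, `targetChunkSize()` is `1 << 17` = 131072 bytes (128 KiB); the chunk size is now derived from the alignment instead of the removed `dhfs.files.target_chunk_size` setting.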
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
getTree();
@@ -259,7 +256,7 @@ public class DhfsFileService {
* @param to the new name
* @return true if the rename was successful, false otherwise
*/
-public Boolean rename(String from, String to) {
+public boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.meta();
@@ -280,7 +277,7 @@ public class DhfsFileService {
* @param mode the new mode
* @return true if the mode was changed successfully, false otherwise
*/
-public Boolean chmod(JObjectKey uuid, long mode) {
+public boolean chmod(JObjectKey uuid, long mode) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -425,7 +422,7 @@ public class DhfsFileService {
* @param data the data to write
* @return the number of bytes written
*/
-public Long write(JObjectKey fileUuid, long offset, ByteString data) {
+public long write(JObjectKey fileUuid, long offset, ByteString data) {
return jObjectTxManager.executeTx(() -> {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
@@ -499,7 +496,7 @@ public class DhfsFileService {
else if (targetChunkAlignment < 0)
end = combinedSize;
else
-end = Math.min(cur + targetChunkSize, combinedSize);
+end = Math.min(cur + targetChunkSize(), combinedSize);
var thisChunk = pendingWrites.substring(cur, end);
@@ -534,7 +531,7 @@ public class DhfsFileService {
* @param length the new length of the file
* @return true if the truncate was successful, false otherwise
*/
-public Boolean truncate(JObjectKey fileUuid, long length) {
+public boolean truncate(JObjectKey fileUuid, long length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
@@ -643,11 +640,11 @@ public class DhfsFileService {
while (cur < combinedSize) {
long end;
-if (targetChunkSize <= 0)
+if (targetChunkSize() <= 0)
end = combinedSize;
else {
-if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
-end = cur + targetChunkSize;
+if ((combinedSize - cur) > (targetChunkSize() * 1.5)) {
+end = cur + targetChunkSize();
} else {
end = combinedSize;
}
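Worked example of the final-chunk rule above: with `targetChunkSize()` = 131072, a remaining span of 150000 bytes stays under the 1.5× threshold of 196608 and is written as a single 150000-byte chunk, avoiding a 131072-byte chunk followed by an 18928-byte tail.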
@@ -726,7 +723,7 @@ public class DhfsFileService {
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
-public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
+public boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -780,7 +777,7 @@ public class DhfsFileService {
* @param data the data to write
* @return the number of bytes written
*/
-public Long write(JObjectKey fileUuid, long offset, byte[] data) {
+public long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
}

View File

@@ -1,7 +1,5 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=16
@@ -14,16 +12,12 @@ dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=2097152
dhfs.files.target_chunk_alignment=19
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
-ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
+ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
-@ConfigProperty(name = "dhfs.objects.persistence.files.root")
+@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -1,83 +0,0 @@
package com.usatiuk.dhfsfs.benchmarks;
import io.quarkus.logging.Log;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import java.util.Arrays;
import java.util.function.Supplier;
public class Benchmarker {
static <T> long[] runLatency(Supplier<T> fn, int iterations) {
var out = new long[iterations];
int hash = 1;
for (int i = 0; i < iterations; i++) {
long startNanos = System.nanoTime();
var cur = fn.get();
long stopNanos = System.nanoTime();
out[i] = stopNanos - startNanos;
hash = hash * 31 + cur.hashCode();
}
System.out.println("\nHash: " + hash);
return out;
}
static <T> long[] runThroughput(Supplier<T> fn, int iterations, long iterationTime) {
var out = new long[iterations];
int hash = 1;
for (int i = 0; i < iterations; i++) {
long startMillis = System.currentTimeMillis();
long count = 0;
// FIXME: That's probably janky
while (System.currentTimeMillis() - startMillis < iterationTime) {
var res = fn.get();
count++;
hash = hash * 31 + res.hashCode();
}
System.out.println("Ran iteration " + i + "/" + iterations + " count=" + count);
out[i] = count;
}
System.out.println("\nHash: " + hash);
return out;
}
static void printStats(double[] data, String unit) {
DescriptiveStatistics stats = new DescriptiveStatistics();
for (var r : data) {
stats.addValue(r);
}
Log.info("\n" + stats +
"\n 50%: " + stats.getPercentile(50) + " " + unit +
"\n 90%: " + stats.getPercentile(90) + " " + unit +
"\n 95%: " + stats.getPercentile(95) + " " + unit +
"\n 99%: " + stats.getPercentile(99) + " " + unit +
"\n 99.9%: " + stats.getPercentile(99.9) + " " + unit +
"\n 99.99%: " + stats.getPercentile(99.99) + " " + unit
);
}
static <T> void runAndPrintMixSimple(String name, Supplier<T> fn, int latencyIterations, int thrptIterations, int thrptIterationTime, int warmupIterations, int warmupIterationTime) {
System.out.println("\n=========\n" + "Running " + name + "\n=========\n");
System.out.println("==Warmup==");
runThroughput(fn, warmupIterations, warmupIterationTime);
System.out.println("==Warmup done==");
System.out.println("==Throughput==");
var thrpt = runThroughput(fn, thrptIterations, thrptIterationTime);
printStats(Arrays.stream(thrpt).mapToDouble(o -> (double) o / 1000).toArray(), "ops/s");
System.out.println("==Throughput done==");
System.out.println("==Latency==");
var lat = runLatency(fn, latencyIterations);
printStats(Arrays.stream(lat).mapToDouble(o -> (double) o).toArray(), "ns/op");
System.out.println("==Latency done==");
System.out.println("\n=========\n" + name + " done" + "\n=========\n");
}
}

View File

@@ -1,53 +0,0 @@
package com.usatiuk.dhfsfs.benchmarks;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfsfs.TempDataProfile;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.objects.JObjectKey;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Map;
class Profiles {
public static class DhfsFuseTestProfile extends TempDataProfile {
@Override
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("quarkus.log.category.\"com.usatiuk.dhfs\".level", "INFO");
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.objects.ref_verification", "false");
}
}
}
@QuarkusTest
@TestProfile(Profiles.DhfsFuseTestProfile.class)
public class DhfsFileBenchmarkTest {
@Inject
DhfsFileService dhfsFileService;
@Test
@Disabled
void openRootTest() {
Benchmarker.runAndPrintMixSimple("dhfsFileService.open(\"\")",
() -> {
return dhfsFileService.open("");
}, 1_000_000, 5, 1000, 5, 1000);
}
@Test
@Disabled
void writeMbTest() {
JObjectKey file = dhfsFileService.create("/writeMbTest", 0777).get();
var bb = ByteBuffer.allocateDirect(1024 * 1024);
Benchmarker.runAndPrintMixSimple("dhfsFileService.write(\"\")",
() -> {
var thing = UnsafeByteOperations.unsafeWrap(bb);
return dhfsFileService.write(file, dhfsFileService.size(file), thing);
}, 1_000, 10, 100, 1, 100);
}
}

View File

@@ -1,5 +1,4 @@
-dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
-dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
+dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

View File

@@ -11,8 +11,7 @@ services:
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--Ddhfs.objects.persistence.files.root=/dhfs_root/p
--Ddhfs.objects.root=/dhfs_root/d
+-Ddhfs.objects.persistence.root=/dhfs_root/p
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
@@ -32,8 +31,7 @@ services:
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--Ddhfs.objects.persistence.files.root=/dhfs_root/p
--Ddhfs.objects.root=/dhfs_root/d
+-Ddhfs.objects.persistence.root=/dhfs_root/p
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"

View File

@@ -23,22 +23,10 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
@@ -47,14 +35,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jsonb</artifactId>
@@ -90,10 +70,6 @@
<artifactId>slf4j-jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
@@ -102,11 +78,6 @@
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>dhfs-fs</artifactId>

View File

@@ -128,7 +128,9 @@ public class DhfsFuse extends FuseStubFS {
opts.add("-o");
opts.add("gid=" + gid);
}
Log.info("FUSE options: " + opts);
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
Log.info("Mounted");
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {
@@ -267,7 +269,7 @@ public class DhfsFuse extends FuseStubFS {
try {
var fileKey = getFromHandle(fi.fh.get());
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
-return written.intValue();
+return Math.toIntExact(written);
} catch (Exception e) {
Log.error("When writing " + path, e);
return -ErrorCodes.EIO();
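Unlike `Long.intValue()`, which silently truncates, `Math.toIntExact` throws an `ArithmeticException` when the value does not fit in an `int`, so an oversized write count now surfaces through the catch block as `EIO` instead of a corrupted return value.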

View File

@@ -1,32 +1,19 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=16
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false
dhfs.objects.periodic-push-op-interval=5m
dhfs.fuse.root=${HOME}/dhfs_default/fuse
dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=524288
dhfs.files.max_chunk_size=524288
dhfs.files.target_chunk_alignment=17
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
-dhfs.objects.autosync.threads=16
+dhfs.objects.autosync.threads=8
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
-ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
+ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
-@ConfigProperty(name = "dhfs.objects.persistence.files.root")
+@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -10,10 +10,8 @@ import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -58,8 +56,8 @@ public class DhfsFuseIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -131,13 +129,15 @@ public class DhfsFuseIT {
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo newfile > /dhfs_test/fuse/testf2").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() ->
@@ -155,13 +155,13 @@ public class DhfsFuseIT {
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
client.disconnectFromNetworkCmd().withNetworkId(network.getId()).withContainerId(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -203,18 +203,34 @@ public class DhfsFuseIT {
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
container2.stop();
var client = DockerClientFactory.instance().client();
client.disconnectFromNetworkCmd().withNetworkId(network.getId()).withContainerId(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testf2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo tesempty3 > /dhfs_test/fuse/testf3").getExitCode());
Log.info("Deleting");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
Log.info("Deleted");
// FIXME?
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty2\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty3\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf3").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty3\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf3").getStdout()));
}
@Test
@@ -362,12 +378,12 @@ public class DhfsFuseIT {
});
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/a /dhfs_test/fuse/b").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/b /dhfs_test/fuse/a").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
await().atMost(45, TimeUnit.SECONDS).until(() -> {
@@ -401,14 +417,14 @@ public class DhfsFuseIT {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
client.pauseContainerCmd(container1.getContainerId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
Log.info("Removing");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
Log.info("Moving");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testf1 /dhfs_test/fuse/testf2").getExitCode());
@@ -416,12 +432,14 @@ public class DhfsFuseIT {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/").getExitCode());
Log.info("Reading");
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf2").getStdout()));
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
// Either removed, or moved
// TODO: it always seems to be removed?
Log.info("Reading both");
await().atMost(45, TimeUnit.SECONDS).until(() -> {
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/");

View File

@@ -9,10 +9,8 @@ import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
@@ -37,9 +35,6 @@ public class DhfsFusex3IT {
Network network;
// This calculation is somewhat racy, so keep it hardcoded for now
long emptyFileCount = 9;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
// TODO: Dedup
@@ -77,9 +72,9 @@ public class DhfsFusex3IT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid + " = 1");
Log.info(container2.getContainerId() + "=" + c2uuid + " = 2");
@@ -122,17 +117,6 @@ public class DhfsFusex3IT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
}
private boolean checkEmpty() throws IOException, InterruptedException {
for (var container : List.of(container1, container2, container3)) {
var found = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f");
var foundWc = container.execInContainer("/bin/sh", "-c", "find /dhfs_test/data/objs -type f | wc -l");
Log.info("Remaining objects in " + container.getContainerId() + ": " + found.toString() + " " + foundWc.toString());
if (!(found.getExitCode() == 0 && foundWc.getExitCode() == 0 && Integer.parseInt(foundWc.getStdout().strip()) == emptyFileCount))
return false;
}
return true;
}
@AfterEach
void stop() {
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::stop);
@@ -146,25 +130,6 @@ public class DhfsFusex3IT {
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
// FIXME:
@Test
@Disabled
void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@Test
@Disabled
void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /dhfs_test/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@Test
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /dhfs_test/fuse/hello.c").getExitCode());
@@ -210,21 +175,22 @@ public class DhfsFusex3IT {
@Test
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "echo test3 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo test2 >> /dhfs_test/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo test1 >> /dhfs_test/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container3.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);

View File

@@ -66,7 +66,7 @@ public class DhfsImage implements Future<String> {
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.copy("/libs", "/libs")
-.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1", "-XX:+UseParallelGC",
+.cmd("java", "-ea", "-Xmx512M", "-XX:+UseParallelGC",
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
@@ -87,7 +87,7 @@ public class DhfsImage implements Future<String> {
"-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",
"-Ddhfs.objects.periodic-push-op-interval=5s",
"-Ddhfs.fuse.root=/dhfs_test/fuse",
-"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
+"-Ddhfs.objects.persistence.root=/dhfs_test/data",
"-Ddhfs.objects.persistence.stuff.root=/dhfs_test/data/stuff",
"-jar", "/app/quarkus-run.jar")
.run("mkdir /dhfs_test && chmod 777 /dhfs_test")

View File

@@ -72,8 +72,8 @@ public class KillIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -12,12 +12,10 @@ import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.*;
@@ -25,6 +23,7 @@ import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
@Disabled
public class LazyFsIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
@@ -84,8 +83,8 @@ public class LazyFsIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -125,7 +124,7 @@ public class LazyFsIT {
}
private void checkConsistency(String testName) {
-await().atMost(45, TimeUnit.SECONDS).until(() -> {
+await().atMost(120, TimeUnit.SECONDS).until(() -> {
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");

View File

@@ -66,8 +66,8 @@ public class ResyncIT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -106,8 +106,8 @@ public class ResyncIT {
foundWc = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
-c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
-c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
+c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
+c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -146,8 +146,8 @@ public class ResyncIT {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testd1 /dhfs_test/fuse/testd2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testd2/testf2").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -1,5 +1,4 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
@@ -36,10 +41,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>

View File

@@ -1,9 +1,12 @@
package com.usatiuk.objects.stores;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
@@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
// private ExecutorService _statusExecutor;
private final com.github.benmanes.caffeine.cache.Cache<Pair<Long, JObjectKey>, JDataVersionedWrapper> _cache;
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
);
_cache = Caffeine.newBuilder()
.maximumWeight(sizeLimit)
.weigher((Pair<Long, JObjectKey> key, JDataVersionedWrapper value) -> value.estimateSize())
.expireAfterWrite(Duration.ofMinutes(5)).build();
}
void init(@Observes @Priority(110) StartupEvent event) {
try (var s = delegate.getSnapshot()) {
_cache.set(_cache.get().withVersion(s.id()));
}
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
_cached.set(0);
_cacheTries.set(0);
Thread.sleep(1000);
}
} catch (InterruptedException ignored) {
}
});
}
// if (printStats) {
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
// _cached.set(0);
// _cacheTries.set(0);
// Thread.sleep(1000);
// }
// } catch (InterruptedException ignored) {
// }
// });
// }
}
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
*
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : objs.deleted()) {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
@@ -94,60 +81,28 @@ public class CachingObjectPersistentStore {
* Get a snapshot of the underlying store and the cache.
* Objects are read from the cache when possible; otherwise they are read from the underlying store
* and possibly cached lazily when their data is accessed.
*
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) {
var cache = _cache.get();
if (cache == null)
return delegate.getSnapshot();
Cache curCache = null;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
try {
curCache = _cache.get();
backing = delegate.getSnapshot();
if (curCache.version() != backing.id()) {
backing.close();
backing = null;
continue;
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
var globalCache = _cache.get();
if (globalCache.version() != _curCache.version()) {
_invalid = true;
return;
}
var newCache = globalCache.withPut(key, obj);
if (_cache.compareAndSet(globalCache, newCache))
_cached.incrementAndGet();
private void doCache(JObjectKey key, JDataVersionedWrapper obj) {
var cacheKey = Pair.of(obj.version(), key);
_cache.put(cacheKey, obj);
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) {
if (!(obj instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
@@ -164,29 +119,25 @@ public class CachingObjectPersistentStore {
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prependAndMap(
new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
return ListUtils.map(
_backing.getIterator(start, key),
i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
);
}
private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) {
var cached = _cache.getIfPresent(Pair.of(obj.version(), key));
if (cached != null) {
return cached;
}
maybeCache(key, obj);
return obj;
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return read;
return _backing.readObject(name).map(o -> tryGetCached(name, o));
}
@Override
@@ -235,8 +186,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
var cached = tryGetCached(prev.getKey(), prev.getValue());
return Pair.of(prev.getKey(), cached);
}
@Override
@@ -252,8 +203,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
var cached = tryGetCached(next.getKey(), next.getValue());
return Pair.of(next.getKey(), cached);
}
}
};
@@ -265,54 +216,4 @@ public class CachingObjectPersistentStore {
}
}
}
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}
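
The Caffeine-based cache above distills to a small, self-contained pattern: entries are keyed by (object version, key) and bounded by estimated byte weight. A sketch under those assumptions, with a placeholder weigher; the weight bound and expiry mirror the diff:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.tuple.Pair;
import java.time.Duration;

class VersionKeyedCache<K, V> {
    // Keying by (version, key) means a snapshot at version N can only ever hit
    // entries cached for version N, so stale reads are impossible by construction.
    private final Cache<Pair<Long, K>, V> cache = Caffeine.newBuilder()
            .maximumWeight(64L * 1024 * 1024)               // bound by estimated bytes, not entry count
            .weigher((Pair<Long, K> key, V value) -> estimateSize(value))
            .expireAfterWrite(Duration.ofMinutes(5))
            .build();

    V get(long version, K key) { return cache.getIfPresent(Pair.of(version, key)); }
    void put(long version, K key, V value) { cache.put(Pair.of(version, key), value); }

    private int estimateSize(V value) { return 64; } // placeholder; the real store uses value.estimateSize()
}

Compared with the removed hand-rolled TreePMap cache, eviction and weighing are delegated to Caffeine, which is why the version-gated compare-and-set bookkeeping could be dropped.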

View File

@@ -58,7 +58,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private Dbi<ByteBuffer> _db;
private boolean _ready = false;
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.root") String root) {
_root = Path.of(root).resolve("objects");
}

View File

@@ -307,6 +307,15 @@ public class WritebackObjectPersistentStore {
return r -> asyncFence(bundleId, r);
}
/**
* Get the last committed transaction ID.
*
* @return the last committed transaction ID
*/
public long getLastCommitId() {
return _lastCommittedId.get();
}
/**
* Get a snapshot of the persistent store, including the pending writes.
*

View File

@@ -165,7 +165,6 @@ public class TransactionService {
toUnlock.add(lock);
}
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
} else {
Log.trace("Committing transaction - no changes");
@@ -201,7 +200,10 @@ public class TransactionService {
Log.trace("Committing transaction start");
var snapshotId = tx.snapshot().id();
if (snapshotId != commitSnapshot.id()) {
// All dependencies are locked and could not be changed concurrently now
if (snapshotId != writebackObjectPersistentStore.getLastCommitId()) {
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey());
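
Condensed, the new commit fast path reads as follows (names from the diff; locking and conflict handling elided): taking a fresh snapshot is skipped entirely when nothing has committed since the transaction began.

// With all dependencies locked, the read set can only have changed if some
// transaction committed after ours started; otherwise validation is skipped.
long snapshotId = tx.snapshot().id();
if (snapshotId != writebackObjectPersistentStore.getLastCommitId()) {
    commitSnapshot = writebackObjectPersistentStore.getSnapshot();
    for (var read : readSet.entrySet()) {
        var current = commitSnapshot.readObject(read.getKey());
        // compare 'current' with the recorded read; mismatch -> conflict (elided)
    }
}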

View File

@@ -3,7 +3,7 @@ dhfs.objects.writeback.limit=16777216
dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200

View File

@@ -21,7 +21,7 @@ public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
ret.put("dhfs.objects.persistence", "lmdb");
getConfigOverrides(ret);

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -28,8 +28,8 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.20.0</quarkus.platform.version>
<surefire-plugin.version>3.5.2</surefire-plugin.version>
<quarkus.platform.version>3.27.0</quarkus.platform.version>
<surefire-plugin.version>3.5.4</surefire-plugin.version>
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
</properties>
@@ -54,11 +54,6 @@
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
@@ -68,7 +63,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.1</version>
<version>5.13.4</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -1,43 +0,0 @@
version: "3.2"
services:
dhfs1:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs1:/dhfs_root
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs2:/dhfs_root
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -13,10 +13,6 @@
</parent>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
@@ -39,10 +35,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
@@ -59,14 +51,6 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
@@ -101,11 +85,6 @@
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>

View File

@@ -19,7 +19,7 @@ import java.nio.file.Paths;
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String dataRoot;
boolean _cleanShutdown = true;
boolean _initialized = false;

View File

@@ -32,7 +32,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
ReachablePeerManager reachablePeerManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String dataRoot;
private DeferredInvalidationQueueData _persistentData = new DeferredInvalidationQueueData();

View File

@@ -23,6 +23,8 @@ import java.net.*;
public class LocalPeerDiscoveryBroadcaster {
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
LocalPeerDiscoveryConfig localPeerDiscoveryConfig;
@ConfigProperty(name = "quarkus.http.port")
int ourPort;
@@ -30,17 +32,11 @@ public class LocalPeerDiscoveryBroadcaster {
@ConfigProperty(name = "quarkus.http.ssl-port")
int ourSecurePort;
@ConfigProperty(name = "dhfs.objects.peerdiscovery.port")
int broadcastPort;
@ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast")
boolean enabled;
private DatagramSocket _socket;
@Startup
void init() throws SocketException {
if (!enabled) {
if (!localPeerDiscoveryConfig.broadcast()) {
return;
}
_socket = new DatagramSocket();
@@ -48,7 +44,7 @@ public class LocalPeerDiscoveryBroadcaster {
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
if (!enabled) {
if (!localPeerDiscoveryConfig.broadcast()) {
return;
}
_socket.close();
@@ -56,7 +52,7 @@ public class LocalPeerDiscoveryBroadcaster {
@Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void broadcast() throws Exception {
if (!enabled) {
if (!localPeerDiscoveryConfig.broadcast()) {
return;
}
var sendData = PeerDiscoveryInfo.newBuilder()
@@ -69,7 +65,7 @@ public class LocalPeerDiscoveryBroadcaster {
DatagramPacket sendPacket
= new DatagramPacket(sendBytes, sendBytes.length,
InetAddress.getByName("255.255.255.255"), broadcastPort);
InetAddress.getByName("255.255.255.255"), localPeerDiscoveryConfig.port());
_socket.send(sendPacket);
@@ -92,15 +88,13 @@ public class LocalPeerDiscoveryBroadcaster {
}
try {
sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, broadcastPort);
sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, localPeerDiscoveryConfig.port());
_socket.send(sendPacket);
Log.tracev("Broadcast sent to: {0}, at: {1}", broadcast.getHostAddress(), networkInterface.getDisplayName());
} catch (Exception ignored) {
continue;
}
// Log.trace(getClass().getName() + "Broadcast sent to: " + broadcast.getHostAddress()
// + ", at: " + networkInterface.getDisplayName());
}
}
}
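
The send path above is the standard Java datagram-broadcast idiom; a self-contained sketch, assuming a placeholder payload rather than the serialized PeerDiscoveryInfo, using the default port 42262 from the LocalPeerDiscoveryConfig introduced below:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        byte[] payload = "hello".getBytes();    // placeholder; DHFS broadcasts a serialized PeerDiscoveryInfo
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setBroadcast(true);
            // The service also sends to each interface's directed broadcast address;
            // only the limited broadcast address is shown here.
            var packet = new DatagramPacket(payload, payload.length,
                    InetAddress.getByName("255.255.255.255"), 42262);
            socket.send(packet);
        }
    }
}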

View File

@@ -14,7 +14,6 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
import java.nio.ByteBuffer;
@@ -28,17 +27,19 @@ import java.nio.ByteBuffer;
public class LocalPeerDiscoveryClient {
@Inject
PeerDiscoveryDirectory peerDiscoveryDirectory;
@ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast")
boolean enabled;
@Inject
LocalPeerDiscoveryConfig localPeerDiscoveryConfig;
private Thread _clientThread;
private DatagramSocket _socket;
@Startup
void init() throws SocketException, UnknownHostException {
if (!enabled) {
if (!localPeerDiscoveryConfig.broadcast()) {
return;
}
_socket = new DatagramSocket(42069, InetAddress.getByName("0.0.0.0"));
_socket = new DatagramSocket(localPeerDiscoveryConfig.port(), InetAddress.getByName("0.0.0.0"));
_socket.setBroadcast(true);
_clientThread = new Thread(this::client);
@@ -47,7 +48,7 @@ public class LocalPeerDiscoveryClient {
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
if (!enabled) {
if (!localPeerDiscoveryConfig.broadcast()) {
return;
}
_socket.close();

View File

@@ -0,0 +1,16 @@
package com.usatiuk.dhfs.peerdiscovery.local;
import io.smallrye.config.ConfigMapping;
import io.smallrye.config.WithDefault;
@ConfigMapping(prefix = "dhfs.objects.peerdiscovery")
public interface LocalPeerDiscoveryConfig {
@WithDefault("42262")
int port();
@WithDefault("true")
boolean broadcast();
String interval();
}
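
A hedged usage sketch of the mapping above; the consuming bean is illustrative, and SmallRye Config generates the implementation at build time from the dhfs.objects.peerdiscovery.* properties:

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
class DiscoveryConfigConsumer {
    @Inject
    LocalPeerDiscoveryConfig config;    // injectable like any bean; no per-field @ConfigProperty needed

    int discoveryPort() {
        return config.port();           // 42262 unless dhfs.objects.peerdiscovery.port overrides it
    }

    boolean shouldBroadcast() {
        return config.broadcast();      // defaults to true
    }
}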

View File

@@ -60,7 +60,7 @@ public class PersistentPeerDataService {
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@ConfigProperty(name = "dhfs.objects.persistence.stuff.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String stuffRoot;
private PeerId _selfUuid;

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@@ -13,6 +14,10 @@ import java.util.concurrent.TimeUnit;
*/
@ApplicationScoped
public class PeerSyncApiClientDynamic {
@ConfigProperty(name = "dhfs.objects.sync.peer-sync-api.timeout", defaultValue = "5")
int timeout;
/**
* Queries a peer for its information.
*
@@ -29,8 +34,8 @@ public class PeerSyncApiClientDynamic {
private ApiPeerInfo getSelfInfo(String address, int port) {
var client = QuarkusRestClientBuilder.newBuilder()
.baseUri(URI.create("http://" + address + ":" + port))
.connectTimeout(1, TimeUnit.SECONDS)
.readTimeout(1, TimeUnit.SECONDS)
.connectTimeout(timeout, TimeUnit.SECONDS)
.readTimeout(timeout, TimeUnit.SECONDS)
.build(PeerSyncApiClient.class);
return client.getSelfInfo();
}

View File

@@ -1,13 +1,32 @@
package com.usatiuk.dhfs.rpc;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionHandle;
import com.usatiuk.objects.transaction.TransactionManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
import io.quarkus.logging.Log;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
@@ -15,8 +34,120 @@ import jakarta.inject.Inject;
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
SecurityIdentity identity;
@Inject
RemoteObjectServiceServerImpl remoteObjectServiceServerImpl;
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
ProtoSerializer<OpP, Op> opProtoSerializer;
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
RemoteTransaction remoteTx;
@Inject
OpHandlerService opHandlerService;
@Inject
DtoMapperService dtoMapperService;
@Inject
AutosyncProcessor autosyncProcessor;
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + from);
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
if (meta != null && !meta.seen())
curTx.put(meta.withSeen(true));
if (obj != null)
for (var ref : obj.collectRefsTo()) {
var refMeta = remoteTx.getMeta(ref).orElse(null);
if (refMeta != null && !refMeta.seen())
curTx.put(refMeta.withSeen(true));
}
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
});
if ((got.getValue() != null) && (got.getKey() == null)) {
Log.error("Inconsistent state for object meta: " + request.getName());
throw new StatusRuntimeException(Status.INTERNAL);
}
if (got.getValue() == null) {
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from);
throw new StatusRuntimeException(Status.NOT_FOUND);
}
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
return Uni.createFrom().item(serialized);
}
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
Log.infov("<-- canDelete: {0} from {1}", request, from);
var builder = CanDeleteReply.newBuilder();
txm.run(() -> {
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
if (obj == null) {
builder.setDeletionCandidate(true);
return;
}
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
if (!builder.getDeletionCandidate()) {
for (var r : obj.refsFrom()) {
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
}
}
if (!builder.getDeletionCandidate()) {
Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
}
});
return Uni.createFrom().item(builder.build());
}
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
if (request.getMsgCount() == 0) {
Log.infov("<-- opPush: empty from {0}", from);
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
}
var handles = new ArrayList<TransactionHandle>();
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
var handle = txm.run(() -> {
opHandlerService.handleOp(from, op);
});
handles.add(handle);
}
} catch (Exception e) {
Log.error("Error handling ops", e);
throw e;
}
return Uni.createFrom().emitter(e -> {
var counter = new AtomicLong(handles.size());
for (var handle : handles) {
handle.onFlush(() -> {
if (counter.decrementAndGet() == 0) {
e.complete(OpPushReply.getDefaultInstance());
}
});
}
});
}
public Uni<PingReply> ping(PeerId from, PingRequest request) {
return Uni.createFrom().item(PingReply.getDefaultInstance());
}
private PeerId getIdentity() {
return PeerId.of(identity.getPrincipal().getName().substring(3));
@@ -25,24 +156,24 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
return remoteObjectServiceServerImpl.getObject(getIdentity(), request);
return getObject(getIdentity(), request);
}
@Override
@Blocking
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
return remoteObjectServiceServerImpl.canDelete(getIdentity(), request);
return canDelete(getIdentity(), request);
}
@Override
@Blocking
public Uni<OpPushReply> opPush(OpPushRequest request) {
return remoteObjectServiceServerImpl.opPush(getIdentity(), request);
return opPush(getIdentity(), request);
}
@Override
@Blocking
public Uni<PingReply> ping(PingRequest request) {
return remoteObjectServiceServerImpl.ping(getIdentity(), request);
return ping(getIdentity(), request);
}
}
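
The opPush handler above only completes its reply once every transaction handle has flushed; isolated, the countdown-emitter idiom looks like this (Handle stands in for TransactionHandle; callers must guard against an empty list, as opPush does with its early return):

import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class FlushBarrier {
    interface Handle { void onFlush(Runnable callback); }

    // Completes the Uni when the last onFlush callback fires, on whichever
    // thread that happens; the AtomicLong is the countdown.
    static <T> Uni<T> afterAllFlushed(List<Handle> handles, T result) {
        return Uni.createFrom().emitter(emitter -> {
            var counter = new AtomicLong(handles.size());
            for (var handle : handles) {
                handle.onFlush(() -> {
                    if (counter.decrementAndGet() == 0)
                        emitter.complete(result);
                });
            }
        });
    }
}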

View File

@@ -1,146 +0,0 @@
package com.usatiuk.dhfs.rpc;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionHandle;
import com.usatiuk.objects.transaction.TransactionManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
// Note: RunOnVirtualThread hangs somehow
@ApplicationScoped
public class RemoteObjectServiceServerImpl {
@Inject
TransactionManager txm;
@Inject
ReachablePeerManager reachablePeerManager;
@Inject
Transaction curTx;
@Inject
ProtoSerializer<OpP, Op> opProtoSerializer;
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
RemoteTransaction remoteTx;
@Inject
OpHandlerService opHandlerService;
@Inject
DtoMapperService dtoMapperService;
@Inject
AutosyncProcessor autosyncProcessor;
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + from);
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
if (meta != null && !meta.seen())
curTx.put(meta.withSeen(true));
if (obj != null)
for (var ref : obj.collectRefsTo()) {
var refMeta = remoteTx.getMeta(ref).orElse(null);
if (refMeta != null && !refMeta.seen())
curTx.put(refMeta.withSeen(true));
}
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
});
if ((got.getValue() != null) && (got.getKey() == null)) {
Log.error("Inconsistent state for object meta: " + request.getName());
throw new StatusRuntimeException(Status.INTERNAL);
}
if (got.getValue() == null) {
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from);
throw new StatusRuntimeException(Status.NOT_FOUND);
}
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
return Uni.createFrom().item(serialized);
}
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
Log.infov("<-- canDelete: {0} from {1}", request, from);
var builder = CanDeleteReply.newBuilder();
txm.run(() -> {
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
if (obj == null) {
builder.setDeletionCandidate(true);
return;
}
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
if (!builder.getDeletionCandidate()) {
for (var r : obj.refsFrom()) {
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
}
}
if (!builder.getDeletionCandidate()) {
Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
}
});
return Uni.createFrom().item(builder.build());
}
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
if (request.getMsgCount() == 0) {
Log.infov("<-- opPush: empty from {0}", from);
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
}
var handles = new ArrayList<TransactionHandle>();
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
var handle = txm.run(() -> {
opHandlerService.handleOp(from, op);
});
handles.add(handle);
}
} catch (Exception e) {
Log.error("Error handling ops", e);
throw e;
}
return Uni.createFrom().emitter(e -> {
var counter = new AtomicLong(handles.size());
for (var handle : handles) {
handle.onFlush(() -> {
if (counter.decrementAndGet() == 0) {
e.complete(OpPushReply.getDefaultInstance());
}
});
}
});
}
public Uni<PingReply> ping(PeerId from, PingRequest request) {
return Uni.createFrom().item(PingReply.getDefaultInstance());
}
}

View File

@@ -1,7 +1,5 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=16
@@ -10,20 +8,17 @@ dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false
dhfs.objects.periodic-push-op-interval=5m
dhfs.fuse.root=${HOME}/dhfs_default/fuse
dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=2097152
dhfs.files.target_chunk_alignment=19
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -1,5 +1,4 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

View File

@@ -1,25 +0,0 @@
version: "3.2"
services:
dhfs1:
# image: ghcr.io/usatiuk/dhfs:main
build: .
privileged: true
devices:
- /dev/fuse
command: "./dockerentry.sh -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010 -Ddhfs.objects.autosync.download-all=true"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
# image: ghcr.io/usatiuk/dhfs:main
build: .
privileged: true
devices:
- /dev/fuse
command: "./dockerentry.sh -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010 -Ddhfs.objects.autosync.download-all=true"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -1,19 +0,0 @@
#!/bin/bash
set -e || true
set -u || true
set -o pipefail || true
set -x || true
exec java \
-Xmx512M \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports java.base/jdk.internal.access=ALL-UNNAMED \
-Ddhfs.objects.persistence.files.root=/dhfs_root/p \
-Ddhfs.objects.root=/dhfs_root/d \
-Ddhfs.fuse.root=/dhfs_root_fuse \
-Dquarkus.http.host=0.0.0.0 \
-Ddhfs.objects.ref_verification=false \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=$DHFS_LOGLEVEL \
"$@" \
-jar quarkus-run.jar

File diff suppressed because one or more lines are too long

(binary image replaced; new size: 18 KiB)

launcher/.gitignore
View File

@@ -1,80 +0,0 @@
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
build/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
Testing

View File

@@ -1,26 +0,0 @@
cmake_minimum_required(VERSION 3.10)
project(launcher)
if (SANITIZE STREQUAL "YES")
message(WARNING "Enabling sanitizers!")
add_compile_options(-Wall -Wextra -pedantic -Wshadow -Wformat=2 -Wfloat-equal -D_GLIBCXX_DEBUG -Wconversion -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=2)
add_compile_options(-fsanitize=address -fsanitize=undefined -fno-sanitize-recover)
add_link_options(-fsanitize=address -fsanitize=undefined -fno-sanitize-recover)
endif ()
if (CMAKE_BUILD_TYPE STREQUAL "Release")
add_compile_options(-flto)
add_link_options(-flto)
endif ()
if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
add_compile_options(-O3)
add_link_options(-O3)
endif ()
find_package(wxWidgets REQUIRED COMPONENTS net core base)
if (wxWidgets_USE_FILE) # not defined in CONFIG mode
include(${wxWidgets_USE_FILE})
endif ()
add_subdirectory(src)

View File

@@ -1,7 +0,0 @@
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_executable(launcher
LauncherApp.cpp
)
target_link_libraries(launcher ${wxWidgets_LIBRARIES})

View File

@@ -1,58 +0,0 @@
//
// Created by Stepan Usatiuk on 11.07.2024.
//
// For compilers that don't support precompilation, include "wx/wx.h"
#include "wx/wxprec.h"
#ifndef WX_PRECOMP
# include "wx/wx.h"
#endif
#include "wx/notebook.h"
#include "LauncherApp.h"
#include "wx/taskbar.h"
IMPLEMENT_APP(LauncherApp)
// This is executed upon startup, like 'main()' in non-wxWidgets programs.
bool LauncherApp::OnInit() {
wxFrame* frame = new MainFrame(_T("DHFS Launcher"), wxDefaultPosition);
frame->CreateStatusBar();
frame->SetStatusText(_T("Hello World"));
frame->Show(true);
SetTopWindow(frame);
wxTaskBarIcon* tb = new wxTaskBarIcon();
auto img = new wxImage(32, 32, false);
img->Clear(128);
tb->SetIcon(*(new wxBitmapBundle(*(new wxBitmap(*img)))), "e");
return true;
}
BEGIN_EVENT_TABLE(MainFrame, wxFrame)
EVT_BUTTON(BUTTON_Hello, MainFrame::OnExit) // Tell the OS to run MainFrame::OnExit when
END_EVENT_TABLE() // The button is pressed
MainFrame::MainFrame(const wxString& title, const wxPoint& pos)
: wxFrame((wxFrame*) NULL, -1, title, pos) {
Notebook = new wxNotebook(this, NOTEBOOK_Main);
Panel = new wxPanel(Notebook);
Panel2 = new wxPanel(Notebook);
Notebook->AddPage(Panel, "Hello");
Notebook->AddPage(Panel2, "Hello2");
Panel->SetBackgroundColour(wxColour(0xFF0000));
HelloWorld = new wxButton(Panel, BUTTON_Hello, _T("Hello World"),
// shows a button on this window
wxDefaultPosition, wxDefaultSize, 0); // with the text "hello World"
}
void MainFrame::OnExit(wxCommandEvent& event) {
Close(TRUE);
}

View File

@@ -1,38 +0,0 @@
//
// Created by Stepan Usatiuk on 11.07.2024.
//
#ifndef HELLOWORLDAPP_H
#define HELLOWORLDAPP_H
// The HelloWorldApp class. This class shows a window
// containing a statusbar with the text "Hello World"
class LauncherApp : public wxApp {
public:
virtual bool OnInit();
};
class MainFrame : public wxFrame // MainFrame is the class for our window,
{
// It contains the window and all objects in it
public:
MainFrame(const wxString& title, const wxPoint& pos);
wxButton* HelloWorld;
wxNotebook* Notebook;
wxPanel *Panel;
wxPanel *Panel2;
void OnExit(wxCommandEvent& event);
DECLARE_EVENT_TABLE()
};
enum {
BUTTON_Hello = wxID_HIGHEST + 1, // declares an id which will be used to call our button
NOTEBOOK_Main = wxID_HIGHEST + 2 // declares an id which will be used to call our button
};
DECLARE_APP(LauncherApp)
#endif //HELLOWORLDAPP_H

View File

@@ -23,16 +23,26 @@ EXTRAOPTS_PARSED="$(tr '\n\r' ' ' <"$EXTRAOPTS")"
echo "Extra options: $EXTRAOPTS_PARSED"
java \
if [ -n "${JAVA_HOME:-}" ]; then
JAVA_EXEC="$JAVA_HOME/bin/java"
else
JAVA_EXEC="java"
fi
if ! command -v "$JAVA_EXEC"; then
echo "Java not found"
exit 1
fi
"$JAVA_EXEC" \
-Xmx512M \
--enable-preview \
-Ddhfs.objects.writeback.limit=134217728 \
-Ddhfs.objects.lru.limit=134217728 \
-Ddhfs.objects.writeback.limit=16777216 \
-Ddhfs.objects.lru.limit=67108864 \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports java.base/jdk.internal.access=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
-Ddhfs.objects.persistence.files.root="$SCRIPT_DIR"/../data/objects \
-Ddhfs.objects.persistence.stuff.root="$SCRIPT_DIR"/../data/stuff \
-Ddhfs.objects.persistence.root="$SCRIPT_DIR"/../data \
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
-Dquarkus.http.host=0.0.0.0 \
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \

View File

@@ -24,14 +24,14 @@ Write-Host "Extra options: $($ExtraOptsParsed -join ' ')"
$JAVA_OPTS = @(
"-Xmx512M"
"--enable-preview"
"-Ddhfs.objects.writeback.limit=134217728"
"-Ddhfs.objects.lru.limit=134217728"
"-Ddhfs.objects.writeback.limit=16777216"
"-Ddhfs.objects.lru.limit=67108864"
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED"
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED"
"--add-opens=java.base/java.nio=ALL-UNNAMED"
"-Ddhfs.objects.persistence.files.root=$($PSScriptRoot)\..\data\objects"
"-Ddhfs.objects.persistence.stuff.root=$($PSScriptRoot)\..\data\stuff"
"-Ddhfs.objects.persistence.lmdb.size=1000000000"
"-Ddhfs.objects.persistence.lmdb.size=100000000000"
"-Ddhfs.fuse.root=Z:\"
"-Dquarkus.http.host=0.0.0.0"
'-Dquarkus.log.category.\"com.usatiuk\".level=INFO'

View File

@@ -14,7 +14,6 @@ if [ -f "$PIDFILE" ]; then
fi
fi
# 💀
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -n "s/.*\[{\"id\":\([0-9]*\).*/\1/p")
echo Latest: $LATEST