somewhat working fs

2024-12-28 18:40:21 +01:00
parent 14ba4b8e2e
commit 6da2e43cee
158 changed files with 5884 additions and 1218 deletions

View File

@@ -1,9 +1,11 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>(
LogEffectOld<TimestampT, PeerIdT, MetaT, NodeIdT> oldInfo,
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> effectiveOp,
NodeIdT newParentId,
MetaT newMeta,
-NodeIdT childId) {
+NodeIdT childId) implements Serializable {
}

View File

@@ -1,6 +1,9 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
public record LogEffectOld<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> oldEffectiveMove,
NodeIdT oldParent,
-MetaT oldMeta) {}
+MetaT oldMeta) implements Serializable {
+}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
import java.util.List;
public record LogRecord<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op,
-List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {}
+List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) implements Serializable {
+}

View File

@@ -1,5 +1,8 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
-NodeIdT childId) {}
+NodeIdT childId) implements Serializable {
+}
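The four records above (LogEffect, LogEffectOld, LogRecord, OpMove) now implement Serializable, presumably so tree operations can be persisted and shipped between peers. A minimal sketch of why the marker interface is needed, using a hypothetical stand-in record (not the real OpMove): Java records serialize through their canonical constructor, but only when they implement Serializable.

import java.io.*;

// Hypothetical stand-in for OpMove; not the real DHFS type.
record Op(long timestamp, String newParentId, String childId) implements Serializable {}

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        var op = new Op(42L, "parent", "child");
        var bos = new ByteArrayOutputStream();
        try (var oos = new ObjectOutputStream(bos)) {
            oos.writeObject(op); // would throw NotSerializableException without the interface
        }
        try (var ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            System.out.println(op.equals(ois.readObject())); // true: records round-trip by component
        }
    }
}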

View File

@@ -3,12 +3,13 @@ package com.usatiuk.kleppmanntree;
import lombok.Getter;
import lombok.Setter;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Getter
@Setter
-public class TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
+public class TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> implements Serializable {
private final NodeIdT _id;
private NodeIdT _parent = null;
private OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> _lastEffectiveOp = null;

View File

@@ -154,7 +154,7 @@ class ObjectsAllocProcessor {
try (var constructor = classCreator.getConstructorCreator(item.klass.name().toString(), long.class.getName())) {
constructor.invokeSpecialMethod(MethodDescriptor.ofConstructor(Object.class), constructor.getThis());
-constructor.writeInstanceField(modified.getFieldDescriptor(), constructor.getThis(), constructor.load(false));
+constructor.writeInstanceField(modified.getFieldDescriptor(), constructor.getThis(), constructor.load(true)); // FIXME:
for (var field : fieldsMap.values()) {
if (!Objects.equals(field.getName(), VERSION_NAME))
constructor.writeInstanceField(field, constructor.getThis(), constructor.invokeInterfaceMethod(
@@ -239,6 +239,7 @@ class ObjectsAllocProcessor {
Map<String, MethodInfo> collectMethods(List<ClassInfo> types) {
return types.stream()
.flatMap(x -> x.methods().stream())
+.filter(x -> x.name().startsWith("get") || x.name().startsWith("set"))
.collect(Collectors.toMap(MethodInfo::name, x -> x));
}

View File

@@ -44,6 +44,13 @@ public interface TransactionManager {
}
}
+default void executeTx(VoidFn fn) {
+run(fn);
+}
+default <T> T executeTx(Supplier<T> supplier) {
+return run(supplier);
+}
Transaction current();
}
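A hypothetical caller of the new executeTx defaults (assumes only the TransactionManager interface shown above, with VoidFn and run(Supplier) as declared there):

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

@ApplicationScoped
class TxExample {
    @Inject
    TransactionManager txm;

    void example() {
        // Void body: delegates to run(VoidFn)
        txm.executeTx(() -> {
            // ... reads/writes inside one transaction ...
        });
        // Value-returning body: delegates to run(Supplier)
        long answer = txm.executeTx(() -> 42L);
    }
}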

View File

@@ -0,0 +1,5 @@
*
!target/*-runner
!target/*-runner.jar
!target/lib/*
!target/quarkus-app/*

dhfs-parent/server-old/.gitignore
View File

@@ -0,0 +1,43 @@
#Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
release.properties
.flattened-pom.xml
# Eclipse
.project
.classpath
.settings/
bin/
# IntelliJ
.idea
*.ipr
*.iml
*.iws
# NetBeans
nb-configuration.xml
# Visual Studio Code
.vscode
.factorypath
# OSX
.DS_Store
# Vim
*.swp
*.swo
# patch
*.orig
*.rej
# Local environment
.env
# Plugin directory
/.quarkus/cli/plugins/

View File

@@ -0,0 +1,2 @@
FROM azul/zulu-openjdk-debian:21-jre-latest
RUN apt update && apt install -y libfuse2 curl

View File

@@ -0,0 +1,42 @@
version: "3.2"
services:
dhfs1:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs1:/dhfs_root
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs2:/dhfs_root
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -0,0 +1,209 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>server</artifactId>
<version>1.0.0-SNAPSHOT</version>
<parent>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>autoprotomap</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>autoprotomap-deployment</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.78.1</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<version>1.78.1</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-security</artifactId>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-jsonb</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.SerCeMan</groupId>
<artifactId>jnr-fuse</artifactId>
<version>44ed40f8ce</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>2.2.16</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-posix</artifactId>
<version>3.1.19</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-constants</artifactId>
<version>0.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<junit.jupiter.execution.parallel.enabled>
true
</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>
concurrent
</junit.jupiter.execution.parallel.mode.default>
<junit.jupiter.execution.parallel.config.dynamic.factor>
0.5
</junit.jupiter.execution.parallel.config.dynamic.factor>
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<id>quarkus-plugin</id>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1 @@
lombok.accessors.prefix += _
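This makes Lombok strip the leading underscore when deriving accessor names, matching fields like TreeNode._id above. A small illustration:

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
class Example {
    private long _id;     // generates getId() / setId(long)
    private String _name; // generates getName() / setName(String)
}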

View File

@@ -0,0 +1,97 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./mvnw package
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/server-jvm .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server-jvm
#
# If you want to include the debug port in your docker image
# you will have to expose the debug port (5005 by default) like this: EXPOSE 8080 5005.
# Additionally you will have to set -e JAVA_DEBUG=true and -e JAVA_DEBUG_PORT=*:5005
# when running the container
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server-jvm
#
# This image uses the `run-java.sh` script to run the application.
# This script computes the command line to execute your Java application, and
# includes memory/GC tuning.
# You can configure the behavior using the following environment properties:
# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class")
# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options
# in JAVA_OPTS (example: "-Dsome.property=foo")
# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is
# used to calculate a default maximal heap memory based on the container's restriction.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio
# of the container available memory as set here. The default is `50` which means 50%
# of the available memory is used as an upper boundary. You can skip this mechanism by
# setting this value to `0` in which case no `-Xmx` option is added.
# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This
# is used to calculate a default initial heap memory based on the maximum heap memory.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio
# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx`
# is used as the initial heap size. You can skip this mechanism by setting this value
# to `0` in which case no `-Xms` option is added (example: "25")
# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS.
# This is used to calculate the maximum value of the initial heap memory. If used in
# a container without any memory constraints for the container then this option has
# no effect. If there is a memory constraint then `-Xms` is limited to the value set
# here. The default is 4096MB which means the calculated value of `-Xms` never will
# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096")
# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output
# when things are happening. This option, if set to true, will set
# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true").
# - JAVA_DEBUG: If set, remote debugging will be switched on. Disabled by default (example:
# "true").
# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787").
# - CONTAINER_CORE_LIMIT: A calculated core limit as described in
# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2")
# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024").
# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion.
# (example: "20")
# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking.
# (example: "40")
# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection.
# (example: "4")
# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus
# previous GC times. (example: "90")
# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20")
# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100")
# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should
# contain the necessary JRE command-line options to specify the required GC, which
# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC).
# - HTTPS_PROXY: The location of the https proxy. (example: "myuser@127.0.0.1:8080")
# - HTTP_PROXY: The location of the http proxy. (example: "myuser@127.0.0.1:8080")
# - NO_PROXY: A comma-separated list of hosts, IP addresses or domains that can be
# accessed directly. (example: "foo.example.com,bar.example.com")
#
###
FROM registry.access.redhat.com/ubi8/openjdk-21:1.18
ENV LANGUAGE='en_US:en'
# We make four distinct layers so if there are application changes the library layers can be re-used
COPY --chown=185 target/quarkus-app/lib/ /deployments/lib/
COPY --chown=185 target/quarkus-app/*.jar /deployments/
COPY --chown=185 target/quarkus-app/app/ /deployments/app/
COPY --chown=185 target/quarkus-app/quarkus/ /deployments/quarkus/
EXPOSE 8080
USER 185
ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
ENV JAVA_APP_JAR="/deployments/quarkus-run.jar"
ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ]

View File

@@ -0,0 +1,93 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
#
# Before building the container image run:
#
# ./mvnw package -Dquarkus.package.jar.type=legacy-jar
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.legacy-jar -t quarkus/server-legacy-jar .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server-legacy-jar
#
# If you want to include the debug port in your docker image
# you will have to expose the debug port (5005 by default) like this: EXPOSE 8080 5005.
# Additionally you will have to set -e JAVA_DEBUG=true and -e JAVA_DEBUG_PORT=*:5005
# when running the container
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server-legacy-jar
#
# This image uses the `run-java.sh` script to run the application.
# This script computes the command line to execute your Java application, and
# includes memory/GC tuning.
# You can configure the behavior using the following environment properties:
# - JAVA_OPTS: JVM options passed to the `java` command (example: "-verbose:class")
# - JAVA_OPTS_APPEND: User specified Java options to be appended to generated options
# in JAVA_OPTS (example: "-Dsome.property=foo")
# - JAVA_MAX_MEM_RATIO: Is used when no `-Xmx` option is given in JAVA_OPTS. This is
# used to calculate a default maximal heap memory based on the container's restriction.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xmx` is set to a ratio
# of the container available memory as set here. The default is `50` which means 50%
# of the available memory is used as an upper boundary. You can skip this mechanism by
# setting this value to `0` in which case no `-Xmx` option is added.
# - JAVA_INITIAL_MEM_RATIO: Is used when no `-Xms` option is given in JAVA_OPTS. This
# is used to calculate a default initial heap memory based on the maximum heap memory.
# If used in a container without any memory constraints for the container then this
# option has no effect. If there is a memory constraint then `-Xms` is set to a ratio
# of the `-Xmx` memory as set here. The default is `25` which means 25% of the `-Xmx`
# is used as the initial heap size. You can skip this mechanism by setting this value
# to `0` in which case no `-Xms` option is added (example: "25")
# - JAVA_MAX_INITIAL_MEM: Is used when no `-Xms` option is given in JAVA_OPTS.
# This is used to calculate the maximum value of the initial heap memory. If used in
# a container without any memory constraints for the container then this option has
# no effect. If there is a memory constraint then `-Xms` is limited to the value set
# here. The default is 4096MB which means the calculated value of `-Xms` never will
# be greater than 4096MB. The value of this variable is expressed in MB (example: "4096")
# - JAVA_DIAGNOSTICS: Set this to get some diagnostics information to standard output
# when things are happening. This option, if set to true, will set
# `-XX:+UnlockDiagnosticVMOptions`. Disabled by default (example: "true").
# - JAVA_DEBUG: If set, remote debugging will be switched on. Disabled by default (example:
# "true").
# - JAVA_DEBUG_PORT: Port used for remote debugging. Defaults to 5005 (example: "8787").
# - CONTAINER_CORE_LIMIT: A calculated core limit as described in
# https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt. (example: "2")
# - CONTAINER_MAX_MEMORY: Memory limit given to the container (example: "1024").
# - GC_MIN_HEAP_FREE_RATIO: Minimum percentage of heap free after GC to avoid expansion.
# (example: "20")
# - GC_MAX_HEAP_FREE_RATIO: Maximum percentage of heap free after GC to avoid shrinking.
# (example: "40")
# - GC_TIME_RATIO: Specifies the ratio of the time spent outside the garbage collection.
# (example: "4")
# - GC_ADAPTIVE_SIZE_POLICY_WEIGHT: The weighting given to the current GC time versus
# previous GC times. (example: "90")
# - GC_METASPACE_SIZE: The initial metaspace size. (example: "20")
# - GC_MAX_METASPACE_SIZE: The maximum metaspace size. (example: "100")
# - GC_CONTAINER_OPTIONS: Specify Java GC to use. The value of this variable should
# contain the necessary JRE command-line options to specify the required GC, which
# will override the default of `-XX:+UseParallelGC` (example: -XX:+UseG1GC).
# - HTTPS_PROXY: The location of the https proxy. (example: "myuser@127.0.0.1:8080")
# - HTTP_PROXY: The location of the http proxy. (example: "myuser@127.0.0.1:8080")
# - NO_PROXY: A comma-separated list of hosts, IP addresses or domains that can be
# accessed directly. (example: "foo.example.com,bar.example.com")
#
###
FROM registry.access.redhat.com/ubi8/openjdk-21:1.18
ENV LANGUAGE='en_US:en'
COPY target/lib/* /deployments/lib/
COPY target/*-runner.jar /deployments/quarkus-run.jar
EXPOSE 8080
USER 185
ENV JAVA_OPTS_APPEND="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
ENV JAVA_APP_JAR="/deployments/quarkus-run.jar"
ENTRYPOINT [ "/opt/jboss/container/java/run/run-java.sh" ]

View File

@@ -0,0 +1,27 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode.
#
# Before building the container image run:
#
# ./mvnw package -Dnative
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.native -t quarkus/server .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server
#
###
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.9
WORKDIR /work/
RUN chown 1001 /work \
&& chmod "g+rwX" /work \
&& chown 1001:root /work
COPY --chown=1001:root target/*-runner /work/application
EXPOSE 8080
USER 1001
ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"]

View File

@@ -0,0 +1,30 @@
####
# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode.
# It uses a micro base image, tuned for Quarkus native executables.
# It reduces the size of the resulting container image.
# Check https://quarkus.io/guides/quarkus-runtime-base-image for further information about this image.
#
# Before building the container image run:
#
# ./mvnw package -Dnative
#
# Then, build the image with:
#
# docker build -f src/main/docker/Dockerfile.native-micro -t quarkus/server .
#
# Then run the container using:
#
# docker run -i --rm -p 8080:8080 quarkus/server
#
###
FROM quay.io/quarkus/quarkus-micro-image:2.0
WORKDIR /work/
RUN chown 1001 /work \
&& chmod "g+rwX" /work \
&& chown 1001:root /work
COPY --chown=1001:root target/*-runner /work/application
EXPOSE 8080
USER 1001
ENTRYPOINT ["./application", "-Dquarkus.http.host=0.0.0.0"]

View File

@@ -0,0 +1,63 @@
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ApplicationScoped
public class DeadlockDetector {
private final ExecutorService _executor = Executors.newSingleThreadExecutor();
void init(@Observes @Priority(1) StartupEvent event) {
_executor.submit(this::run);
}
void shutdown(@Observes @Priority(100000) ShutdownEvent event) {
_executor.shutdownNow();
}
private void run() {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
try {
while (!Thread.interrupted()) {
Thread.sleep(4000);
long[] threadIds = bean.findDeadlockedThreads(); // Returns null if no threads are deadlocked.
if (threadIds != null) {
ThreadInfo[] infos = bean.getThreadInfo(threadIds, Integer.MAX_VALUE);
StringBuilder sb = new StringBuilder();
sb.append("Deadlock detected!\n");
for (ThreadInfo info : infos) {
StackTraceElement[] stack = info.getStackTrace();
sb.append(info.getThreadName()).append("\n");
sb.append("getLockedMonitors: ").append(Arrays.toString(info.getLockedMonitors())).append("\n");
sb.append("getLockedSynchronizers: ").append(Arrays.toString(info.getLockedSynchronizers())).append("\n");
sb.append("waiting on: ").append(info.getLockInfo()).append("\n");
sb.append("locked by: ").append(info.getLockOwnerName()).append("\n");
sb.append("Stack trace:\n");
for (var e : stack) {
sb.append(e.toString()).append("\n");
}
sb.append("===");
}
Log.error(sb);
}
}
} catch (InterruptedException e) {
}
Log.info("Deadlock detector thread exiting");
}
}

View File

@@ -0,0 +1,21 @@
package com.usatiuk.dhfs;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;
import io.quarkus.runtime.annotations.QuarkusMain;
@QuarkusMain
public class Main {
public static void main(String... args) {
Quarkus.run(DhfsStorageServerApp.class, args);
}
public static class DhfsStorageServerApp implements QuarkusApplication {
@Override
public int run(String... args) throws Exception {
Quarkus.waitForExit();
return 0;
}
}
}

View File

@@ -0,0 +1,42 @@
package com.usatiuk.dhfs;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Paths;
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
boolean _cleanShutdown = true;
boolean _initialized = false;
void init(@Observes @Priority(2) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
_cleanShutdown = false;
Log.error("Unclean shutdown detected!");
} else {
Paths.get(dataRoot).resolve(dataFileName).toFile().createNewFile();
}
_initialized = true;
}
void shutdown(@Observes @Priority(100000) ShutdownEvent event) throws IOException {
Paths.get(dataRoot).resolve(dataFileName).toFile().delete();
}
public boolean lastShutdownClean() {
if (!_initialized) throw new IllegalStateException("ShutdownChecker not initialized");
return _cleanShutdown;
}
}

View File

@@ -0,0 +1,90 @@
package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.files.conflicts.NoOpConflictResolver;
import com.usatiuk.dhfs.objects.jrepository.AssumedUnique;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.Leaf;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import net.openhft.hashing.LongTupleHashFunction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@AssumedUnique
@Leaf
public class ChunkData extends JObjectData {
final ChunkDataP _data;
public ChunkData(ByteString bytes) {
super();
_data = ChunkDataP.newBuilder()
.setData(bytes)
// TODO: There might be (most definitely) a copy there
.setName(Arrays.stream(LongTupleHashFunction.xx128().hashBytes(bytes.asReadOnlyByteBuffer()))
.mapToObj(Long::toHexString).collect(Collectors.joining()))
.build();
}
public ChunkData(ByteString bytes, String name) {
super();
_data = ChunkDataP.newBuilder()
.setData(bytes)
.setName(name)
.build();
}
public ChunkData(ChunkDataP chunkDataP) {
super();
_data = chunkDataP;
}
ChunkDataP getData() {
return _data;
}
public ByteString getBytes() {
return _data.getData();
}
public int getSize() {
return _data.getData().size();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkData chunkData = (ChunkData) o;
return Objects.equals(getName(), chunkData.getName());
}
@Override
public int hashCode() {
return Objects.hashCode(getName());
}
@Override
public String getName() {
return _data.getName();
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NoOpConflictResolver.class;
}
@Override
public Collection<String> extractRefs() {
return List.of();
}
@Override
public int estimateSize() {
return _data.getData().size();
}
}
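When no name is given, ChunkData derives one from the content: the 128-bit xxHash of the bytes, hex-encoded, so identical chunks always get the same name. A standalone sketch of that naming scheme, using the same zero-allocation-hashing call as the constructor above:

import net.openhft.hashing.LongTupleHashFunction;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.stream.Collectors;

public class ChunkNaming {
    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        // xx128 returns two longs; joining their hex forms gives a stable 128-bit name
        String name = Arrays.stream(LongTupleHashFunction.xx128().hashBytes(ByteBuffer.wrap(data)))
                .mapToObj(Long::toHexString)
                .collect(Collectors.joining());
        System.out.println(name); // deterministic: identical bytes always yield the same name
    }
}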

View File

@@ -0,0 +1,51 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.files.conflicts.FileConflictResolver;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import lombok.Getter;
import lombok.Setter;
import java.util.*;
public class File extends FsNode {
@Getter
private final NavigableMap<Long, String> _chunks;
@Getter
private final boolean _symlink;
@Getter
@Setter
private long _size = 0;
public File(UUID uuid, long mode, boolean symlink) {
super(uuid, mode);
_symlink = symlink;
_chunks = new TreeMap<>();
}
public File(UUID uuid, long mode, boolean symlink, NavigableMap<Long, String> chunks) {
super(uuid, mode);
_symlink = symlink;
_chunks = chunks;
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return FileConflictResolver.class;
}
@Override
public Class<? extends JObjectData> getRefType() {
return ChunkData.class;
}
@Override
public Collection<String> extractRefs() {
return Collections.unmodifiableCollection(_chunks.values());
}
@Override
public int estimateSize() {
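// Rough heuristic: assume ~192 bytes of memory per chunk-map entry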
return _chunks.size() * 192;
}
}

View File

@@ -0,0 +1,43 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import lombok.Getter;
import lombok.Setter;
import java.io.Serial;
import java.util.UUID;
public abstract class FsNode extends JObjectData {
@Serial
private static final long serialVersionUID = 1;
@Getter
final UUID _uuid;
@Getter
@Setter
private long _mode;
@Getter
@Setter
private long _ctime;
@Getter
@Setter
private long _mtime;
protected FsNode(UUID uuid) {
this._uuid = uuid;
this._ctime = System.currentTimeMillis();
this._mtime = this._ctime;
}
protected FsNode(UUID uuid, long mode) {
this._uuid = uuid;
this._mode = mode;
this._ctime = System.currentTimeMillis();
this._mtime = this._ctime;
}
@Override
public String getName() {
return _uuid.toString();
}
}

View File

@@ -0,0 +1,51 @@
package com.usatiuk.dhfs.files.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Optional;
public interface DhfsFileService {
Optional<String> open(String name);
Optional<String> create(String name, long mode);
Pair<String, String> inoToParent(String ino);
void mkdir(String name, long mode);
Optional<GetattrRes> getattr(String name);
Boolean chmod(String name, long mode);
void unlink(String name);
Boolean rename(String from, String to);
Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs);
Iterable<String> readDir(String name);
void updateFileSize(JObject<File> file);
Long size(String f);
Optional<ByteString> read(String fileUuid, long offset, int length);
Long write(String fileUuid, long offset, ByteString data);
default Long write(String fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
}
Boolean truncate(String fileUuid, long length);
String readlink(String uuid);
ByteString readlinkBS(String uuid);
String symlink(String oldpath, String newpath);
}
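A hypothetical usage sketch of this interface (method names and signatures exactly as declared above; error handling omitted):

import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;

class DhfsFileServiceExample {
    void example(DhfsFileService fs) {
        fs.mkdir("/docs", 0755);
        String ino = fs.create("/docs/hello.txt", 0644).orElseThrow();
        fs.write(ino, 0, "hello".getBytes(StandardCharsets.UTF_8)); // byte[] overload wraps to ByteString
        ByteString back = fs.read(ino, 0, 5).orElseThrow();
        long size = fs.size(ino); // 5
    }
}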

View File

@@ -0,0 +1,814 @@
package com.usatiuk.dhfs.files.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.objects.FsNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jrepository.JMutator;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.StreamSupport;
@ApplicationScoped
public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
JObjectManager jObjectManager;
@Inject
JObjectTxManager jObjectTxManager;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@ConfigProperty(name = "dhfs.files.write_merge_threshold")
float writeMergeThreshold;
@ConfigProperty(name = "dhfs.files.write_merge_max_chunk_to_take")
float writeMergeMaxChunkToTake;
@ConfigProperty(name = "dhfs.files.write_merge_limit")
float writeMergeLimit;
@ConfigProperty(name = "dhfs.files.write_last_chunk_limit")
float writeLastChunkLimit;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@ConfigProperty(name = "dhfs.files.allow_recursive_delete")
boolean allowRecursiveDelete;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@ConfigProperty(name = "dhfs.objects.write_log")
boolean writeLogging;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
private JKleppmannTreeManager.JKleppmannTree _tree;
private ChunkData createChunk(ByteString bytes) {
if (useHashForChunks) {
return new ChunkData(bytes);
} else {
return new ChunkData(bytes, persistentPeerDataService.getUniqueId());
}
}
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
_tree = jKleppmannTreeManager.getTree("fs");
}
private JObject<JKleppmannTreeNode> getDirEntry(String name) {
var res = _tree.traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = jObjectManager.get(res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
if (!ret.getMeta().getKnownClass().equals(JKleppmannTreeNode.class))
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not jObject: " + name));
return (JObject<JKleppmannTreeNode>) ret;
}
private Optional<JObject<JKleppmannTreeNode>> getDirEntryOpt(String name) {
var res = _tree.traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = jObjectManager.get(res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
if (!ret.getMeta().getKnownClass().equals(JKleppmannTreeNode.class))
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not jObject: " + name));
return Optional.of((JObject<JKleppmannTreeNode>) ret);
}
@Override
public Optional<GetattrRes> getattr(String uuid) {
return jObjectTxManager.executeTx(() -> {
var ref = jObjectManager.get(uuid);
if (ref.isEmpty()) return Optional.empty();
return ref.get().runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> {
GetattrRes ret;
if (d instanceof File f) {
ret = new GetattrRes(f.getMtime(), f.getCtime(), f.getMode(), f.isSymlink() ? GetattrType.SYMLINK : GetattrType.FILE);
} else if (d instanceof JKleppmannTreeNode) {
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Node is not a file or directory: " + m.getName()));
}
return Optional.of(ret);
});
});
}
@Override
public Optional<String> open(String name) {
return jObjectTxManager.executeTx(() -> {
try {
var ret = getDirEntry(name);
return Optional.of(ret.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaFile f) return f.getFileIno();
else if (d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory f) return m.getName();
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Node is not a file or directory: " + m.getName()));
}));
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
return Optional.empty();
}
throw e;
}
});
}
private void ensureDir(JObject<JKleppmannTreeNode> entry) {
entry.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> {
if (d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaFile f)
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription(m.getName() + " is a file, not directory"));
else if (d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory f) return null;
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Node is not a file or directory: " + m.getName()));
});
}
@Override
public Optional<String> create(String name, long mode) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String fname = path.getFileName().toString();
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, mode, false);
var newNodeId = _tree.getNewNodeId();
var fobj = jObjectManager.putLocked(f, Optional.of(newNodeId));
try {
_tree.move(parent.getMeta().getName(), new JKleppmannTreeNodeMetaFile(fname, f.getName()), newNodeId);
} catch (Exception e) {
fobj.getMeta().removeRef(newNodeId);
throw e;
} finally {
fobj.rwUnlock();
}
return Optional.of(f.getName());
});
}
//FIXME: Slow..
@Override
public Pair<String, String> inoToParent(String ino) {
return jObjectTxManager.executeTx(() -> {
return _tree.findParent(w -> {
if (w.getNode().getMeta() instanceof JKleppmannTreeNodeMetaFile f)
if (f.getFileIno().equals(ino))
return true;
return false;
});
});
}
@Override
public void mkdir(String name, long mode) {
jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String dname = path.getFileName().toString();
Log.debug("Creating directory " + name);
_tree.move(parent.getMeta().getName(), new JKleppmannTreeNodeMetaDirectory(dname), _tree.getNewNodeId());
});
}
@Override
public void unlink(String name) {
jObjectTxManager.executeTx(() -> {
var node = getDirEntryOpt(name).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
JKleppmannTreeNodeMeta meta = node.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> {
if (d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory f)
if (!d.getNode().getChildren().isEmpty()) throw new DirectoryNotEmptyException();
return d.getNode().getMeta();
});
_tree.trash(meta, node.getMeta().getName());
});
}
@Override
public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> d.getNode().getMeta());
var toPath = Path.of(to);
var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry);
_tree.move(toDentry.getMeta().getName(), meta.withName(toPath.getFileName().toString()), node.getMeta().getName());
return true;
});
}
@Override
public Boolean chmod(String uuid, long mode) {
return jObjectTxManager.executeTx(() -> {
var dent = jObjectManager.get(uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
dent.runWriteLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d, bump, i) -> {
if (d instanceof JKleppmannTreeNode) {
return null;//FIXME:?
} else if (d instanceof File f) {
bump.apply();
f.setMtime(System.currentTimeMillis());
f.setMode(mode);
} else {
throw new IllegalArgumentException(uuid + " is not a file");
}
return null;
});
return true;
});
}
@Override
public Iterable<String> readDir(String name) {
return jObjectTxManager.executeTx(() -> {
var found = getDirEntry(name);
return found.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof JKleppmannTreeNode) || !(d.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory)) {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
}
return new ArrayList<>(d.getNode().getChildren().keySet());
});
});
}
@Override
public Optional<ByteString> read(String fileUuid, long offset, int length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should not be negative: " + length));
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should not be negative: " + offset));
var fileOpt = jObjectManager.get(fileUuid);
if (fileOpt.isEmpty()) {
Log.error("File not found when trying to read: " + fileUuid);
return Optional.empty();
}
var file = fileOpt.get();
try {
return file.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (md, fileData) -> {
if (!(fileData instanceof File)) {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
}
var chunksAll = ((File) fileData).getChunks();
if (chunksAll.isEmpty()) {
return Optional.of(ByteString.empty());
}
var chunksList = chunksAll.tailMap(chunksAll.floorKey(offset)).entrySet();
if (chunksList.isEmpty()) {
return Optional.of(ByteString.empty());
}
var chunks = chunksList.iterator();
ByteString buf = ByteString.empty();
long curPos = offset;
var chunk = chunks.next();
while (curPos < offset + length) {
var chunkPos = chunk.getKey();
long offInChunk = curPos - chunkPos;
long toReadInChunk = (offset + length) - curPos;
var chunkBytes = readChunk(chunk.getValue());
long readableLen = chunkBytes.size() - offInChunk;
var toReadReally = Math.min(readableLen, toReadInChunk);
if (toReadReally < 0) break;
buf = buf.concat(chunkBytes.substring((int) offInChunk, (int) (offInChunk + toReadReally)));
curPos += toReadReally;
if (readableLen > toReadInChunk)
break;
if (!chunks.hasNext()) break;
chunk = chunks.next();
}
// FIXME:
return Optional.of(buf);
});
} catch (Exception e) {
Log.error("Error reading file: " + fileUuid, e);
return Optional.empty();
}
});
}
private ByteString readChunk(String uuid) {
var chunkRead = jObjectManager.get(uuid).orElse(null);
if (chunkRead == null) {
Log.error("Chunk requested not found: " + uuid);
throw new StatusRuntimeException(Status.NOT_FOUND);
}
return chunkRead.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof ChunkData cd))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
return cd.getBytes();
});
}
private int getChunkSize(String uuid) {
return readChunk(uuid).size();
}
private void cleanupChunks(File f, Collection<String> uuids) {
// FIXME:
var inFile = useHashForChunks ? new HashSet<>(f.getChunks().values()) : Collections.emptySet();
for (var cuuid : uuids) {
try {
if (inFile.contains(cuuid)) continue;
jObjectManager.get(cuuid)
.ifPresent(jObject -> jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION,
(m, d, b, v) -> {
m.removeRef(f.getName());
return null;
}));
} catch (Exception e) {
Log.error("Error when cleaning chunk " + cuuid, e);
}
}
}
@Override
public Long write(String fileUuid, long offset, ByteString data) {
return jObjectTxManager.executeTx(() -> {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should not be negative: " + offset));
// FIXME:
var file = (JObject<File>) jObjectManager.get(fileUuid).orElse(null);
if (file == null) {
Log.error("File not found when trying to read: " + fileUuid);
return -1L;
}
file.rwLockNoCopy();
try {
file.tryResolve(JObjectManager.ResolutionStrategy.REMOTE);
// FIXME:
if (!(file.getData() instanceof File))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
if (writeLogging) {
Log.info("Writing to file: " + file.getMeta().getName() + " size=" + size(fileUuid) + " "
+ offset + " " + data.size());
}
if (size(fileUuid) < offset)
truncate(fileUuid, offset);
// FIXME: Some kind of immutable interface?
var chunksAll = Collections.unmodifiableNavigableMap(file.getData().getChunks());
var first = chunksAll.floorEntry(offset);
var last = chunksAll.lowerEntry(offset + data.size());
NavigableMap<Long, String> removedChunks = new TreeMap<>();
long start = 0;
NavigableMap<Long, String> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
NavigableMap<Long, String> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
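// Special case: the write begins at or past the end of the floor chunk (e.g. appending after EOF);
// treat it as overlapping nothing and start the new chunk exactly at the write offset.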
if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
beforeFirst = chunksAll;
afterLast = Collections.emptyNavigableMap();
first = null;
last = null;
start = offset;
} else if (!chunksAll.isEmpty()) {
var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
removedChunks.putAll(between);
start = first.getKey();
}
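// Rebuild the affected byte range: the head of the first chunk before the offset,
// then the new data, then whatever tail of the last chunk extends past the write.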
ByteString pendingWrites = ByteString.empty();
if (first != null && first.getKey() < offset) {
var chunkBytes = readChunk(first.getValue());
pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey())));
}
pendingWrites = pendingWrites.concat(data);
if (last != null) {
var lchunkBytes = readChunk(last.getValue());
if (last.getKey() + lchunkBytes.size() > offset + data.size()) {
var startInFile = offset + data.size();
var startInChunk = startInFile - last.getKey();
pendingWrites = pendingWrites.concat(lchunkBytes.substring((int) startInChunk, lchunkBytes.size()));
}
}
int combinedSize = pendingWrites.size();
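// Optionally absorb small neighboring chunks into this write so many small writes
// don't fragment the file; bounded by writeMergeMaxChunkToTake and writeMergeLimit.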
if (targetChunkSize > 0) {
if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
boolean leftDone = false;
boolean rightDone = false;
while (!leftDone && !rightDone) {
if (beforeFirst.isEmpty()) leftDone = true;
if (!beforeFirst.isEmpty() || !leftDone) {
var takeLeft = beforeFirst.lastEntry();
var cuuid = takeLeft.getValue();
if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
leftDone = true;
continue;
}
if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
leftDone = true;
continue;
}
// FIXME: (and test this)
beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
start = takeLeft.getKey();
pendingWrites = readChunk(cuuid).concat(pendingWrites);
combinedSize += getChunkSize(cuuid);
removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
}
if (afterLast.isEmpty()) rightDone = true;
if (!afterLast.isEmpty() && !rightDone) {
var takeRight = afterLast.firstEntry();
var cuuid = takeRight.getValue();
if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
rightDone = true;
continue;
}
if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
rightDone = true;
continue;
}
// FIXME: (and test this)
afterLast = afterLast.tailMap(takeRight.getKey(), false);
pendingWrites = pendingWrites.concat(readChunk(cuuid));
combinedSize += getChunkSize(cuuid);
removedChunks.put(takeRight.getKey(), takeRight.getValue());
}
}
}
}
NavigableMap<Long, String> newChunks = new TreeMap<>();
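// Split the combined bytes back into chunks of roughly targetChunkSize; the remainder
// is folded into the final chunk when it is within writeLastChunkLimit of the target.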
{
int cur = 0;
while (cur < combinedSize) {
int end;
if (targetChunkSize <= 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
end = Math.min(cur + targetChunkSize, combinedSize);
} else {
end = combinedSize;
}
}
var thisChunk = pendingWrites.substring(cur, end);
ChunkData newChunkData = createChunk(thisChunk);
//FIXME:
jObjectManager.put(newChunkData, Optional.of(file.getMeta().getName()));
newChunks.put(start, newChunkData.getName());
start += thisChunk.size();
cur = end;
}
}
file.mutate(new FileChunkMutator(file.getData().getMtime(), System.currentTimeMillis(), removedChunks, newChunks));
cleanupChunks(file.getData(), removedChunks.values());
updateFileSize((JObject<File>) file);
} finally {
file.rwUnlock();
}
return (long) data.size();
});
}
@Override
public Boolean truncate(String fileUuid, long length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should not be negative: " + length));
var file = (JObject<File>) jObjectManager.get(fileUuid).orElse(null);
if (file == null) {
Log.error("File not found when trying to read: " + fileUuid);
return false;
}
if (length == 0) {
file.rwLockNoCopy();
try {
file.tryResolve(JObjectManager.ResolutionStrategy.REMOTE);
var oldChunks = Collections.unmodifiableNavigableMap(new TreeMap<>(file.getData().getChunks()));
file.mutate(new JMutator<>() {
long oldMtime;
@Override
public boolean mutate(File object) {
oldMtime = object.getMtime();
object.getChunks().clear();
return true;
}
@Override
public void revert(File object) {
object.setMtime(oldMtime);
object.getChunks().putAll(oldChunks);
}
});
cleanupChunks(file.getData(), oldChunks.values());
updateFileSize((JObject<File>) file);
} catch (Exception e) {
Log.error("Error writing file chunks: " + fileUuid, e);
return false;
} finally {
file.rwUnlock();
}
return true;
}
file.rwLockNoCopy();
try {
file.tryResolve(JObjectManager.ResolutionStrategy.REMOTE);
var curSize = size(fileUuid);
if (curSize == length) return true;
var chunksAll = Collections.unmodifiableNavigableMap(file.getData().getChunks());
NavigableMap<Long, String> removedChunks = new TreeMap<>();
NavigableMap<Long, String> newChunks = new TreeMap<>();
if (curSize < length) {
long combinedSize = (length - curSize);
long start = curSize;
// Hack: cache zero-filled buffers by length so repeated extensions reuse the same arrays
HashMap<Long, ByteString> zeroCache = new HashMap<>();
{
long cur = 0;
while (cur < combinedSize) {
long end;
if (targetChunkSize <= 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
end = cur + targetChunkSize;
} else {
end = combinedSize;
}
}
if (!zeroCache.containsKey(end - cur))
zeroCache.put(end - cur, UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)]));
ChunkData newChunkData = createChunk(zeroCache.get(end - cur));
//FIXME:
jObjectManager.put(newChunkData, Optional.of(file.getMeta().getName()));
newChunks.put(start, newChunkData.getName());
start += newChunkData.getSize();
cur = end;
}
}
} else {
var tail = chunksAll.lowerEntry(length);
var afterTail = chunksAll.tailMap(tail.getKey(), false);
removedChunks.put(tail.getKey(), tail.getValue());
removedChunks.putAll(afterTail);
var tailBytes = readChunk(tail.getValue());
var newChunk = tailBytes.substring(0, (int) (length - tail.getKey()));
ChunkData newChunkData = createChunk(newChunk);
//FIXME:
jObjectManager.put(newChunkData, Optional.of(file.getMeta().getName()));
newChunks.put(tail.getKey(), newChunkData.getName());
}
file.mutate(new FileChunkMutator(file.getData().getMtime(), System.currentTimeMillis(), removedChunks, newChunks));
cleanupChunks(file.getData(), removedChunks.values());
updateFileSize((JObject<File>) file);
return true;
} catch (Exception e) {
Log.error("Error reading file: " + fileUuid, e);
return false;
} finally {
file.rwUnlock();
}
});
}
@Override
public String readlink(String uuid) {
return jObjectTxManager.executeTx(() -> {
return readlinkBS(uuid).toStringUtf8();
});
}
@Override
public ByteString readlinkBS(String uuid) {
return jObjectTxManager.executeTx(() -> {
var fileOpt = jObjectManager.get(uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
return fileOpt.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (md, fileData) -> {
if (!(fileData instanceof File)) {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
}
if (!((File) fileData).isSymlink())
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Not a symlink: " + uuid));
return read(uuid, 0, Math.toIntExact(size(uuid))).get();
});
});
}
@Override
public String symlink(String oldpath, String newpath) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(newpath);
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String fname = path.getFileName().toString();
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, 0, true);
var newNodeId = _tree.getNewNodeId();
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
f.getChunks().put(0L, newChunkData.getName());
jObjectManager.put(newChunkData, Optional.of(f.getName()));
var newFile = jObjectManager.putLocked(f, Optional.of(newNodeId));
try {
updateFileSize(newFile);
} finally {
newFile.rwUnlock();
}
_tree.move(parent.getMeta().getName(), new JKleppmannTreeNodeMetaFile(fname, f.getName()), newNodeId);
return f.getName();
});
}
@Override
public Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var file = jObjectManager.get(fileUuid).orElseThrow(
() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
"File not found for setTimes: " + fileUuid))
);
file.runWriteLocked(JObjectManager.ResolutionStrategy.REMOTE, (m, fileData, bump, i) -> {
if (fileData instanceof JKleppmannTreeNode) return null; // FIXME:
if (!(fileData instanceof FsNode fd))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
bump.apply();
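// Only mtime is persisted; atimeMs is ignored, as FsNode tracks only ctime/mtime.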
fd.setMtime(mtimeMs);
return null;
});
return true;
});
}
@Override
public void updateFileSize(JObject<File> file) {
jObjectTxManager.executeTx(() -> {
file.rwLockNoCopy();
try {
file.tryResolve(JObjectManager.ResolutionStrategy.REMOTE);
if (!(file.getData() instanceof File fd))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
long realSize = 0;
var last = fd.getChunks().lastEntry();
if (last != null) {
var lastSize = getChunkSize(last.getValue());
realSize = last.getKey() + lastSize;
}
if (realSize != fd.getSize()) {
long finalRealSize = realSize;
file.mutate(new JMutator<File>() {
long oldSize;
@Override
public boolean mutate(File object) {
oldSize = object.getSize();
object.setSize(finalRealSize);
return true;
}
@Override
public void revert(File object) {
object.setSize(oldSize);
}
});
}
} catch (Exception e) {
Log.error("Error updating file size: " + file.getMeta().getName(), e);
} finally {
file.rwUnlock();
}
});
}
@Override
public Long size(String uuid) {
return jObjectTxManager.executeTx(() -> {
var read = jObjectManager.get(uuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
try {
return read.runReadLocked(JObjectManager.ResolutionStrategy.REMOTE, (fsNodeData, fileData) -> {
if (!(fileData instanceof File fd))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
return fd.getSize();
});
} catch (Exception e) {
Log.error("Error reading file: " + uuid, e);
return -1L;
}
});
}
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.files.service;
public class DirectoryNotEmptyException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.files.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.files.service;
public enum GetattrType {
FILE,
DIRECTORY,
SYMLINK
}

View File

@@ -0,0 +1,391 @@
package com.usatiuk.dhfs.fuse;
import com.google.protobuf.UnsafeByteOperations;
import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jnr.ffi.Pointer;
import org.apache.commons.lang3.SystemUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import ru.serce.jnrfuse.ErrorCodes;
import ru.serce.jnrfuse.FuseFillDir;
import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
import static jnr.posix.FileStat.*;
@ApplicationScoped
public class DhfsFuse extends FuseStubFS {
private static final int blksize = 1048576;
private static final int iosize = 1048576;
@Inject
ObjectPersistentStore persistentStore; // FIXME?
@ConfigProperty(name = "dhfs.fuse.root")
String root;
@ConfigProperty(name = "dhfs.fuse.enabled")
boolean enabled;
@ConfigProperty(name = "dhfs.fuse.debug")
Boolean debug;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@Inject
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
@Inject
DhfsFileService fileService;
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
Log.info("Mounting with root " + root);
var uid = new UnixSystem().getUid();
var gid = new UnixSystem().getGid();
var opts = new ArrayList<String>();
// Assuming macFUSE
if (SystemUtils.IS_OS_MAC) {
opts.add("-o");
opts.add("iosize=" + iosize);
} else if (SystemUtils.IS_OS_LINUX) {
// FIXME: There's something else missing: the writes still seem to be 32k max
// opts.add("-o");
// opts.add("large_read");
opts.add("-o");
opts.add("big_writes");
opts.add("-o");
opts.add("max_read=" + iosize);
opts.add("-o");
opts.add("max_write=" + iosize);
}
opts.add("-o");
opts.add("auto_cache");
opts.add("-o");
opts.add("uid=" + uid);
opts.add("-o");
opts.add("gid=" + gid);
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {
if (!enabled) return;
Log.info("Unmounting");
umount();
Log.info("Unmounted");
}
@Override
public int statfs(String path, Statvfs stbuf) {
try {
stbuf.f_frsize.set(blksize);
stbuf.f_bsize.set(blksize);
stbuf.f_blocks.set(persistentStore.getTotalSpace() / blksize); // total data blocks in file system
stbuf.f_bfree.set(persistentStore.getFreeSpace() / blksize); // free blocks in fs
stbuf.f_bavail.set(persistentStore.getUsableSpace() / blksize); // avail blocks in fs
stbuf.f_files.set(1000); //FIXME:
stbuf.f_ffree.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_favail.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_namemax.set(2048);
return super.statfs(path, stbuf);
} catch (Exception e) {
Log.error("When statfs " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int getattr(String path, FileStat stat) {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var uuid = fileOpt.get();
Optional<GetattrRes> found = fileService.getattr(uuid);
if (found.isEmpty()) {
return -ErrorCodes.ENOENT();
}
switch (found.get().type()) {
case FILE -> {
stat.st_mode.set(S_IFREG | found.get().mode());
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(uuid));
}
case DIRECTORY -> {
stat.st_mode.set(S_IFDIR | found.get().mode());
stat.st_nlink.set(2);
}
case SYMLINK -> {
stat.st_mode.set(S_IFLNK | 0777);
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(uuid));
}
}
// FIXME: Race?
stat.st_ctim.tv_sec.set(found.get().ctime() / 1000);
stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1_000_000);
stat.st_mtim.tv_sec.set(found.get().mtime() / 1000);
stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1_000_000);
// atime is not tracked separately; report mtime for it as well
stat.st_atim.tv_sec.set(found.get().mtime() / 1000);
stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1_000_000);
stat.st_blksize.set(blksize);
} catch (Throwable e) {
Log.error("When getattr " + path, e);
return -ErrorCodes.EIO();
}
return 0;
}
@Override
public int utimens(String path, Timespec[] timespec) {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
// timespec[0] is atime, timespec[1] is mtime; sub-second precision is dropped
var res = fileService.setTimes(file,
timespec[0].tv_sec.get() * 1000,
timespec[1].tv_sec.get() * 1000);
if (!res) return -ErrorCodes.EINVAL();
else return 0;
} catch (Exception e) {
Log.error("When utimens " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int open(String path, FuseFileInfo fi) {
try {
if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT();
return 0;
} catch (Exception e) {
Log.error("When open " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (size < 0) return -ErrorCodes.EINVAL();
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var read = fileService.read(file, offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
} catch (Exception e) {
Log.error("When reading " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
// Raw copy from the FUSE-provided buffer into the uninitialized direct buffer
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
buf.address(),
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
size
);
var written = fileService.write(fileOpt.get(), offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Exception e) {
Log.error("When writing " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int create(String path, long mode, FuseFileInfo fi) {
try {
var ret = fileService.create(path, mode);
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
else return 0;
} catch (Exception e) {
Log.error("When creating " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int mkdir(String path, long mode) {
try {
fileService.mkdir(path, mode);
return 0;
} catch (AlreadyExistsException aex) {
return -ErrorCodes.EEXIST();
} catch (Exception e) {
Log.error("When creating dir " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int rmdir(String path) {
try {
fileService.unlink(path);
return 0;
} catch (DirectoryNotEmptyException ex) {
return -ErrorCodes.ENOTEMPTY();
} catch (Exception e) {
Log.error("When removing dir " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int rename(String path, String newName) {
try {
var ret = fileService.rename(path, newName);
if (!ret) return -ErrorCodes.ENOENT();
else return 0;
} catch (Exception e) {
Log.error("When renaming " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int unlink(String path) {
try {
fileService.unlink(path);
return 0;
} catch (Exception e) {
Log.error("When unlinking " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int truncate(String path, long size) {
if (size < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var ok = fileService.truncate(file, size);
if (ok)
return 0;
else
return -ErrorCodes.ENOSPC();
} catch (Exception e) {
Log.error("When truncating " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int chmod(String path, long mode) {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var ret = fileService.chmod(fileOpt.get(), mode);
if (ret) return 0;
else return -ErrorCodes.EINVAL();
} catch (Exception e) {
Log.error("When chmod " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int readdir(String path, Pointer buf, FuseFillDir filler, long offset, FuseFileInfo fi) {
try {
Iterable<String> found;
try {
found = fileService.readDir(path);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode().equals(Status.NOT_FOUND.getCode()))
return -ErrorCodes.ENOENT();
else throw e;
}
filler.apply(buf, ".", null, 0);
filler.apply(buf, "..", null, 0);
for (var c : found) {
filler.apply(buf, c, null, 0);
}
return 0;
} catch (Exception e) {
Log.error("When readdir " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int readlink(String path, Pointer buf, long size) {
if (size < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var read = fileService.readlinkBS(file);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
return 0;
} catch (Exception e) {
Log.error("When reading " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int chown(String path, long uid, long gid) {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
// Ownership is not tracked; report success so chown callers don't fail
return 0;
} catch (Exception e) {
Log.error("When chown " + path, e);
return -ErrorCodes.EIO();
}
}
@Override
public int symlink(String oldpath, String newpath) {
try {
var ret = fileService.symlink(oldpath, newpath);
if (ret == null) return -ErrorCodes.EEXIST();
else return 0;
} catch (Exception e) {
Log.error("When creating " + newpath, e);
return -ErrorCodes.EIO();
}
}
}
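A note on the timestamp handling above: the file service reports times in milliseconds, while FUSE's Timespec wants whole seconds plus nanoseconds, so the millisecond remainder has to be scaled by 1_000_000. A minimal sketch of the conversion (hypothetical helper, not part of this commit):
static void setTimespec(Timespec ts, long millis) {
ts.tv_sec.set(millis / 1000); // whole seconds
ts.tv_nsec.set((millis % 1000) * 1_000_000); // leftover milliseconds as nanoseconds
}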

View File

@@ -0,0 +1,64 @@
package com.usatiuk.dhfs.fuse;
import com.google.protobuf.ByteOutput;
import jnr.ffi.Pointer;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing;
private final long _size;
private final JnrPtrByteOutputAccessors _accessors;
private long _pos;
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
_backing = backing;
_size = size;
_pos = 0;
_accessors = accessors;
}
@Override
public void write(byte value) {
throw new UnsupportedOperationException();
}
@Override
public void write(byte[] value, int offset, int length) {
if (length + _pos > _size) throw new IndexOutOfBoundsException();
_backing.put(_pos, value, offset, length);
_pos += length;
}
@Override
public void writeLazy(byte[] value, int offset, int length) {
if (length + _pos > _size) throw new IndexOutOfBoundsException();
_backing.put(_pos, value, offset, length);
_pos += length;
}
@Override
public void write(ByteBuffer value) {
var rem = value.remaining();
if (rem + _pos > _size) throw new IndexOutOfBoundsException();
if (value.isDirect()) {
if (value instanceof MappedByteBuffer mb) {
mb.load();
}
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
var out = _backing.address() + _pos;
_accessors.getUnsafe().copyMemory(addr, out, rem);
} else {
throw new UnsupportedOperationException();
}
_pos += rem;
}
@Override
public void writeLazy(ByteBuffer value) {
write(value);
}
}
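A usage sketch for the ByteOutput above (assumptions: buf is a jnr-ffi Pointer with at least size writable bytes, accessors is the injected JnrPtrByteOutputAccessors, data is some ByteString). This mirrors how read() streams chunk data into the FUSE buffer without an intermediate on-heap copy:
var out = new JnrPtrByteOutput(accessors, buf, size);
UnsafeByteOperations.unsafeWriteTo(data, out); // may throw IOException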

View File

@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.fuse;
import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import lombok.Getter;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
@Singleton
class JnrPtrByteOutputAccessors {
@Getter
JavaNioAccess _nioAccess;
@Getter
Unsafe _unsafe;
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
}
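Note: jdk.internal.access is not exported from java.base by default, so this class presumably only loads with an extra JVM flag along the lines of --add-exports java.base/jdk.internal.access=ALL-UNNAMED (the exact flags depend on the build setup).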

View File

@@ -0,0 +1,566 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.*;
import com.usatiuk.dhfs.objects.jrepository.*;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObject;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
import com.usatiuk.dhfs.objects.repository.opsupport.OpSender;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@ApplicationScoped
public class JKleppmannTreeManager {
private static final String dataFileName = "trees";
private final ConcurrentHashMap<String, JKleppmannTree> _trees = new ConcurrentHashMap<>();
@Inject
JKleppmannTreePeerInterface jKleppmannTreePeerInterface;
@Inject
OpSender opSender;
@Inject
OpObjectRegistry opObjectRegistry;
@Inject
JObjectManager jObjectManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
JObjectTxManager jObjectTxManager;
@Inject
SoftJObjectFactory softJObjectFactory;
@Inject
JKleppmannTreePeerInterface peerInterface;
public JKleppmannTree getTree(String name) {
return _trees.computeIfAbsent(name, this::createTree);
}
private JKleppmannTree createTree(String name) {
return jObjectTxManager.executeTx(() -> {
var data = jObjectManager.get(JKleppmannTreePersistentData.nameFromTreeName(name)).orElse(null);
if (data == null) {
data = jObjectManager.put(new JKleppmannTreePersistentData(name), Optional.empty());
}
var tree = new JKleppmannTree(name);
opObjectRegistry.registerObject(tree);
return tree;
});
}
public class JKleppmannTree implements OpObject {
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, String, JKleppmannTreeNodeWrapper> _tree;
private final SoftJObject<JKleppmannTreePersistentData> _persistentData;
private final JKleppmannTreeStorageInterface _storageInterface;
private final JKleppmannTreeClock _clock;
private final String _treeName;
JKleppmannTree(String treeName) {
_treeName = treeName;
_persistentData = softJObjectFactory.create(JKleppmannTreePersistentData.class, JKleppmannTreePersistentData.nameFromTreeName(treeName));
_storageInterface = new JKleppmannTreeStorageInterface();
_clock = new JKleppmannTreeClock();
_tree = new KleppmannTree<>(_storageInterface, peerInterface, _clock, new JOpRecorder());
}
public String traverse(List<String> names) {
return _tree.traverse(names);
}
public String getNewNodeId() {
return _storageInterface.getNewNodeId();
}
public void move(String newParent, JKleppmannTreeNodeMeta newMeta, String node) {
_tree.move(newParent, newMeta, node);
}
public void trash(JKleppmannTreeNodeMeta newMeta, String node) {
_tree.move(_storageInterface.getTrashId(), newMeta.withName(node), node);
}
@Override
public boolean hasPendingOpsForHost(UUID host) {
return _persistentData.get()
.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
(m, d) -> d.getQueues().containsKey(host) &&
!d.getQueues().get(host).isEmpty()
);
}
@Override
public List<Op> getPendingOpsForHost(UUID host, int limit) {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d.getQueues().containsKey(host)) {
var queue = d.getQueues().get(host);
ArrayList<Op> collected = new ArrayList<>();
for (var node : queue.entrySet()) {
collected.add(new JKleppmannTreeOpWrapper(node.getValue()));
if (collected.size() >= limit) break;
}
return collected;
}
return List.of();
});
}
@Override
public String getId() {
return _treeName;
}
@Override
public void commitOpForHost(UUID host, Op op) {
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var got = _persistentData.get().getData().getQueues().get(host).firstEntry().getValue();
if (!Objects.equals(jop.getOp(), got))
throw new IllegalArgumentException("Committed op push was not the oldest");
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getQueues().get(host).pollFirstEntry();
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getQueues().get(host).put(jop.getOp().timestamp(), jop.getOp());
}
});
}
@Override
public void pushBootstrap(UUID host) {
_tree.recordBoostrapFor(host);
}
public Pair<String, String> findParent(Function<JKleppmannTreeNodeWrapper, Boolean> predicate) {
return _tree.findParent(predicate);
}
@Override
public boolean acceptExternalOp(UUID from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
}
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
JObject<?> fileRef;
if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
var fino = f.getFileIno();
fileRef = jObjectManager.getOrPut(fino, File.class, Optional.of(jop.getOp().childId()));
} else {
fileRef = null;
}
if (Log.isTraceEnabled())
Log.trace("Received op from " + from + ": " + jop.getOp().timestamp().timestamp() + " " + jop.getOp().childId() + "->" + jop.getOp().newParentId() + " as " + jop.getOp().newMeta().getName());
try {
_tree.applyExternalOp(from, jop.getOp());
} catch (Exception e) {
Log.error("Error applying external op", e);
throw e;
} finally {
// FIXME:
// Fixup the ref if it didn't really get applied
if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
Log.error("Could not create child of pushed op: " + jop.getOp());
if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
if (fileRef != null) {
var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
VoidFn remove = () -> {
fileRef.runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
m.removeRef(jop.getOp().childId());
});
};
if (got == null) {
remove.apply();
} else {
try {
got.rLock();
try {
got.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
if (got.getData() == null || !got.getData().extractRefs().contains(f.getFileIno()))
remove.apply();
} finally {
got.rUnlock();
}
} catch (DeletedObjectAccessException dex) {
remove.apply();
}
}
}
}
}
return true;
}
@Override
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}
@Override
public void addToTx() {
// FIXME: a hack
_persistentData.get().rwLockNoCopy();
_persistentData.get().rwUnlock();
}
private class JOpRecorder implements OpRecorder<Long, UUID, JKleppmannTreeNodeMeta, String> {
@Override
public void recordOp(OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> op) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var hostUuids = persistentPeerDataService.getHostUuids().stream().toList();
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.recordOp(hostUuids, op);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.removeOp(hostUuids, op);
}
});
opSender.push(JKleppmannTree.this);
}
@Override
public void recordOpForPeer(UUID peer, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> op) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.recordOp(peer, op);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.removeOp(peer, op);
}
});
opSender.push(JKleppmannTree.this);
}
}
private class JKleppmannTreeClock implements Clock<Long> {
@Override
public Long getTimestamp() {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var ret = _persistentData.get().getData().getClock().peekTimestamp() + 1;
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getClock().getTimestamp(); // advance the clock; the new value was already peeked into ret
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getClock().ungetTimestamp();
}
});
return ret;
}
@Override
public Long peekTimestamp() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getClock().peekTimestamp());
}
@Override
public Long updateTimestamp(Long receivedTimestamp) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
Long _old;
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
_old = object.getClock().updateTimestamp(receivedTimestamp);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getClock().setTimestamp(_old);
}
});
return _persistentData.get().getData().getClock().peekTimestamp();
}
}
public class JKleppmannTreeStorageInterface implements StorageInterface<Long, UUID, JKleppmannTreeNodeMeta, String, JKleppmannTreeNodeWrapper> {
private final LogWrapper _logWrapper = new LogWrapper();
private final PeerLogWrapper _peerLogWrapper = new PeerLogWrapper();
public JKleppmannTreeStorageInterface() {
if (jObjectManager.get(getRootId()).isEmpty()) {
putNode(new JKleppmannTreeNode(new TreeNode<>(getRootId(), null, new JKleppmannTreeNodeMetaDirectory(""))));
putNode(new JKleppmannTreeNode(new TreeNode<>(getTrashId(), null, null)));
}
}
public JObject<JKleppmannTreeNode> putNode(JKleppmannTreeNode node) {
return jObjectManager.put(node, Optional.ofNullable(node.getNode().getParent()));
}
public JObject<JKleppmannTreeNode> putNodeLocked(JKleppmannTreeNode node) {
return jObjectManager.putLocked(node, Optional.ofNullable(node.getNode().getParent()));
}
@Override
public String getRootId() {
return _treeName + "_jt_root";
}
@Override
public String getTrashId() {
return _treeName + "_jt_trash";
}
@Override
public String getNewNodeId() {
return persistentPeerDataService.getUniqueId();
}
@Override
public JKleppmannTreeNodeWrapper getById(String id) {
var got = jObjectManager.get(id);
if (got.isEmpty()) return null;
return new JKleppmannTreeNodeWrapper((JObject<JKleppmannTreeNode>) got.get());
}
@Override
public JKleppmannTreeNodeWrapper createNewNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, String> node) {
return new JKleppmannTreeNodeWrapper(putNodeLocked(new JKleppmannTreeNode(node)));
}
@Override
public void removeNode(String id) {} // intentionally left as a no-op
@Override
public LogInterface<Long, UUID, JKleppmannTreeNodeMeta, String> getLog() {
return _logWrapper;
}
@Override
public PeerTimestampLogInterface<Long, UUID> getPeerTimestampLog() {
return _peerLogWrapper;
}
@Override
public void rLock() {
_persistentData.get().rLock();
}
@Override
public void rUnlock() {
_persistentData.get().rUnlock();
}
@Override
public void rwLock() {
_persistentData.get().rwLockNoCopy();
}
@Override
public void rwUnlock() {
_persistentData.get().rwUnlock();
}
@Override
public void assertRwLock() {
_persistentData.get().assertRwLock();
}
private class PeerLogWrapper implements PeerTimestampLogInterface<Long, UUID> {
@Override
public Long getForPeer(UUID peerId) {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
(m, d) -> d.getPeerTimestampLog().get(peerId));
}
@Override
public void putForPeer(UUID peerId, Long timestamp) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
Long old;
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
old = object.getPeerTimestampLog().put(peerId, timestamp);
return !Objects.equals(old, timestamp);
}
@Override
public void revert(JKleppmannTreePersistentData object) {
if (old != null)
object.getPeerTimestampLog().put(peerId, old);
else
object.getPeerTimestampLog().remove(peerId, timestamp);
}
});
}
}
private class LogWrapper implements LogInterface<Long, UUID, JKleppmannTreeNodeMeta, String> {
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> peekOldest() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
var ret = d.getLog().firstEntry();
if (ret == null) return null;
return Pair.of(ret);
});
}
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> takeOldest() {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var ret = _persistentData.get().getData().getLog().firstEntry();
if (ret == null) return null; // keep the empty-log contract consistent with peekOldest
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getLog().pollFirstEntry();
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getLog().put(ret.getKey(), ret.getValue());
}
});
return Pair.of(ret);
}
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> peekNewest() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
var ret = d.getLog().lastEntry();
if (ret == null) return null;
return Pair.of(ret);
});
}
@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>>> newestSlice(CombinedTimestamp<Long, UUID> since, boolean inclusive) {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
var tail = d.getLog().tailMap(since, inclusive);
return tail.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
});
}
@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>>> getAll() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return d.getLog().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
});
}
@Override
public boolean isEmpty() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return d.getLog().isEmpty();
});
}
@Override
public boolean containsKey(CombinedTimestamp<Long, UUID> timestamp) {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return d.getLog().containsKey(timestamp);
});
}
@Override
public long size() {
return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return (long) d.getLog().size();
});
}
@Override
public void put(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String> record) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
if (_persistentData.get().getData().getLog().containsKey(timestamp))
throw new IllegalStateException("Overwriting log entry?");
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getLog().put(timestamp, record);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getLog().remove(timestamp, record);
}
});
}
@Override
public void replace(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String> record) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String> old;
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
old = object.getLog().put(timestamp, record);
return !Objects.equals(old, record);
}
@Override
public void revert(JKleppmannTreePersistentData object) {
if (old != null)
object.getLog().put(timestamp, old);
else
object.getLog().remove(timestamp, record);
}
});
}
}
}
}
}
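A usage sketch of the tree API above (treeManager and parentNodeId are hypothetical; the file service wraps such calls in a transaction the same way):
jObjectTxManager.executeTx(() -> {
var tree = treeManager.getTree("fs");
var newNodeId = tree.getNewNodeId();
// Attach a new directory node under an existing parent
tree.move(parentNodeId, new JKleppmannTreeNodeMetaDirectory("docs"), newNodeId);
});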

View File

@@ -0,0 +1,71 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.kleppmanntree.TreeNodeWrapper;
import java.util.UUID;
public class JKleppmannTreeNodeWrapper implements TreeNodeWrapper<Long, UUID, JKleppmannTreeNodeMeta, String> {
private final JObject<JKleppmannTreeNode> _backing;
public JKleppmannTreeNodeWrapper(JObject<JKleppmannTreeNode> backing) {_backing = backing;}
@Override
public void rLock() {
_backing.rLock();
}
@Override
public void rUnlock() {
_backing.rUnlock();
}
@Override
public void rwLock() {
_backing.rwLock();
}
@Override
public void rwUnlock() {
_backing.bumpVer(); // FIXME:?
_backing.rwUnlock();
}
@Override
public void freeze() {
_backing.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
m.freeze();
return null;
});
}
@Override
public void unfreeze() {
_backing.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
m.unfreeze();
return null;
});
}
@Override
public void notifyRef(String id) {
_backing.getMeta().addRef(id);
}
@Override
public void notifyRmRef(String id) {
_backing.getMeta().removeRef(id);
}
@Override
public TreeNode<Long, UUID, JKleppmannTreeNodeMeta, String> getNode() {
_backing.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
if (_backing.getData() == null)
throw new IllegalStateException("Node " + _backing.getMeta().getName() + " data lost!");
return _backing.getData().getNode();
}
}

View File

@@ -0,0 +1,30 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
// Wrapper to avoid having to specify generic types
public class JKleppmannTreeOpWrapper implements Op {
@Getter
private final OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> _op;
public JKleppmannTreeOpWrapper(OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> op) {
if (op == null) throw new IllegalArgumentException("op shouldn't be null");
_op = op;
}
@Override
public Collection<String> getEscapedRefs() {
if (_op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {
return List.of(mf.getFileIno());
}
return List.of();
}
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.kleppmanntree.PeerInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.UUID;
@Singleton
public class JKleppmannTreePeerInterface implements PeerInterface<UUID> {
@Inject
PersistentPeerDataService persistentPeerDataService;
@Override
public UUID getSelfId() {
return persistentPeerDataService.getSelfUuid();
}
@Override
public Collection<UUID> getAllPeers() {
return persistentPeerDataService.getHostUuidsAndSelf();
}
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import lombok.Getter;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
public class JKleppmannTreePeriodicPushOp implements Op {
@Getter
private final UUID _from;
@Getter
private final long _timestamp;
public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) {
_from = from;
_timestamp = timestamp;
}
@Override
public Collection<String> getEscapedRefs() {
return List.of();
}
}

View File

@@ -0,0 +1,45 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.OnlyLocal;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.kleppmanntree.TreeNode;
import lombok.Getter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
// FIXME: Ideally this is two classes?
@OnlyLocal
public class JKleppmannTreeNode extends JObjectData {
@Getter
final TreeNode<Long, UUID, JKleppmannTreeNodeMeta, String> _node;
public JKleppmannTreeNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, String> node) {
_node = node;
}
@Override
public String getName() {
return _node.getId();
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return null;
}
@Override
public Collection<String> extractRefs() {
if (_node.getMeta() instanceof JKleppmannTreeNodeMetaFile)
return List.of(((JKleppmannTreeNodeMetaFile) _node.getMeta()).getFileIno());
return Collections.unmodifiableCollection(_node.getChildren().values());
}
@Override
public Class<? extends JObjectData> getRefType() {
return JObjectData.class;
}
}

View File

@@ -0,0 +1,31 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaP;
import com.usatiuk.kleppmanntree.NodeMeta;
import lombok.Getter;
import java.util.Objects;
@ProtoMirror(JKleppmannTreeNodeMetaP.class)
public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
@Getter
private final String _name;
public JKleppmannTreeNodeMeta(String name) {_name = name;}
public abstract JKleppmannTreeNodeMeta withName(String name);
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JKleppmannTreeNodeMeta that = (JKleppmannTreeNodeMeta) o;
return Objects.equals(_name, that._name);
}
@Override
public int hashCode() {
return Objects.hashCode(_name);
}
}

View File

@@ -0,0 +1,16 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaDirectoryP;
@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class)
public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMetaDirectory(String name) {
super(name);
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name);
}
}

View File

@@ -0,0 +1,37 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaFileP;
import lombok.Getter;
import java.util.Objects;
@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
@Getter
private final String _fileIno;
public JKleppmannTreeNodeMetaFile(String name, String fileIno) {
super(name);
_fileIno = fileIno;
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaFile(name, _fileIno);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
JKleppmannTreeNodeMetaFile that = (JKleppmannTreeNodeMetaFile) o;
return Objects.equals(_fileIno, that._fileIno);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _fileIno);
}
}
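The metadata classes are immutable: a rename produces a fresh instance via withName() while keeping the file inode linkage, which is what trash() relies on when it renames a node into the trash. A sketch (fileIno is a hypothetical inode id):
JKleppmannTreeNodeMeta meta = new JKleppmannTreeNodeMetaFile("a.txt", fileIno);
JKleppmannTreeNodeMeta renamed = meta.withName("b.txt");
// renamed is a new JKleppmannTreeNodeMetaFile carrying the same fileIno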

View File

@@ -0,0 +1,88 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.OnlyLocal;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.kleppmanntree.AtomicClock;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.util.*;
@OnlyLocal
public class JKleppmannTreePersistentData extends JObjectData {
private final String _treeName;
@Getter
private final AtomicClock _clock;
@Getter
private final HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String>>> _queues;
@Getter
private final HashMap<UUID, Long> _peerTimestampLog;
@Getter
private final TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> _log;
public JKleppmannTreePersistentData(String treeName, AtomicClock clock,
HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String>>> queues,
HashMap<UUID, Long> peerTimestampLog, TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> log) {
_treeName = treeName;
_clock = clock;
_queues = queues;
_peerTimestampLog = peerTimestampLog;
_log = log;
}
public JKleppmannTreePersistentData(String treeName) {
_treeName = treeName;
_clock = new AtomicClock(1);
_queues = new HashMap<>();
_peerTimestampLog = new HashMap<>();
_log = new TreeMap<>();
}
public static String nameFromTreeName(String treeName) {
return treeName + "_pd";
}
public void recordOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
_queues.computeIfAbsent(host, h -> new TreeMap<>()).put(opMove.timestamp(), opMove);
}
public void removeOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
_queues.get(host).remove(opMove.timestamp(), opMove);
}
public void recordOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
for (var u : hosts) {
recordOp(u, opMove);
}
}
public void removeOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
for (var u : hosts) {
removeOp(u, opMove);
}
}
@Override
public String getName() {
return nameFromTreeName(_treeName);
}
public String getTreeName() {
return _treeName;
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return null;
}
@Override
public Collection<String> extractRefs() {
return List.of();
}
}
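The per-host queues above back the op sender: an op is enqueued for every known peer when recorded and dequeued once that peer acknowledges it. A bookkeeping sketch (opMove, hostUuids and peerUuid are hypothetical):
var data = new JKleppmannTreePersistentData("fs");
data.recordOp(hostUuids, opMove); // enqueue for all known peers
// later, once peerUuid confirms delivery:
data.removeOp(peerUuid, opMove); // drop it from that peer's queue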

Some files were not shown because too many files have changed in this diff.