8 Commits

Author SHA1 Message Date
c1dfe88164 quarkus update 2025-10-12 10:54:00 +02:00
acf2ae2cef less commitSnapshots created 2025-08-22 23:19:39 +02:00
19016d5e46 less bad read cache 2025-08-22 18:47:43 +02:00
c4945e7354 more fuse logs 2025-07-19 14:26:27 +02:00
60ffc12c61 disable lazyfs tests by default
still some fuse issues there
2025-06-25 18:50:19 +02:00
e3e62467e4 cleanup data path config 2025-06-25 18:02:20 +02:00
2434b0464f fit.cvut.cz link fix 2025-06-12 09:52:24 +02:00
ab4e06177e add a notice 2025-06-12 09:49:30 +02:00
39 changed files with 133 additions and 288 deletions

View File

@@ -81,3 +81,10 @@ Then, a web interface will be available at `losthost:8080` (or whatever the HTTP
To run LazyFS tests, LazyFS needs to be built: the git submodules need to be cloned and `./thirdparty/lazyfs/build.sh` script needs to be run.
LazyFS tests were only tested on Linux.
### Notice
This software was developed with the support of the Faculty of Information Technology, Czech Technical University in Prague, [fit.cvut.cz](https://fit.cvut.cz)
<img src="./docs/logo-fit-en-cerna.svg" height="64">

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsfuse.Main" />
<module name="dhfs-fuse" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx512M -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx512M -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsfuse.Main" />
<module name="dhfs-fuse" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseZGC -XX:+ZGenerational --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx1G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseZGC -XX:+ZGenerational --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx1G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />

View File

@@ -1,43 +0,0 @@
version: "3.2"
services:
dhfs1:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs1:/dhfs_root
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs2:/dhfs_root
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -18,8 +18,6 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -1,5 +1,4 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

View File

@@ -11,8 +11,7 @@ services:
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.objects.persistence.root=/dhfs_root/p
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
@@ -32,8 +31,7 @@ services:
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.objects.persistence.root=/dhfs_root/p
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"

View File

@@ -128,7 +128,9 @@ public class DhfsFuse extends FuseStubFS {
opts.add("-o");
opts.add("gid=" + gid);
}
Log.info("FUSE options: " + opts);
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
Log.info("Mounted");
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {

View File

@@ -14,8 +14,6 @@ dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.objects.autosync.threads=8
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -56,8 +56,8 @@ public class DhfsFuseIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -72,9 +72,9 @@ public class DhfsFusex3IT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid + " = 1");
Log.info(container2.getContainerId() + "=" + c2uuid + " = 2");

View File

@@ -87,7 +87,7 @@ public class DhfsImage implements Future<String> {
"-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",
"-Ddhfs.objects.periodic-push-op-interval=5s",
"-Ddhfs.fuse.root=/dhfs_test/fuse",
"-Ddhfs.objects.persistence.files.root=/dhfs_test/data",
"-Ddhfs.objects.persistence.root=/dhfs_test/data",
"-Ddhfs.objects.persistence.stuff.root=/dhfs_test/data/stuff",
"-jar", "/app/quarkus-run.jar")
.run("mkdir /dhfs_test && chmod 777 /dhfs_test")

View File

@@ -72,8 +72,8 @@ public class KillIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -23,6 +23,7 @@ import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
@Disabled
public class LazyFsIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
@@ -82,8 +83,8 @@ public class LazyFsIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -66,8 +66,8 @@ public class ResyncIT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -106,8 +106,8 @@ public class ResyncIT {
foundWc = container2.execInContainer("/bin/sh", "-c", "find /dhfs_test/fuse -type f | wc -l");
Assertions.assertEquals(200, Integer.valueOf(foundWc.getStdout().strip()));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -146,8 +146,8 @@ public class ResyncIT {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /dhfs_test/fuse/testd1 /dhfs_test/fuse/testd2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /dhfs_test/fuse/testd2/testf2").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -1,5 +1,4 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -1,9 +1,12 @@
package com.usatiuk.objects.stores;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
@@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
// private ExecutorService _statusExecutor;
private final com.github.benmanes.caffeine.cache.Cache<Pair<Long, JObjectKey>, JDataVersionedWrapper> _cache;
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
);
_cache = Caffeine.newBuilder()
.maximumWeight(sizeLimit)
.weigher((Pair<Long, JObjectKey> key, JDataVersionedWrapper value) -> value.estimateSize())
.expireAfterWrite(Duration.ofMinutes(5)).build();
}
void init(@Observes @Priority(110) StartupEvent event) {
try (var s = delegate.getSnapshot()) {
_cache.set(_cache.get().withVersion(s.id()));
}
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
_cached.set(0);
_cacheTries.set(0);
Thread.sleep(1000);
}
} catch (InterruptedException ignored) {
}
});
}
// if (printStats) {
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
// _cached.set(0);
// _cacheTries.set(0);
// Thread.sleep(1000);
// }
// } catch (InterruptedException ignored) {
// }
// });
// }
}
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
*
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : objs.deleted()) {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
@@ -94,60 +81,28 @@ public class CachingObjectPersistentStore {
* Get a snapshot of underlying store and the cache.
* Objects are read from the cache if possible, if not, they are read from the underlying store,
* then possibly lazily cached when their data is accessed.
*
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) {
var cache = _cache.get();
if (cache == null)
return delegate.getSnapshot();
Cache curCache = null;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
try {
curCache = _cache.get();
backing = delegate.getSnapshot();
if (curCache.version() != backing.id()) {
backing.close();
backing = null;
continue;
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
var globalCache = _cache.get();
if (globalCache.version() != _curCache.version()) {
_invalid = true;
return;
}
var newCache = globalCache.withPut(key, obj);
if (_cache.compareAndSet(globalCache, newCache))
_cached.incrementAndGet();
private void doCache(JObjectKey key, JDataVersionedWrapper obj) {
var cacheKey = Pair.of(obj.version(), key);
_cache.put(cacheKey, obj);
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) {
if (!(obj instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
@@ -164,29 +119,25 @@ public class CachingObjectPersistentStore {
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prependAndMap(
new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
return ListUtils.map(
_backing.getIterator(start, key),
i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
);
}
private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) {
var cached = _cache.getIfPresent(Pair.of(obj.version(), key));
if (cached != null) {
return cached;
}
maybeCache(key, obj);
return obj;
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return read;
return _backing.readObject(name).map(o -> tryGetCached(name, o));
}
@Override
@@ -235,8 +186,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
var cached = tryGetCached(prev.getKey(), prev.getValue());
return Pair.of(prev.getKey(), cached);
}
@Override
@@ -252,8 +203,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
var cached = tryGetCached(next.getKey(), next.getValue());
return Pair.of(next.getKey(), cached);
}
}
};
@@ -265,54 +216,4 @@ public class CachingObjectPersistentStore {
}
}
}
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}

View File

@@ -58,7 +58,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private Dbi<ByteBuffer> _db;
private boolean _ready = false;
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.root") String root) {
_root = Path.of(root).resolve("objects");
}

View File

@@ -307,6 +307,15 @@ public class WritebackObjectPersistentStore {
return r -> asyncFence(bundleId, r);
}
/**
* Get the last committed transaction ID.
*
* @return the last committed transaction ID
*/
public long getLastCommitId() {
return _lastCommittedId.get();
}
/**
* Get a snapshot of the persistent store, including the pending writes.
*

View File

@@ -165,7 +165,6 @@ public class TransactionService {
toUnlock.add(lock);
}
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
} else {
Log.trace("Committing transaction - no changes");
@@ -201,7 +200,10 @@ public class TransactionService {
Log.trace("Committing transaction start");
var snapshotId = tx.snapshot().id();
if (snapshotId != commitSnapshot.id()) {
// All dependencies are locked and could not be changed concurrently now
if (snapshotId != writebackObjectPersistentStore.getLastCommitId()) {
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey());

View File

@@ -3,7 +3,7 @@ dhfs.objects.writeback.limit=16777216
dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200

View File

@@ -21,7 +21,7 @@ public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
ret.put("dhfs.objects.persistence", "lmdb");
getConfigOverrides(ret);

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -28,8 +28,8 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.20.0</quarkus.platform.version>
<surefire-plugin.version>3.5.2</surefire-plugin.version>
<quarkus.platform.version>3.27.0</quarkus.platform.version>
<surefire-plugin.version>3.5.4</surefire-plugin.version>
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
</properties>
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.1</version>
<version>5.13.4</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -1,43 +0,0 @@
version: "3.2"
services:
dhfs1:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs1:/dhfs_root
- $HOME/dhfs/dhfs1_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
- 8081:8443
- 5005:5005
dhfs2:
build: .
privileged: true
devices:
- /dev/fuse
volumes:
- $HOME/dhfs/dhfs2:/dhfs_root
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -19,7 +19,7 @@ import java.nio.file.Paths;
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String dataRoot;
boolean _cleanShutdown = true;
boolean _initialized = false;

View File

@@ -32,7 +32,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
ReachablePeerManager reachablePeerManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String dataRoot;
private DeferredInvalidationQueueData _persistentData = new DeferredInvalidationQueueData();

View File

@@ -60,7 +60,7 @@ public class PersistentPeerDataService {
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@ConfigProperty(name = "dhfs.objects.persistence.stuff.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String stuffRoot;
private PeerId _selfUuid;

View File

@@ -8,7 +8,6 @@ dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false
dhfs.objects.periodic-push-op-interval=5m
dhfs.fuse.root=${HOME}/dhfs_default/fuse
dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
@@ -18,8 +17,8 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -21,7 +21,7 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
throw new RuntimeException(e);
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.persistence.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -15,7 +15,7 @@ import java.util.Objects;
@ApplicationScoped
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@ConfigProperty(name = "dhfs.objects.persistence.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {

View File

@@ -1,5 +1,4 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.objects.persistence.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.deletion.delay=0

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 18 KiB

View File

@@ -42,8 +42,7 @@ fi
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
--add-exports java.base/jdk.internal.access=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
-Ddhfs.objects.persistence.files.root="$SCRIPT_DIR"/../data/objects \
-Ddhfs.objects.persistence.stuff.root="$SCRIPT_DIR"/../data/stuff \
-Ddhfs.objects.persistence.root="$SCRIPT_DIR"/../data \
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
-Dquarkus.http.host=0.0.0.0 \
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \