4 Commits

Author SHA1 Message Date
c1dfe88164 quarkus update 2025-10-12 10:54:00 +02:00
acf2ae2cef less commitSnapshots created 2025-08-22 23:19:39 +02:00
19016d5e46 less bad read cache 2025-08-22 18:47:43 +02:00
c4945e7354 more fuse logs 2025-07-19 14:26:27 +02:00
9 changed files with 73 additions and 158 deletions

View File

@@ -18,8 +18,6 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -128,7 +128,9 @@ public class DhfsFuse extends FuseStubFS {
opts.add("-o");
opts.add("gid=" + gid);
}
Log.info("FUSE options: " + opts);
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
Log.info("Mounted");
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {

View File

@@ -14,8 +14,6 @@ dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.objects.autosync.threads=8
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -1,9 +1,12 @@
package com.usatiuk.objects.stores;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
@@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
// private ExecutorService _statusExecutor;
private final com.github.benmanes.caffeine.cache.Cache<Pair<Long, JObjectKey>, JDataVersionedWrapper> _cache;
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
);
_cache = Caffeine.newBuilder()
.maximumWeight(sizeLimit)
.weigher((Pair<Long, JObjectKey> key, JDataVersionedWrapper value) -> value.estimateSize())
.expireAfterWrite(Duration.ofMinutes(5)).build();
}
void init(@Observes @Priority(110) StartupEvent event) {
try (var s = delegate.getSnapshot()) {
_cache.set(_cache.get().withVersion(s.id()));
}
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
_cached.set(0);
_cacheTries.set(0);
Thread.sleep(1000);
}
} catch (InterruptedException ignored) {
}
});
}
// if (printStats) {
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
// _cached.set(0);
// _cacheTries.set(0);
// Thread.sleep(1000);
// }
// } catch (InterruptedException ignored) {
// }
// });
// }
}
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
*
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : objs.deleted()) {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
@@ -94,60 +81,28 @@ public class CachingObjectPersistentStore {
* Get a snapshot of underlying store and the cache.
* Objects are read from the cache if possible, if not, they are read from the underlying store,
* then possibly lazily cached when their data is accessed.
*
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) {
var cache = _cache.get();
if (cache == null)
return delegate.getSnapshot();
Cache curCache = null;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
try {
curCache = _cache.get();
backing = delegate.getSnapshot();
if (curCache.version() != backing.id()) {
backing.close();
backing = null;
continue;
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
var globalCache = _cache.get();
if (globalCache.version() != _curCache.version()) {
_invalid = true;
return;
}
var newCache = globalCache.withPut(key, obj);
if (_cache.compareAndSet(globalCache, newCache))
_cached.incrementAndGet();
private void doCache(JObjectKey key, JDataVersionedWrapper obj) {
var cacheKey = Pair.of(obj.version(), key);
_cache.put(cacheKey, obj);
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) {
if (!(obj instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
@@ -164,29 +119,25 @@ public class CachingObjectPersistentStore {
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prependAndMap(
new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
return ListUtils.map(
_backing.getIterator(start, key),
i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
);
}
private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) {
var cached = _cache.getIfPresent(Pair.of(obj.version(), key));
if (cached != null) {
return cached;
}
maybeCache(key, obj);
return obj;
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return read;
return _backing.readObject(name).map(o -> tryGetCached(name, o));
}
@Override
@@ -235,8 +186,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
var cached = tryGetCached(prev.getKey(), prev.getValue());
return Pair.of(prev.getKey(), cached);
}
@Override
@@ -252,8 +203,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
var cached = tryGetCached(next.getKey(), next.getValue());
return Pair.of(next.getKey(), cached);
}
}
};
@@ -265,54 +216,4 @@ public class CachingObjectPersistentStore {
}
}
}
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}

View File

@@ -307,6 +307,15 @@ public class WritebackObjectPersistentStore {
return r -> asyncFence(bundleId, r);
}
/**
* Get the last committed transaction ID.
*
* @return the last committed transaction ID
*/
public long getLastCommitId() {
return _lastCommittedId.get();
}
/**
* Get a snapshot of the persistent store, including the pending writes.
*

View File

@@ -165,7 +165,6 @@ public class TransactionService {
toUnlock.add(lock);
}
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
} else {
Log.trace("Committing transaction - no changes");
@@ -201,7 +200,10 @@ public class TransactionService {
Log.trace("Committing transaction start");
var snapshotId = tx.snapshot().id();
if (snapshotId != commitSnapshot.id()) {
// All dependencies are locked and could not be changed concurrently now
if (snapshotId != writebackObjectPersistentStore.getLastCommitId()) {
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey());

View File

@@ -28,8 +28,8 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.20.0</quarkus.platform.version>
<surefire-plugin.version>3.5.2</surefire-plugin.version>
<quarkus.platform.version>3.27.0</quarkus.platform.version>
<surefire-plugin.version>3.5.4</surefire-plugin.version>
<dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
</properties>
@@ -63,7 +63,7 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.1</version>
<version>5.13.4</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -17,8 +17,8 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true