Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-28 20:47:49 +01:00

Compare commits: 60ffc12c61...main (4 commits)

| Author | SHA1 | Date |
|---|---|---|
| | c1dfe88164 | |
| | acf2ae2cef | |
| | 19016d5e46 | |
| | c4945e7354 | |
```diff
@@ -18,8 +18,6 @@ dhfs.objects.ref_verification=true
 dhfs.files.use_hash_for_chunks=false
 dhfs.objects.autosync.threads=16
 dhfs.objects.autosync.download-all=false
-dhfs.objects.move-processor.threads=16
-dhfs.objects.ref-processor.threads=16
 dhfs.objects.opsender.batch-size=100
 dhfs.objects.lock_timeout_secs=2
 dhfs.local-discovery=true
```
```diff
@@ -128,7 +128,9 @@ public class DhfsFuse extends FuseStubFS {
+            opts.add("-o");
+            opts.add("gid=" + gid);
         }
         Log.info("FUSE options: " + opts);
         mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
         Log.info("Mounted");
     }

     void shutdown(@Observes @Priority(1) ShutdownEvent event) {
```
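For context, the two added lines pass an explicit group ID to the FUSE mount via a `-o gid=...` option pair. A minimal sketch of how such an option list is assembled for jnr-fuse's `mount(Path, boolean, boolean, String[])`; the `uid` handling here is an assumption for symmetry, only the `gid` lines appear in the diff:

```java
import java.util.ArrayList;
import java.util.List;

public class FuseOptsSketch {
    // Builds "-o name=value" style FUSE options; each option needs its own "-o".
    static String[] fuseOpts(long uid, long gid) {
        List<String> opts = new ArrayList<>();
        opts.add("-o");
        opts.add("uid=" + uid); // assumed: set the same way as gid below
        opts.add("-o");
        opts.add("gid=" + gid); // matches the two lines added in the hunk
        return opts.toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", fuseOpts(1000, 1000)));
    }
}
```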
```diff
@@ -14,8 +14,6 @@ dhfs.objects.deletion.can-delete-retry-delay=10000
 dhfs.objects.ref_verification=true
 dhfs.objects.autosync.threads=8
 dhfs.objects.autosync.download-all=false
-dhfs.objects.move-processor.threads=8
-dhfs.objects.ref-processor.threads=8
 dhfs.local-discovery=true
 dhfs.peerdiscovery.timeout=10000
 quarkus.log.category."com.usatiuk".min-level=TRACE
```
```diff
@@ -18,6 +18,11 @@
     </properties>

     <dependencies>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>3.2.2</version>
+        </dependency>
         <dependency>
             <groupId>net.jqwik</groupId>
             <artifactId>jqwik</artifactId>
```
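The new dependency is Ben Manes' Caffeine cache, which the persistent-store caching layer below is rewritten around. A self-contained sketch of the small slice of its API that this diff relies on (builder with weight-based eviction, non-loading `getIfPresent`/`put`); this matches the Caffeine 3.x API:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;

public class CaffeineSketch {
    public static void main(String[] args) {
        // Bounded by total weight (sum of weigher results), not by entry count.
        Cache<String, byte[]> cache = Caffeine.newBuilder()
                .maximumWeight(1024 * 1024) // ~1 MiB of cached bytes
                .weigher((String key, byte[] value) -> value.length)
                .expireAfterWrite(Duration.ofMinutes(5))
                .build();

        cache.put("a", new byte[128]);
        System.out.println(cache.getIfPresent("a") != null); // true: hit
        System.out.println(cache.getIfPresent("b") == null); // true: miss returns null
    }
}
```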
```diff
@@ -1,9 +1,12 @@
 package com.usatiuk.objects.stores;

+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.usatiuk.objects.JDataVersionedWrapper;
 import com.usatiuk.objects.JDataVersionedWrapperLazy;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.iterators.*;
+import com.usatiuk.objects.iterators.CloseableKvIterator;
+import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.iterators.MaybeTombstone;
 import com.usatiuk.objects.snapshot.Snapshot;
 import com.usatiuk.utils.ListUtils;
 import io.quarkus.logging.Log;
```
```diff
@@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.pcollections.TreePMap;

 import javax.annotation.Nonnull;
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;

 /**
  * CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
```
```diff
@@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @ApplicationScoped
 public class CachingObjectPersistentStore {
-    private final AtomicReference<Cache> _cache;
     @Inject
     SerializingObjectPersistentStore delegate;
     @ConfigProperty(name = "dhfs.objects.lru.print-stats")
     boolean printStats;
-    private ExecutorService _statusExecutor;
-    private AtomicLong _cached = new AtomicLong();
-    private AtomicLong _cacheTries = new AtomicLong();
+    //    private ExecutorService _statusExecutor;
+
+    private final com.github.benmanes.caffeine.cache.Cache<Pair<Long, JObjectKey>, JDataVersionedWrapper> _cache;

     public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
-        _cache = new AtomicReference<>(
-                new Cache(TreePMap.empty(), 0, -1, sizeLimit)
-        );
+        _cache = Caffeine.newBuilder()
+                .maximumWeight(sizeLimit)
+                .weigher((Pair<Long, JObjectKey> key, JDataVersionedWrapper value) -> value.estimateSize())
+                .expireAfterWrite(Duration.ofMinutes(5)).build();
     }

     void init(@Observes @Priority(110) StartupEvent event) {
-        try (var s = delegate.getSnapshot()) {
-            _cache.set(_cache.get().withVersion(s.id()));
-        }
-
-        if (printStats) {
-            _statusExecutor = Executors.newSingleThreadExecutor();
-            _statusExecutor.submit(() -> {
-                try {
-                    while (true) {
-                        Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
-                        _cached.set(0);
-                        _cacheTries.set(0);
-                        Thread.sleep(1000);
-                    }
-                } catch (InterruptedException ignored) {
-                }
-            });
-        }
+        //        if (printStats) {
+        //            _statusExecutor = Executors.newSingleThreadExecutor();
+        //            _statusExecutor.submit(() -> {
+        //                try {
+        //                    while (true) {
+        //                        Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
+        //                        _cached.set(0);
+        //                        _cacheTries.set(0);
+        //                        Thread.sleep(1000);
+        //                    }
+        //                } catch (InterruptedException ignored) {
+        //                }
+        //            });
+        //        }
     }

     /**
      * Commit the transaction to the underlying store and update the cache.
      * Once this function returns, the transaction is committed and the cache is updated.
      *
      * @param objs the transaction manifest object
      * @param txId the transaction ID
      */
     public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
         Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
-
-        var cache = _cache.get();
-        for (var write : objs.written()) {
-            cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
-        }
-        for (var del : objs.deleted()) {
-            cache = cache.withPut(del, Optional.empty());
-        }
-        cache = cache.withVersion(txId);
         delegate.commitTx(objs, txId);
-        _cache.set(cache);
-
         Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
     }
```
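The interesting consequence of the rewrite is visible in `commitTx`: the old code had to fold every write and delete into the persistent cache map under the new version, while the new code only forwards to the delegate. Because cache keys are now `Pair.of(version, key)`, a committed write can never collide with a cached older version; stale entries simply stop being looked up and are eventually evicted by weight or the 5-minute expiry. A small sketch of that invariant (the `Pair` is Apache Commons Lang's, as in the imports above; `Versioned` is a hypothetical stand-in for `JDataVersionedWrapper`):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.tuple.Pair;

public class VersionKeyedCacheSketch {
    // Hypothetical stand-in for JDataVersionedWrapper.
    record Versioned(long version, String data) {}

    public static void main(String[] args) {
        Cache<Pair<Long, String>, Versioned> cache = Caffeine.newBuilder()
                .maximumWeight(1 << 20)
                .weigher((Pair<Long, String> k, Versioned v) -> v.data().length())
                .build();

        cache.put(Pair.of(1L, "obj"), new Versioned(1, "old"));

        // A commit bumps the version; readers now ask for (2, "obj") and miss,
        // falling through to the backing store. No invalidation was needed.
        System.out.println(cache.getIfPresent(Pair.of(2L, "obj")) == null); // true
        System.out.println(cache.getIfPresent(Pair.of(1L, "obj")) != null); // older snapshot readers still hit
    }
}
```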
```diff
@@ -94,60 +81,28 @@ public class CachingObjectPersistentStore {
      * Get a snapshot of underlying store and the cache.
      * Objects are read from the cache if possible, if not, they are read from the underlying store,
      * then possibly lazily cached when their data is accessed.
      *
      * @return a snapshot of the cached store
      */
     public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
-        while (true) {
-            var cache = _cache.get();
-
-            if (cache == null)
-                return delegate.getSnapshot();
-
-            Cache curCache = null;
-            Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
-
-            try {
-                curCache = _cache.get();
-                backing = delegate.getSnapshot();
-
-                if (curCache.version() != backing.id()) {
-                    backing.close();
-                    backing = null;
-                    continue;
-                }
-                Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
-                Cache finalCurCache = curCache;
         return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
-            private final Cache _curCache = finalCurCache;
-            private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
-            private boolean _invalid = false;
             private boolean _closed = false;

-            private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
-                _cacheTries.incrementAndGet();
-                if (_invalid)
-                    return;
-
-                var globalCache = _cache.get();
-                if (globalCache.version() != _curCache.version()) {
-                    _invalid = true;
-                    return;
-                }
-
-                var newCache = globalCache.withPut(key, obj);
-                if (_cache.compareAndSet(globalCache, newCache))
-                    _cached.incrementAndGet();
+            private void doCache(JObjectKey key, JDataVersionedWrapper obj) {
+                var cacheKey = Pair.of(obj.version(), key);
+                _cache.put(cacheKey, obj);
             }

-            private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
-                if (obj.isEmpty()) {
-                    doCache(key, obj);
-                    return;
-                }
-
-                var wrapper = obj.get();
-
-                if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
+            private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) {
+                if (!(obj instanceof JDataVersionedWrapperLazy lazy)) {
                     doCache(key, obj);
                     return;
                 }
```
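The hunk cuts off inside the `JDataVersionedWrapperLazy` branch, but the class javadoc above ("possibly lazily cached when their data is accessed") indicates the idea: non-lazy objects are cached immediately, while lazy ones defer caching until their data has actually been deserialized. A rough illustration of that pattern with purely hypothetical names (this is not the real `JDataVersionedWrapperLazy` API):

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class LazyCacheSketch {
    // Hypothetical lazy holder: deserializes on first access and then
    // notifies a hook, so only materialized values enter the cache.
    static final class LazyValue {
        private final Supplier<String> loader;
        private Consumer<String> onLoad; // hypothetical hook, not the real API
        private String data;

        LazyValue(Supplier<String> loader) { this.loader = loader; }

        void setOnLoad(Consumer<String> hook) { this.onLoad = hook; }

        String get() {
            if (data == null) {
                data = loader.get();                      // pretend deserialization
                if (onLoad != null) onLoad.accept(data);  // cache only now
            }
            return data;
        }
    }

    public static void main(String[] args) {
        var lazy = new LazyValue(() -> "payload");
        lazy.setOnLoad(d -> System.out.println("caching " + d)); // stand-in for doCache
        lazy.get(); // prints once, on first materialization
        lazy.get(); // already loaded: no second cache insert
    }
}
```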
```diff
@@ -164,29 +119,25 @@ public class CachingObjectPersistentStore {

             @Override
             public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
-                return ListUtils.prependAndMap(
-                        new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
+                return ListUtils.map(
                         _backing.getIterator(start, key),
                         i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
                 );
             }

+            private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) {
+                var cached = _cache.getIfPresent(Pair.of(obj.version(), key));
+                if (cached != null) {
+                    return cached;
+                }
+                maybeCache(key, obj);
+                return obj;
+            }
+
             @Nonnull
             @Override
             public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
-                var cached = _curCache.map().get(name);
-                if (cached != null) {
-                    return switch (cached) {
-                        case CacheEntryPresent data -> Optional.of(data.value());
-                        case CacheEntryMiss tombstone -> {
-                            yield Optional.empty();
-                        }
-                        default -> throw new IllegalStateException("Unexpected value: " + cached);
-                    };
-                }
-                var read = _backing.readObject(name);
-                maybeCache(name, read);
-                return read;
+                return _backing.readObject(name).map(o -> tryGetCached(name, o));
             }

             @Override
```
```diff
@@ -235,8 +186,8 @@ public class CachingObjectPersistentStore {
             @Override
             public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
                 var prev = _delegate.prev();
-                maybeCache(prev.getKey(), Optional.of(prev.getValue()));
-                return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
+                var cached = tryGetCached(prev.getKey(), prev.getValue());
+                return Pair.of(prev.getKey(), cached);
             }

             @Override
```
```diff
@@ -252,8 +203,8 @@ public class CachingObjectPersistentStore {
             @Override
             public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
                 var next = _delegate.next();
-                maybeCache(next.getKey(), Optional.of(next.getValue()));
-                return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
+                var cached = tryGetCached(next.getKey(), next.getValue());
+                return Pair.of(next.getKey(), cached);
             }
         }
     };
```
```diff
@@ -265,54 +216,4 @@ public class CachingObjectPersistentStore {
                 }
             }
         }
-
-    private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
-        int size();
-    }
-
-    private record Cache(TreePMap<JObjectKey, CacheEntry> map,
-                         int size,
-                         long version,
-                         int sizeLimit) {
-        public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
-            var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
-
-            int newSize = size() + entry.size();
-
-            var old = map.get(key);
-            if (old != null)
-                newSize -= old.size();
-
-            TreePMap<JObjectKey, CacheEntry> newCache = map();
-
-            while (newSize > sizeLimit) {
-                var del = newCache.firstEntry();
-                newCache = newCache.minusFirstEntry();
-                newSize -= del.getValue().size();
-            }
-
-            newCache = newCache.plus(key, entry);
-            return new Cache(
-                    newCache,
-                    newSize,
-                    version,
-                    sizeLimit
-            );
-        }
-
-        public Cache withVersion(long version) {
-            return new Cache(map, size, version, sizeLimit);
-        }
-    }
-
-    private record CacheEntryPresent(JDataVersionedWrapper value,
-                                     int size) implements CacheEntry, Data<JDataVersionedWrapper> {
-    }
-
-    private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
-        @Override
-        public int size() {
-            return 64;
-        }
-    }
 }
```
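With the hand-rolled `Cache` record, tombstones, and manual weight accounting gone, the snapshot now behaves as a plain read-through cache: a hit must match both the key and the exact version the backing snapshot would return, so no retry loop or version fence is needed. A hedged usage sketch against the API visible in this diff (`getSnapshot()` returning a closeable `Snapshot`, `readObject` returning an `Optional` of a versioned wrapper):

```java
// Usage sketch only; CachingObjectPersistentStore, JObjectKey and the Snapshot
// API are the ones shown in the diff above.
class ReadThroughExample {
    static void read(CachingObjectPersistentStore store, JObjectKey key) {
        try (var snapshot = store.getSnapshot()) {
            var obj = snapshot.readObject(key); // cache hit, or read-through + maybeCache
            obj.ifPresent(o -> System.out.println("version " + o.version()));
        }
    }
}
```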
```diff
@@ -307,6 +307,15 @@ public class WritebackObjectPersistentStore {
         return r -> asyncFence(bundleId, r);
     }

+    /**
+     * Get the last committed transaction ID.
+     *
+     * @return the last committed transaction ID
+     */
+    public long getLastCommitId() {
+        return _lastCommittedId.get();
+    }
+
     /**
      * Get a snapshot of the persistent store, including the pending writes.
      *
```
```diff
@@ -165,7 +165,6 @@ public class TransactionService {
                 toUnlock.add(lock);
             }

-            commitSnapshot = writebackObjectPersistentStore.getSnapshot();
         } else {
             Log.trace("Committing transaction - no changes");
```
```diff
@@ -201,7 +200,10 @@ public class TransactionService {
         Log.trace("Committing transaction start");
         var snapshotId = tx.snapshot().id();

-        if (snapshotId != commitSnapshot.id()) {
+        // All dependencies are locked and could not be changed concurrently now
+        if (snapshotId != writebackObjectPersistentStore.getLastCommitId()) {
+            commitSnapshot = writebackObjectPersistentStore.getSnapshot();
+
         for (var read : readSet.entrySet()) {
             var current = commitSnapshot.readObject(read.getKey());
```
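This is the consumer of the new `getLastCommitId()`: instead of unconditionally materializing a commit snapshot (the line deleted in the previous hunk), the transaction first compares its snapshot ID against the last committed ID, and only takes a fresh snapshot and re-validates its read set when something was committed in between. A condensed sketch of that optimistic check, with simplified stand-in types rather than the real store and snapshot classes:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

public class OptimisticCommitSketch {
    // Simplified stand-ins for the writeback store and its snapshot.
    record Snapshot(long id) {
        Optional<Long> readVersion(String key) { return Optional.of(id); }
    }

    static final AtomicLong lastCommittedId = new AtomicLong(42);

    static Snapshot getSnapshot() { return new Snapshot(lastCommittedId.get()); }

    static boolean canCommit(long txSnapshotId, Map<String, Long> readSet) {
        // Fast path: nothing committed since this tx's snapshot, reads can't be stale.
        if (txSnapshotId == lastCommittedId.get()) return true;

        // Slow path: take a fresh snapshot and re-check each read dependency.
        Snapshot commitSnapshot = getSnapshot();
        for (var read : readSet.entrySet()) {
            var current = commitSnapshot.readVersion(read.getKey());
            if (current.isEmpty() || !current.get().equals(read.getValue()))
                return false; // conflict: the caller would abort/retry
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(canCommit(42, Map.of("a", 42L))); // fast path: true
        System.out.println(canCommit(40, Map.of("a", 42L))); // re-validated: true here
    }
}
```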
```diff
@@ -28,8 +28,8 @@
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
         <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
-        <quarkus.platform.version>3.20.0</quarkus.platform.version>
-        <surefire-plugin.version>3.5.2</surefire-plugin.version>
+        <quarkus.platform.version>3.27.0</quarkus.platform.version>
+        <surefire-plugin.version>3.5.4</surefire-plugin.version>
         <dhfs.native-libs-dir>${project.parent.build.outputDirectory}/native</dhfs.native-libs-dir>
     </properties>
```
```diff
@@ -63,7 +63,7 @@
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-engine</artifactId>
-            <version>5.9.1</version>
+            <version>5.13.4</version>
             <scope>test</scope>
         </dependency>
         <dependency>
```
```diff
@@ -17,8 +17,8 @@ dhfs.objects.ref_verification=true
 dhfs.files.use_hash_for_chunks=false
 dhfs.objects.autosync.threads=16
 dhfs.objects.autosync.download-all=false
-dhfs.objects.move-processor.threads=16
-dhfs.objects.ref-processor.threads=16
+dhfs.objects.move-processor.threads=8
+dhfs.objects.ref-processor.threads=8
 dhfs.objects.opsender.batch-size=100
 dhfs.objects.lock_timeout_secs=2
 dhfs.local-discovery=true
```