diff --git a/dhfs-parent/dhfs-fs/src/main/resources/application.properties b/dhfs-parent/dhfs-fs/src/main/resources/application.properties index a2132ecd..c6611d0d 100644 --- a/dhfs-parent/dhfs-fs/src/main/resources/application.properties +++ b/dhfs-parent/dhfs-fs/src/main/resources/application.properties @@ -18,8 +18,6 @@ dhfs.objects.ref_verification=true dhfs.files.use_hash_for_chunks=false dhfs.objects.autosync.threads=16 dhfs.objects.autosync.download-all=false -dhfs.objects.move-processor.threads=16 -dhfs.objects.ref-processor.threads=16 dhfs.objects.opsender.batch-size=100 dhfs.objects.lock_timeout_secs=2 dhfs.local-discovery=true diff --git a/dhfs-parent/dhfs-fuse/src/main/resources/application.properties b/dhfs-parent/dhfs-fuse/src/main/resources/application.properties index 8c02bee4..9b4a9dd4 100644 --- a/dhfs-parent/dhfs-fuse/src/main/resources/application.properties +++ b/dhfs-parent/dhfs-fuse/src/main/resources/application.properties @@ -14,8 +14,6 @@ dhfs.objects.deletion.can-delete-retry-delay=10000 dhfs.objects.ref_verification=true dhfs.objects.autosync.threads=8 dhfs.objects.autosync.download-all=false -dhfs.objects.move-processor.threads=8 -dhfs.objects.ref-processor.threads=8 dhfs.local-discovery=true dhfs.peerdiscovery.timeout=10000 quarkus.log.category."com.usatiuk".min-level=TRACE diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml index 566217bb..8190f576 100644 --- a/dhfs-parent/objects/pom.xml +++ b/dhfs-parent/objects/pom.xml @@ -18,6 +18,11 @@ + + com.github.ben-manes.caffeine + caffeine + 3.2.2 + net.jqwik jqwik diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index 8a654002..ff5b7477 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -1,9 +1,12 @@ package com.usatiuk.objects.stores; +import com.github.benmanes.caffeine.cache.Caffeine; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperLazy; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.iterators.*; +import com.usatiuk.objects.iterators.CloseableKvIterator; +import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MaybeTombstone; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.utils.ListUtils; import io.quarkus.logging.Log; @@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.pcollections.TreePMap; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; /** * CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore @@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference; */ @ApplicationScoped public class CachingObjectPersistentStore { - private final AtomicReference _cache; @Inject SerializingObjectPersistentStore delegate; @ConfigProperty(name = "dhfs.objects.lru.print-stats") boolean printStats; - private ExecutorService _statusExecutor; - private AtomicLong _cached = new AtomicLong(); - private AtomicLong _cacheTries = new AtomicLong(); +// private ExecutorService _statusExecutor; + + private final com.github.benmanes.caffeine.cache.Cache, JDataVersionedWrapper> _cache; public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) { - _cache = new AtomicReference<>( - new Cache(TreePMap.empty(), 0, -1, sizeLimit) - ); + _cache = Caffeine.newBuilder() + .maximumWeight(sizeLimit) + .weigher((Pair key, JDataVersionedWrapper value) -> value.estimateSize()) + .expireAfterWrite(Duration.ofMinutes(5)).build(); } void init(@Observes @Priority(110) StartupEvent event) { - try (var s = delegate.getSnapshot()) { - _cache.set(_cache.get().withVersion(s.id())); - } - - if (printStats) { - _statusExecutor = Executors.newSingleThreadExecutor(); - _statusExecutor.submit(() -> { - try { - while (true) { - Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get())); - _cached.set(0); - _cacheTries.set(0); - Thread.sleep(1000); - } - } catch (InterruptedException ignored) { - } - }); - } +// if (printStats) { +// _statusExecutor = Executors.newSingleThreadExecutor(); +// _statusExecutor.submit(() -> { +// try { +// while (true) { +// Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get())); +// _cached.set(0); +// _cacheTries.set(0); +// Thread.sleep(1000); +// } +// } catch (InterruptedException ignored) { +// } +// }); +// } } /** * Commit the transaction to the underlying store and update the cache. * Once this function returns, the transaction is committed and the cache is updated. + * * @param objs the transaction manifest object * @param txId the transaction ID */ public void commitTx(TxManifestObj objs, long txId) { Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); - var cache = _cache.get(); - for (var write : objs.written()) { - cache = cache.withPut(write.getLeft(), Optional.of(write.getRight())); - } - for (var del : objs.deleted()) { - cache = cache.withPut(del, Optional.empty()); - } - cache = cache.withVersion(txId); delegate.commitTx(objs, txId); - _cache.set(cache); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); } @@ -94,60 +81,28 @@ public class CachingObjectPersistentStore { * Get a snapshot of underlying store and the cache. * Objects are read from the cache if possible, if not, they are read from the underlying store, * then possibly lazily cached when their data is accessed. + * * @return a snapshot of the cached store */ public Snapshot getSnapshot() { while (true) { - var cache = _cache.get(); - - if (cache == null) - return delegate.getSnapshot(); - - Cache curCache = null; Snapshot backing = null; try { - curCache = _cache.get(); backing = delegate.getSnapshot(); - if (curCache.version() != backing.id()) { - backing.close(); - backing = null; - continue; - } Snapshot finalBacking = backing; - Cache finalCurCache = curCache; return new Snapshot() { - private final Cache _curCache = finalCurCache; private final Snapshot _backing = finalBacking; - private boolean _invalid = false; private boolean _closed = false; - private void doCache(JObjectKey key, Optional obj) { - _cacheTries.incrementAndGet(); - if (_invalid) - return; - - var globalCache = _cache.get(); - if (globalCache.version() != _curCache.version()) { - _invalid = true; - return; - } - - var newCache = globalCache.withPut(key, obj); - if (_cache.compareAndSet(globalCache, newCache)) - _cached.incrementAndGet(); + private void doCache(JObjectKey key, JDataVersionedWrapper obj) { + var cacheKey = Pair.of(obj.version(), key); + _cache.put(cacheKey, obj); } - private void maybeCache(JObjectKey key, Optional obj) { - if (obj.isEmpty()) { - doCache(key, obj); - return; - } - - var wrapper = obj.get(); - - if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) { + private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) { + if (!(obj instanceof JDataVersionedWrapperLazy lazy)) { doCache(key, obj); return; } @@ -164,29 +119,25 @@ public class CachingObjectPersistentStore { @Override public List>> getIterator(IteratorStart start, JObjectKey key) { - return ListUtils.prependAndMap( - new NavigableMapKvIterator>(_curCache.map(), start, key), + return ListUtils.map( _backing.getIterator(start, key), i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i) ); } + private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) { + var cached = _cache.getIfPresent(Pair.of(obj.version(), key)); + if (cached != null) { + return cached; + } + maybeCache(key, obj); + return obj; + } + @Nonnull @Override public Optional readObject(JObjectKey name) { - var cached = _curCache.map().get(name); - if (cached != null) { - return switch (cached) { - case CacheEntryPresent data -> Optional.of(data.value()); - case CacheEntryMiss tombstone -> { - yield Optional.empty(); - } - default -> throw new IllegalStateException("Unexpected value: " + cached); - }; - } - var read = _backing.readObject(name); - maybeCache(name, read); - return read; + return _backing.readObject(name).map(o -> tryGetCached(name, o)); } @Override @@ -235,8 +186,8 @@ public class CachingObjectPersistentStore { @Override public Pair> prev() { var prev = _delegate.prev(); - maybeCache(prev.getKey(), Optional.of(prev.getValue())); - return (Pair>) (Pair) prev; + var cached = tryGetCached(prev.getKey(), prev.getValue()); + return Pair.of(prev.getKey(), cached); } @Override @@ -252,8 +203,8 @@ public class CachingObjectPersistentStore { @Override public Pair> next() { var next = _delegate.next(); - maybeCache(next.getKey(), Optional.of(next.getValue())); - return (Pair>) (Pair) next; + var cached = tryGetCached(next.getKey(), next.getValue()); + return Pair.of(next.getKey(), cached); } } }; @@ -265,54 +216,4 @@ public class CachingObjectPersistentStore { } } } - - private interface CacheEntry extends MaybeTombstone { - int size(); - } - - private record Cache(TreePMap map, - int size, - long version, - int sizeLimit) { - public Cache withPut(JObjectKey key, Optional obj) { - var entry = obj.map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss()); - - int newSize = size() + entry.size(); - - var old = map.get(key); - if (old != null) - newSize -= old.size(); - - TreePMap newCache = map(); - - while (newSize > sizeLimit) { - var del = newCache.firstEntry(); - newCache = newCache.minusFirstEntry(); - newSize -= del.getValue().size(); - } - - newCache = newCache.plus(key, entry); - return new Cache( - newCache, - newSize, - version, - sizeLimit - ); - } - - public Cache withVersion(long version) { - return new Cache(map, size, version, sizeLimit); - } - } - - private record CacheEntryPresent(JDataVersionedWrapper value, - int size) implements CacheEntry, Data { - } - - private record CacheEntryMiss() implements CacheEntry, Tombstone { - @Override - public int size() { - return 64; - } - } } diff --git a/dhfs-parent/sync-base/src/main/resources/application.properties b/dhfs-parent/sync-base/src/main/resources/application.properties index fbd3e5e7..335c25de 100644 --- a/dhfs-parent/sync-base/src/main/resources/application.properties +++ b/dhfs-parent/sync-base/src/main/resources/application.properties @@ -17,8 +17,8 @@ dhfs.objects.ref_verification=true dhfs.files.use_hash_for_chunks=false dhfs.objects.autosync.threads=16 dhfs.objects.autosync.download-all=false -dhfs.objects.move-processor.threads=16 -dhfs.objects.ref-processor.threads=16 +dhfs.objects.move-processor.threads=8 +dhfs.objects.ref-processor.threads=8 dhfs.objects.opsender.batch-size=100 dhfs.objects.lock_timeout_secs=2 dhfs.local-discovery=true