less bad read cache

This commit is contained in:
2025-08-22 18:47:43 +02:00
parent c4945e7354
commit 19016d5e46
5 changed files with 55 additions and 153 deletions

View File

@@ -18,8 +18,6 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true

View File

@@ -14,8 +14,6 @@ dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.objects.autosync.threads=8
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -1,9 +1,12 @@
package com.usatiuk.objects.stores;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -14,15 +17,11 @@ import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
@@ -31,61 +30,49 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
// private ExecutorService _statusExecutor;
private final com.github.benmanes.caffeine.cache.Cache<Pair<Long, JObjectKey>, JDataVersionedWrapper> _cache;
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
);
_cache = Caffeine.newBuilder()
.maximumWeight(sizeLimit)
.weigher((Pair<Long, JObjectKey> key, JDataVersionedWrapper value) -> value.estimateSize())
.expireAfterWrite(Duration.ofMinutes(5)).build();
}
void init(@Observes @Priority(110) StartupEvent event) {
try (var s = delegate.getSnapshot()) {
_cache.set(_cache.get().withVersion(s.id()));
}
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
_cached.set(0);
_cacheTries.set(0);
Thread.sleep(1000);
}
} catch (InterruptedException ignored) {
}
});
}
// if (printStats) {
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Log.infov("Cache status: size=" + _cache.estimatedSize() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get()));
// _cached.set(0);
// _cacheTries.set(0);
// Thread.sleep(1000);
// }
// } catch (InterruptedException ignored) {
// }
// });
// }
}
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
*
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : objs.deleted()) {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
@@ -94,60 +81,28 @@ public class CachingObjectPersistentStore {
* Get a snapshot of underlying store and the cache.
* Objects are read from the cache if possible, if not, they are read from the underlying store,
* then possibly lazily cached when their data is accessed.
*
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) {
var cache = _cache.get();
if (cache == null)
return delegate.getSnapshot();
Cache curCache = null;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
try {
curCache = _cache.get();
backing = delegate.getSnapshot();
if (curCache.version() != backing.id()) {
backing.close();
backing = null;
continue;
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
var globalCache = _cache.get();
if (globalCache.version() != _curCache.version()) {
_invalid = true;
return;
private void doCache(JObjectKey key, JDataVersionedWrapper obj) {
var cacheKey = Pair.of(obj.version(), key);
_cache.put(cacheKey, obj);
}
var newCache = globalCache.withPut(key, obj);
if (_cache.compareAndSet(globalCache, newCache))
_cached.incrementAndGet();
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) {
if (!(obj instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
@@ -164,29 +119,25 @@ public class CachingObjectPersistentStore {
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prependAndMap(
new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
return ListUtils.map(
_backing.getIterator(start, key),
i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
);
}
private JDataVersionedWrapper tryGetCached(JObjectKey key, JDataVersionedWrapper obj) {
var cached = _cache.getIfPresent(Pair.of(obj.version(), key));
if (cached != null) {
return cached;
}
maybeCache(key, obj);
return obj;
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached) {
case CacheEntryPresent data -> Optional.of(data.value());
case CacheEntryMiss tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return read;
return _backing.readObject(name).map(o -> tryGetCached(name, o));
}
@Override
@@ -235,8 +186,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
var cached = tryGetCached(prev.getKey(), prev.getValue());
return Pair.of(prev.getKey(), cached);
}
@Override
@@ -252,8 +203,8 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
var cached = tryGetCached(next.getKey(), next.getValue());
return Pair.of(next.getKey(), cached);
}
}
};
@@ -265,54 +216,4 @@ public class CachingObjectPersistentStore {
}
}
}
private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
}
}

View File

@@ -17,8 +17,8 @@ dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.move-processor.threads=8
dhfs.objects.ref-processor.threads=8
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true