diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index 85ad5e32..8c33a04d 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -3,59 +3,77 @@ package com.usatiuk.objects.stores; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperLazy; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.iterators.*; +import com.usatiuk.objects.iterators.CloseableKvIterator; +import com.usatiuk.objects.iterators.Data; +import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MaybeTombstone; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.utils.ListUtils; import io.quarkus.logging.Log; import io.quarkus.runtime.StartupEvent; +import jakarta.annotation.Nullable; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.pcollections.TreePMap; import javax.annotation.Nonnull; +import java.util.LinkedHashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; @ApplicationScoped public class CachingObjectPersistentStore { - private final AtomicReference _cache; @Inject SerializingObjectPersistentStore delegate; + @ConfigProperty(name = "dhfs.objects.lru.print-stats") boolean printStats; + @ConfigProperty(name = "dhfs.objects.lru.limit") + int sizeLimit; + private ExecutorService _commitExecutor; private ExecutorService _statusExecutor; - private AtomicLong _cached = new AtomicLong(); - private AtomicLong _cacheTries = new AtomicLong(); - public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) { - _cache = new AtomicReference<>( - new Cache(TreePMap.empty(), 0, -1, sizeLimit) - ); + private final LinkedHashMap _cache = new LinkedHashMap<>(8, 0.75f, true); + private long _curSize = 0; + + private void put(JObjectKey key, JDataVersionedWrapper obj) { + synchronized (_cache) { + int size = obj.estimateSize(); + + _curSize += size; + var old = _cache.putLast(key, obj); + if (old != null) + _curSize -= old.estimateSize(); + + while (_curSize >= sizeLimit) { + var del = _cache.pollFirstEntry(); + _curSize -= del.getValue().estimateSize(); +// _evict++; + } + } } - void init(@Observes @Priority(110) StartupEvent event) { - try (var s = delegate.getSnapshot()) { - _cache.set(_cache.get().withVersion(s.id())); + private @Nullable JDataVersionedWrapper get(JObjectKey key) { + synchronized (_cache) { + return _cache.get(key); } + } + + void init(@Observes @Priority(110) StartupEvent event) { _commitExecutor = Executors.newSingleThreadExecutor(); if (printStats) { _statusExecutor = Executors.newSingleThreadExecutor(); _statusExecutor.submit(() -> { try { while (true) { - Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get())); - _cached.set(0); - _cacheTries.set(0); + Log.infov("Cache status: size=" + _curSize / 1024 / 1024 + "MB"); Thread.sleep(1000); } } catch (InterruptedException ignored) { @@ -67,242 +85,162 @@ public class CachingObjectPersistentStore { public void commitTx(TxManifestObj objs, long txId) { Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); - var cache = _cache.get(); - var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run()); + var commitFuture = _commitExecutor.submit( + () -> delegate + .prepareTx(objs, txId) + .run() + ); for (var write : objs.written()) { - cache = cache.withPut(write.getLeft(), Optional.of(write.getRight())); + put(write.getKey(), write.getValue()); } - for (var del : objs.deleted()) { - cache = cache.withPut(del, Optional.empty()); - } - cache = cache.withVersion(txId); try { commitFuture.get(); } catch (Exception e) { throw new RuntimeException(e); } - _cache.set(cache); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); } public Snapshot getSnapshot() { - while (true) { - var cache = _cache.get(); + return new Snapshot() { + private final Snapshot _backing = delegate.getSnapshot(); + private boolean _invalid = false; + private boolean _closed = false; - if (cache == null) - return delegate.getSnapshot(); - - Cache curCache = null; - Snapshot backing = null; - - try { - curCache = _cache.get(); - backing = delegate.getSnapshot(); - - if (curCache.version() != backing.id()) { - backing.close(); - backing = null; - continue; - } - Snapshot finalBacking = backing; - Cache finalCurCache = curCache; - return new Snapshot() { - private final Cache _curCache = finalCurCache; - private final Snapshot _backing = finalBacking; - private boolean _invalid = false; - private boolean _closed = false; - - private void doCache(JObjectKey key, Optional obj) { - _cacheTries.incrementAndGet(); - if (_invalid) - return; - - var globalCache = _cache.get(); - if (globalCache.version() != _curCache.version()) { - _invalid = true; - return; - } - - var newCache = globalCache.withPut(key, obj); - if (_cache.compareAndSet(globalCache, newCache)) - _cached.incrementAndGet(); - } - - private void maybeCache(JObjectKey key, Optional obj) { - if (obj.isEmpty()) { - doCache(key, obj); - return; - } - - var wrapper = obj.get(); - - if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) { - doCache(key, obj); - return; - } - - lazy.setCacheCallback(() -> { - if (_closed) { - Log.error("Cache callback called after close"); - System.exit(-1); - } - doCache(key, obj); - }); - return; - } - - @Override - public List>> getIterator(IteratorStart start, JObjectKey key) { - return ListUtils.prependAndMap( - new NavigableMapKvIterator>(_curCache.map(), start, key), - _backing.getIterator(start, key), - i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i) - ); - } - - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - var cached = _curCache.map().get(name); - if (cached != null) { - return switch (cached) { - case CacheEntryPresent data -> Optional.of(data.value()); - case CacheEntryMiss tombstone -> { - yield Optional.empty(); - } - default -> throw new IllegalStateException("Unexpected value: " + cached); - }; - } - var read = _backing.readObject(name); - maybeCache(name, read); - return read; - } - - @Override - public long id() { - return _backing.id(); - } - - @Override - public void close() { - _closed = true; - _backing.close(); - } - - private class CachingKvIterator implements CloseableKvIterator> { - private final CloseableKvIterator _delegate; - - private CachingKvIterator(CloseableKvIterator delegate) { - _delegate = delegate; - } - - @Override - public JObjectKey peekNextKey() { - return _delegate.peekNextKey(); - } - - @Override - public void skip() { - _delegate.skip(); - } - - @Override - public void close() { - _delegate.close(); - } - - @Override - public boolean hasNext() { - return _delegate.hasNext(); - } - - @Override - public JObjectKey peekPrevKey() { - return _delegate.peekPrevKey(); - } - - @Override - public Pair> prev() { - var prev = _delegate.prev(); - maybeCache(prev.getKey(), Optional.of(prev.getValue())); - return (Pair>) (Pair) prev; - } - - @Override - public boolean hasPrev() { - return _delegate.hasPrev(); - } - - @Override - public void skipPrev() { - _delegate.skipPrev(); - } - - @Override - public Pair> next() { - var next = _delegate.next(); - maybeCache(next.getKey(), Optional.of(next.getValue())); - return (Pair>) (Pair) next; - } - } - }; - } catch (Throwable ex) { - if (backing != null) { - backing.close(); - } - throw ex; - } - } - } - - private interface CacheEntry extends MaybeTombstone { - int size(); - } - - private record Cache(TreePMap map, - int size, - long version, - int sizeLimit) { - public Cache withPut(JObjectKey key, Optional obj) { - var entry = obj.map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss()); - - int newSize = size() + entry.size(); - - var old = map.get(key); - if (old != null) - newSize -= old.size(); - - TreePMap newCache = map(); - - while (newSize > sizeLimit) { - var del = newCache.firstEntry(); - newCache = newCache.minusFirstEntry(); - newSize -= del.getValue().size(); + private void doCache(JObjectKey key, JDataVersionedWrapper obj) { + if (_invalid) + return; + put(key, obj); } - newCache = newCache.plus(key, entry); - return new Cache( - newCache, - newSize, - version, - sizeLimit - ); - } + private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) { + if (_invalid) { + return; + } - public Cache withVersion(long version) { - return new Cache(map, size, version, sizeLimit); - } - } + if (!(obj instanceof JDataVersionedWrapperLazy lazy)) { + return; + } - private record CacheEntryPresent(JDataVersionedWrapper value, - int size) implements CacheEntry, Data { - } + lazy.setCacheCallback(() -> { + if (_closed) { + Log.error("Cache callback called after close"); + System.exit(-1); + } + doCache(key, obj); + }); + return; + } - private record CacheEntryMiss() implements CacheEntry, Tombstone { - @Override - public int size() { - return 64; - } + @Override + public List>> getIterator(IteratorStart start, JObjectKey key) { + return ListUtils.map(_backing.getIterator(start, key), + i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i) + ); + } + + private Optional tryGet(JObjectKey key) { + var cached = get(key); + if (cached != null) { + if (cached.version() > _backing.id()) { + _invalid = true; + } else { + return Optional.of(cached); + } + } + return Optional.empty(); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + return tryGet(name).or(() -> { + var read = _backing.readObject(name); + read.ifPresent(r -> maybeCache(name, r)); + return read; + }); + } + + @Override + public long id() { + return _backing.id(); + } + + @Override + public void close() { + _closed = true; + _backing.close(); + } + + private class CachingKvIterator implements CloseableKvIterator> { + private final CloseableKvIterator _delegate; + + private CachingKvIterator(CloseableKvIterator delegate) { + _delegate = delegate; + } + + @Override + public JObjectKey peekNextKey() { + return _delegate.peekNextKey(); + } + + @Override + public void skip() { + _delegate.skip(); + } + + @Override + public void close() { + _delegate.close(); + } + + @Override + public boolean hasNext() { + return _delegate.hasNext(); + } + + @Override + public JObjectKey peekPrevKey() { + return _delegate.peekPrevKey(); + } + + @Override + public Pair> prev() { + var prev = _delegate.prev(); + if (prev.getValue() instanceof Data data) { + var tried = tryGet(prev.getKey()); + if (tried.isPresent()) { + return Pair.of(prev.getKey(), tried.get()); + } + maybeCache(prev.getKey(), data.value()); + } + return (Pair>) (Pair) prev; + } + + @Override + public boolean hasPrev() { + return _delegate.hasPrev(); + } + + @Override + public void skipPrev() { + _delegate.skipPrev(); + } + + @Override + public Pair> next() { + var next = _delegate.next(); + if (next.getValue() instanceof Data data) { + var tried = tryGet(next.getKey()); + if (tried.isPresent()) { + return Pair.of(next.getKey(), tried.get()); + } + maybeCache(next.getKey(), data.value()); + } + return (Pair>) (Pair) next; + } + } + }; } }