From 8ab034402d768eb527fdaadae12e5d228769a56a Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 29 Apr 2025 20:36:19 +0200 Subject: [PATCH] Revert "Objects: simplify cache" This reverts commit d94d11ec8be3eaf9e4c3e9c76b8a28416c5b3efd. --- .../stores/CachingObjectPersistentStore.java | 400 ++++++++++-------- 1 file changed, 231 insertions(+), 169 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index 8c33a04d..85ad5e32 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -3,77 +3,59 @@ package com.usatiuk.objects.stores; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperLazy; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.iterators.CloseableKvIterator; -import com.usatiuk.objects.iterators.Data; -import com.usatiuk.objects.iterators.IteratorStart; -import com.usatiuk.objects.iterators.MaybeTombstone; +import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.utils.ListUtils; import io.quarkus.logging.Log; import io.quarkus.runtime.StartupEvent; -import jakarta.annotation.Nullable; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.pcollections.TreePMap; import javax.annotation.Nonnull; -import java.util.LinkedHashMap; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; @ApplicationScoped public class CachingObjectPersistentStore { + private final AtomicReference _cache; @Inject SerializingObjectPersistentStore delegate; - @ConfigProperty(name = "dhfs.objects.lru.print-stats") boolean printStats; - @ConfigProperty(name = "dhfs.objects.lru.limit") - int sizeLimit; - private ExecutorService _commitExecutor; private ExecutorService _statusExecutor; + private AtomicLong _cached = new AtomicLong(); + private AtomicLong _cacheTries = new AtomicLong(); - private final LinkedHashMap _cache = new LinkedHashMap<>(8, 0.75f, true); - private long _curSize = 0; - - private void put(JObjectKey key, JDataVersionedWrapper obj) { - synchronized (_cache) { - int size = obj.estimateSize(); - - _curSize += size; - var old = _cache.putLast(key, obj); - if (old != null) - _curSize -= old.estimateSize(); - - while (_curSize >= sizeLimit) { - var del = _cache.pollFirstEntry(); - _curSize -= del.getValue().estimateSize(); -// _evict++; - } - } + public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) { + _cache = new AtomicReference<>( + new Cache(TreePMap.empty(), 0, -1, sizeLimit) + ); } - private @Nullable JDataVersionedWrapper get(JObjectKey key) { - synchronized (_cache) { - return _cache.get(key); - } - } - - void init(@Observes @Priority(110) StartupEvent event) { + try (var s = delegate.getSnapshot()) { + _cache.set(_cache.get().withVersion(s.id())); + } + _commitExecutor = Executors.newSingleThreadExecutor(); if (printStats) { _statusExecutor = Executors.newSingleThreadExecutor(); _statusExecutor.submit(() -> { try { while (true) { - Log.infov("Cache status: size=" + _curSize / 1024 / 1024 + "MB"); + Log.infov("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB" + " cache success ratio: " + (_cached.get() / (double) _cacheTries.get())); + _cached.set(0); + _cacheTries.set(0); Thread.sleep(1000); } } catch (InterruptedException ignored) { @@ -85,162 +67,242 @@ public class CachingObjectPersistentStore { public void commitTx(TxManifestObj objs, long txId) { Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); - var commitFuture = _commitExecutor.submit( - () -> delegate - .prepareTx(objs, txId) - .run() - ); + var cache = _cache.get(); + var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run()); for (var write : objs.written()) { - put(write.getKey(), write.getValue()); + cache = cache.withPut(write.getLeft(), Optional.of(write.getRight())); } + for (var del : objs.deleted()) { + cache = cache.withPut(del, Optional.empty()); + } + cache = cache.withVersion(txId); try { commitFuture.get(); } catch (Exception e) { throw new RuntimeException(e); } + _cache.set(cache); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); } public Snapshot getSnapshot() { - return new Snapshot() { - private final Snapshot _backing = delegate.getSnapshot(); - private boolean _invalid = false; - private boolean _closed = false; + while (true) { + var cache = _cache.get(); - private void doCache(JObjectKey key, JDataVersionedWrapper obj) { - if (_invalid) - return; - put(key, obj); - } + if (cache == null) + return delegate.getSnapshot(); - private void maybeCache(JObjectKey key, JDataVersionedWrapper obj) { - if (_invalid) { - return; + Cache curCache = null; + Snapshot backing = null; + + try { + curCache = _cache.get(); + backing = delegate.getSnapshot(); + + if (curCache.version() != backing.id()) { + backing.close(); + backing = null; + continue; } + Snapshot finalBacking = backing; + Cache finalCurCache = curCache; + return new Snapshot() { + private final Cache _curCache = finalCurCache; + private final Snapshot _backing = finalBacking; + private boolean _invalid = false; + private boolean _closed = false; - if (!(obj instanceof JDataVersionedWrapperLazy lazy)) { - return; - } + private void doCache(JObjectKey key, Optional obj) { + _cacheTries.incrementAndGet(); + if (_invalid) + return; - lazy.setCacheCallback(() -> { - if (_closed) { - Log.error("Cache callback called after close"); - System.exit(-1); - } - doCache(key, obj); - }); - return; - } - - @Override - public List>> getIterator(IteratorStart start, JObjectKey key) { - return ListUtils.map(_backing.getIterator(start, key), - i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i) - ); - } - - private Optional tryGet(JObjectKey key) { - var cached = get(key); - if (cached != null) { - if (cached.version() > _backing.id()) { - _invalid = true; - } else { - return Optional.of(cached); - } - } - return Optional.empty(); - } - - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - return tryGet(name).or(() -> { - var read = _backing.readObject(name); - read.ifPresent(r -> maybeCache(name, r)); - return read; - }); - } - - @Override - public long id() { - return _backing.id(); - } - - @Override - public void close() { - _closed = true; - _backing.close(); - } - - private class CachingKvIterator implements CloseableKvIterator> { - private final CloseableKvIterator _delegate; - - private CachingKvIterator(CloseableKvIterator delegate) { - _delegate = delegate; - } - - @Override - public JObjectKey peekNextKey() { - return _delegate.peekNextKey(); - } - - @Override - public void skip() { - _delegate.skip(); - } - - @Override - public void close() { - _delegate.close(); - } - - @Override - public boolean hasNext() { - return _delegate.hasNext(); - } - - @Override - public JObjectKey peekPrevKey() { - return _delegate.peekPrevKey(); - } - - @Override - public Pair> prev() { - var prev = _delegate.prev(); - if (prev.getValue() instanceof Data data) { - var tried = tryGet(prev.getKey()); - if (tried.isPresent()) { - return Pair.of(prev.getKey(), tried.get()); + var globalCache = _cache.get(); + if (globalCache.version() != _curCache.version()) { + _invalid = true; + return; } - maybeCache(prev.getKey(), data.value()); + + var newCache = globalCache.withPut(key, obj); + if (_cache.compareAndSet(globalCache, newCache)) + _cached.incrementAndGet(); } - return (Pair>) (Pair) prev; - } - @Override - public boolean hasPrev() { - return _delegate.hasPrev(); - } - - @Override - public void skipPrev() { - _delegate.skipPrev(); - } - - @Override - public Pair> next() { - var next = _delegate.next(); - if (next.getValue() instanceof Data data) { - var tried = tryGet(next.getKey()); - if (tried.isPresent()) { - return Pair.of(next.getKey(), tried.get()); + private void maybeCache(JObjectKey key, Optional obj) { + if (obj.isEmpty()) { + doCache(key, obj); + return; } - maybeCache(next.getKey(), data.value()); + + var wrapper = obj.get(); + + if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) { + doCache(key, obj); + return; + } + + lazy.setCacheCallback(() -> { + if (_closed) { + Log.error("Cache callback called after close"); + System.exit(-1); + } + doCache(key, obj); + }); + return; } - return (Pair>) (Pair) next; + + @Override + public List>> getIterator(IteratorStart start, JObjectKey key) { + return ListUtils.prependAndMap( + new NavigableMapKvIterator>(_curCache.map(), start, key), + _backing.getIterator(start, key), + i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i) + ); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + var cached = _curCache.map().get(name); + if (cached != null) { + return switch (cached) { + case CacheEntryPresent data -> Optional.of(data.value()); + case CacheEntryMiss tombstone -> { + yield Optional.empty(); + } + default -> throw new IllegalStateException("Unexpected value: " + cached); + }; + } + var read = _backing.readObject(name); + maybeCache(name, read); + return read; + } + + @Override + public long id() { + return _backing.id(); + } + + @Override + public void close() { + _closed = true; + _backing.close(); + } + + private class CachingKvIterator implements CloseableKvIterator> { + private final CloseableKvIterator _delegate; + + private CachingKvIterator(CloseableKvIterator delegate) { + _delegate = delegate; + } + + @Override + public JObjectKey peekNextKey() { + return _delegate.peekNextKey(); + } + + @Override + public void skip() { + _delegate.skip(); + } + + @Override + public void close() { + _delegate.close(); + } + + @Override + public boolean hasNext() { + return _delegate.hasNext(); + } + + @Override + public JObjectKey peekPrevKey() { + return _delegate.peekPrevKey(); + } + + @Override + public Pair> prev() { + var prev = _delegate.prev(); + maybeCache(prev.getKey(), Optional.of(prev.getValue())); + return (Pair>) (Pair) prev; + } + + @Override + public boolean hasPrev() { + return _delegate.hasPrev(); + } + + @Override + public void skipPrev() { + _delegate.skipPrev(); + } + + @Override + public Pair> next() { + var next = _delegate.next(); + maybeCache(next.getKey(), Optional.of(next.getValue())); + return (Pair>) (Pair) next; + } + } + }; + } catch (Throwable ex) { + if (backing != null) { + backing.close(); } + throw ex; } - }; + } + } + + private interface CacheEntry extends MaybeTombstone { + int size(); + } + + private record Cache(TreePMap map, + int size, + long version, + int sizeLimit) { + public Cache withPut(JObjectKey key, Optional obj) { + var entry = obj.map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss()); + + int newSize = size() + entry.size(); + + var old = map.get(key); + if (old != null) + newSize -= old.size(); + + TreePMap newCache = map(); + + while (newSize > sizeLimit) { + var del = newCache.firstEntry(); + newCache = newCache.minusFirstEntry(); + newSize -= del.getValue().size(); + } + + newCache = newCache.plus(key, entry); + return new Cache( + newCache, + newSize, + version, + sizeLimit + ); + } + + public Cache withVersion(long version) { + return new Cache(map, size, version, sizeLimit); + } + } + + private record CacheEntryPresent(JDataVersionedWrapper value, + int size) implements CacheEntry, Data { + } + + private record CacheEntryMiss() implements CacheEntry, Tombstone { + @Override + public int size() { + return 64; + } } }