safer cache iterator

This commit is contained in:
2025-02-23 12:38:37 +01:00
parent 6924c70cd4
commit 9b2dbe01f1
3 changed files with 173 additions and 23 deletions

View File

@@ -0,0 +1,125 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.Supplier;
// Also checks that the next provided item is always consistent after a refresh
public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private long _curVersion = -1L;
private final Lock _lock;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private final Supplier<Long> _versionSupplier;
private K _lastReturnedKey = null;
private K _peekedKey = null;
private boolean _peekedNext = false;
private final Pair<IteratorStart, K> _initialStart;
public InconsistentSelfRefreshingKvIterator(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Lock lock,
IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_versionSupplier = versionSupplier;
_lock = lock;
_initialStart = Pair.of(start, key);
_lock.lock();
try {
long curVersion = _versionSupplier.get();
_backing = _iteratorSupplier.apply(Pair.of(start, key));
_curVersion = curVersion;
} finally {
_lock.unlock();
}
}
private void maybeRefresh() {
_lock.lock();
CloseableKvIterator<K, V> oldBacking = null;
try {
if (_versionSupplier.get() == _curVersion) {
return;
}
long newVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _curVersion, newVersion);
oldBacking = _backing;
if (_peekedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) {
throw new StaleIteratorException();
}
} else if (_lastReturnedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey));
} else {
_backing = _iteratorSupplier.apply(_initialStart);
}
if (_peekedNext && !_backing.hasNext()) {
throw new StaleIteratorException();
}
_curVersion = newVersion;
} finally {
_lock.unlock();
if (oldBacking != null) {
oldBacking.close();
}
}
}
@Override
public K peekNextKey() {
if (_peekedKey != null) {
return _peekedKey;
}
_lock.lock();
try {
maybeRefresh();
_peekedKey = _backing.peekNextKey();
_peekedNext = true;
return _peekedKey;
} finally {
_lock.unlock();
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
if (_peekedNext) {
return true;
}
_lock.lock();
try {
maybeRefresh();
_peekedNext = _backing.hasNext();
return _peekedNext;
} finally {
_lock.unlock();
}
}
@Override
public Pair<K, V> next() {
_lock.lock();
try {
maybeRefresh();
var got = _backing.next();
_peekedNext = false;
_peekedKey = null;
_lastReturnedKey = got.getKey();
return got;
} finally {
_lock.unlock();
}
}
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
public class StaleIteratorException extends RuntimeException {
public StaleIteratorException() {
super();
}
public StaleIteratorException(String message) {
super(message);
}
}

View File

@@ -17,12 +17,18 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
private final ConcurrentSkipListMap<JObjectKey, CacheEntry> _sortedCache = new ConcurrentSkipListMap<>();
private final AtomicLong _cacheVersion = new AtomicLong(0);
private final ReentrantReadWriteLock _cacheVersionLock = new ReentrantReadWriteLock();
private final HashSet<JObjectKey> _pendingWrites = new HashSet<>();
private final DataLocker _locker = new DataLocker();
@Inject
@@ -99,27 +105,33 @@ public class CachingObjectPersistentStore {
}
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names) {
// During commit, readObject shouldn't be called for these items,
// it should be handled by the upstream store
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
_curSize -= Optional.ofNullable(_cache.get(key)).map(CacheEntry::size).orElse(0L);
_cache.remove(key);
_sortedCache.remove(key);
_cacheVersionLock.writeLock().lock();
try {
// During commit, readObject shouldn't be called for these items,
// it should be handled by the upstream store
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
_curSize -= Optional.ofNullable(_cache.get(key)).map(CacheEntry::size).orElse(0L);
_cache.remove(key);
_sortedCache.remove(key);
// Log.tracev("Removing {0} from cache", key);
var added = _pendingWrites.add(key);
assert added;
var added = _pendingWrites.add(key);
assert added;
}
}
}
delegate.commitTx(names);
// Now, reading from the backing store should return the new data
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
var removed = _pendingWrites.remove(key);
assert removed;
delegate.commitTx(names);
// Now, reading from the backing store should return the new data
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
var removed = _pendingWrites.remove(key);
assert removed;
}
}
_cacheVersion.incrementAndGet();
} finally {
_cacheVersionLock.writeLock().unlock();
}
}
@@ -159,11 +171,13 @@ public class CachingObjectPersistentStore {
// Warning: it has a nasty side effect of global caching, so in this case don't even call next on it,
// if some objects are still in writeback
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new MergingKvIterator<>(
new PredicateKvIterator<>(
new NavigableMapKvIterator<>(_sortedCache, start, key),
e -> e.object().orElse(null)
), new CachingKvIterator(delegate.getIterator(start, key)));
return new InconsistentSelfRefreshingKvIterator<>(
(bp) -> new MergingKvIterator<>(
new PredicateKvIterator<>(
new NavigableMapKvIterator<>(_sortedCache, bp.getLeft(), bp.getRight()),
e -> e.object().orElse(null)
), new CachingKvIterator(delegate.getIterator(bp.getLeft(), bp.getRight()))), _cacheVersion::get,
_cacheVersionLock.readLock(), start, key);
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {