add check in cache that we don't put stale info there

This commit is contained in:
2025-02-23 10:26:47 +01:00
parent c7104e772e
commit 70db929051
2 changed files with 18 additions and 4 deletions

View File

@@ -296,6 +296,9 @@ public class SnapshotManager {
}
// In case something was added to the snapshot, it is not guaranteed that the iterators will see it,
// so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
// be served instead.
public class AutoRefreshingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
private long _lastRefreshed = -1L;
@@ -334,23 +337,22 @@ public class SnapshotManager {
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()),
new MappingKvIterator<>(delegateStore.getIterator(IteratorStart.GE, _next.getKey()), _downstreamTombstoneMapper));
var next = _backing.hasNext() ? _backing.next() : null;
boolean fail = false;
if (next == null) {
Log.errorv("Failed to refresh snapshot iterator, null {0}, last refreshed {1}," +
" current version {2}, current value {3}", _id, _lastRefreshed, curVersion, next);
fail = true;
assert false;
} else if (!next.equals(_next)) {
Log.errorv("Failed to refresh snapshot iterator, mismatch {0}, last refreshed {1}," +
" current version {2}, current value {3}, read value {4}", _id, _lastRefreshed, curVersion, _next, next);
fail = true;
assert false;
}
assert !fail;
_next = next;
_lastRefreshed = curVersion;
}
}
// _next should always be valid, so it's ok to do the refresh "lazily"
private void prepareNext() {
doRefresh();
if (_backing.hasNext()) {

View File

@@ -11,6 +11,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -22,6 +23,7 @@ import java.util.stream.Stream;
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
private final ConcurrentSkipListMap<JObjectKey, CacheEntry> _sortedCache = new ConcurrentSkipListMap<>();
private final HashSet<JObjectKey> _pendingWrites = new HashSet<>();
private final DataLocker _locker = new DataLocker();
@Inject
SerializingObjectPersistentStore delegate;
@@ -61,6 +63,7 @@ public class CachingObjectPersistentStore {
private void put(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
// Log.tracev("Adding {0} to cache: {1}", key, obj);
synchronized (_cache) {
assert !_pendingWrites.contains(key);
int size = obj.map(o -> o.data().estimateSize()).orElse(0);
_curSize += size;
@@ -105,9 +108,18 @@ public class CachingObjectPersistentStore {
_cache.remove(key);
_sortedCache.remove(key);
// Log.tracev("Removing {0} from cache", key);
var added = _pendingWrites.add(key);
assert added;
}
}
delegate.commitTx(names);
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
var removed = _pendingWrites.remove(key);
assert removed;
}
}
}