more iterator fixes

This commit is contained in:
2025-02-23 12:12:47 +01:00
parent 922bdf226c
commit 6924c70cd4
7 changed files with 205 additions and 69 deletions

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
public class InvalidIteratorException extends RuntimeException {
public InvalidIteratorException() {
super();
}
public InvalidIteratorException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,66 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
public class InvalidatableKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final Supplier<Long> _versionSupplier;
private final long _version;
private final Lock _lock;
public InvalidatableKvIterator(CloseableKvIterator<K, V> backing, Supplier<Long> versionSupplier, Lock lock) {
_backing = backing;
_versionSupplier = versionSupplier;
_lock = lock;
_version = _versionSupplier.get();
}
private void checkVersion() {
if (_versionSupplier.get() != _version) {
Log.errorv("Version mismatch: {0} != {1}", _versionSupplier.get(), _version);
throw new InvalidIteratorException();
}
}
@Override
public K peekNextKey() {
_lock.lock();
try {
checkVersion();
return _backing.peekNextKey();
} finally {
_lock.unlock();
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
_lock.lock();
try {
checkVersion();
return _backing.hasNext();
} finally {
_lock.unlock();
}
}
@Override
public Pair<K, V> next() {
_lock.lock();
try {
checkVersion();
return _backing.next();
} finally {
_lock.unlock();
}
}
}

View File

@@ -1,69 +1,84 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.Supplier;
// Also checks that the next provided item is always consistent after a refresh
public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private long _lastRefreshed = -1L;
private Pair<K, V> _next;
private final Object _synchronizer;
private final Supplier<CloseableKvIterator<K, V>> _iteratorSupplier;
private long _curVersion = -1L;
private final Lock _lock;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private final Supplier<Long> _versionSupplier;
private Pair<K, V> _next;
public SelfRefreshingKvIterator(Supplier<CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Object synchronizer) {
public SelfRefreshingKvIterator(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Lock lock,
IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_versionSupplier = versionSupplier;
_synchronizer = synchronizer;
_lock = lock;
synchronized (_synchronizer) {
_lock.lock();
try {
long curVersion = _versionSupplier.get();
_backing = _iteratorSupplier.get();
_backing = _iteratorSupplier.apply(Pair.of(start, key));
_next = _backing.hasNext() ? _backing.next() : null;
// if (_next != null)
// assert _next.getValue().version() <= _id;
_lastRefreshed = curVersion;
_curVersion = curVersion;
} finally {
_lock.unlock();
}
}
private void doRefresh() {
long curVersion = _versionSupplier.get();
if (curVersion == _lastRefreshed) {
return;
}
if (_next == null) return;
synchronized (_synchronizer) {
curVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _lastRefreshed, curVersion);
_backing.close();
_backing = _iteratorSupplier.get();
private void maybeRefresh() {
_lock.lock();
CloseableKvIterator<K, V> oldBacking = null;
try {
if (_versionSupplier.get() == _curVersion) {
return;
}
long newVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _curVersion, newVersion);
oldBacking = _backing;
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey()));
var next = _backing.hasNext() ? _backing.next() : null;
if (next == null) {
Log.errorv("Failed to refresh iterator, null last refreshed {0}," +
" current version {1}, current value {2}", _lastRefreshed, curVersion, next);
" current version {1}, current value {2}", _curVersion, newVersion, next);
assert false;
} else if (!next.equals(_next)) {
Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," +
" current version {1}, current value {2}, read value {3}", _lastRefreshed, curVersion, _next, next);
" current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next);
assert false;
}
_next = next;
_lastRefreshed = curVersion;
_curVersion = newVersion;
} finally {
_lock.unlock();
if (oldBacking != null) {
oldBacking.close();
}
}
}
// _next should always be valid, so it's ok to do the refresh "lazily"
private void prepareNext() {
doRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
// assert _next.getValue().version() <= _id;
} else {
_next = null;
_lock.lock();
try {
maybeRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
} else {
_next = null;
}
} finally {
_lock.unlock();
}
}
@@ -91,7 +106,6 @@ public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements Clo
throw new NoSuchElementException("No more elements");
}
var ret = _next;
// assert ret.getValue().version() <= _id;
prepareNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;

View File

@@ -14,6 +14,7 @@ import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@ApplicationScoped
@@ -21,6 +22,8 @@ public class SnapshotManager {
@Inject
WritebackObjectPersistentStore delegateStore;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private interface SnapshotEntry {
long whenToRemove();
}
@@ -55,7 +58,8 @@ public class SnapshotManager {
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
synchronized (this) {
_lock.writeLock().lock();
try {
assert id > _lastSnapshotId;
if (!_snapshotIds.isEmpty()) {
verify();
@@ -67,10 +71,9 @@ public class SnapshotManager {
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> {
yield Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id)));
}
) ->
Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id)));
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
) -> {
@@ -108,13 +111,19 @@ public class SnapshotManager {
}
verify();
// Commit under lock, iterators will see new version after the lock is released and writeback
// cache is updated
// TODO: Maybe writeback iterator being invalidated wouldn't be a problem?
return delegateStore.commitTx(writes, id);
} finally {
_lock.writeLock().unlock();
}
}
private void unrefSnapshot(long id) {
Log.tracev("Unref snapshot {0}", id);
synchronized (this) {
_lock.writeLock().lock();
try {
verify();
var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b);
if (!(refCount == null && id == _lastAliveSnapshotId)) {
@@ -176,6 +185,8 @@ public class SnapshotManager {
curCount = _snapshotRefCounts.getOrDefault(curId, 0L);
} while (curCount == 0);
verify();
} finally {
_lock.writeLock().unlock();
}
}
@@ -201,7 +212,8 @@ public class SnapshotManager {
private Snapshot(long id) {
_id = id;
synchronized (SnapshotManager.this) {
_lock.writeLock().lock();
try {
verify();
if (_lastSnapshotId > id)
throw new IllegalSnapshotIdException("Snapshot id " + id + " is less than last snapshot id " + _lastSnapshotId);
@@ -212,6 +224,8 @@ public class SnapshotManager {
_snapshotIds.add(id);
}
verify();
} finally {
_lock.writeLock().unlock();
}
var closedRef = _closed;
var idRef = _id;
@@ -295,9 +309,6 @@ public class SnapshotManager {
}
// In case something was added to the snapshot, it is not guaranteed that the iterators will see it,
// so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
// be served instead.
public class CheckingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
@@ -329,14 +340,18 @@ public class SnapshotManager {
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>(() ->
new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key),
new MappingKvIterator<>(delegateStore.getIterator(start, key), d -> switch (d) {
// In case something was added to the snapshot, it is not guaranteed that the iterators will see it,
// so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
// be served instead. Note that refreshing the iterator will also refresh the writeback iterator,
// so it also should be consistent.
return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>((params) ->
new TombstoneMergingKvIterator<>(new SnapshotKvIterator(params.getLeft(), params.getRight()),
new MappingKvIterator<>(delegateStore.getIterator(params.getLeft(), params.getRight()), d -> switch (d) {
case TombstoneMergingKvIterator.Tombstone<JDataVersionedWrapper>() -> d;
case TombstoneMergingKvIterator.Data<JDataVersionedWrapper> data ->
data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + d);
})), _snapshotVersion::get, SnapshotManager.this));
})), _snapshotVersion::get, _lock.readLock(), start, key));
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {

View File

@@ -20,12 +20,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
private final ReentrantReadWriteLock _pendingBundlesVersionLock = new ReentrantReadWriteLock();
private final ConcurrentSkipListMap<JObjectKey, PendingWriteEntry> _pendingWrites = new ConcurrentSkipListMap<>();
private final AtomicLong _pendingWritesVersion = new AtomicLong();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
@@ -128,12 +131,14 @@ public class TxWritebackImpl implements TxWriteback {
Log.trace("Bundle " + bundle.getId() + " committed");
// Remove from pending writes, after real commit
synchronized (_pendingBundles) {
bundle._entries.values().forEach(e -> {
var cur = _pendingWrites.get(e.key());
if (cur.bundleId() <= bundle.getId())
_pendingWrites.remove(e.key(), cur);
});
// No need to increment version
}
List<List<Runnable>> callbacks = new ArrayList<>();
@@ -219,22 +224,28 @@ public class TxWritebackImpl implements TxWriteback {
@Override
public void commitBundle(TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
((TxBundleImpl) bundle)._entries.values().forEach(e -> {
switch (e) {
case TxBundleImpl.CommittedEntry c ->
_pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId()));
case TxBundleImpl.DeletedEntry d ->
_pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId()));
default -> throw new IllegalStateException("Unexpected value: " + e);
_pendingBundlesVersionLock.writeLock().lock();
try {
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
((TxBundleImpl) bundle)._entries.values().forEach(e -> {
switch (e) {
case TxBundleImpl.CommittedEntry c ->
_pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId()));
case TxBundleImpl.DeletedEntry d ->
_pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId()));
default -> throw new IllegalStateException("Unexpected value: " + e);
}
});
_pendingWritesVersion.incrementAndGet();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize();
}
});
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize();
}
} finally {
_pendingBundlesVersionLock.writeLock().unlock();
}
}
@@ -378,14 +389,20 @@ public class TxWritebackImpl implements TxWriteback {
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Invalidated by commitBundle, but might return data after it has been really committed
@Override
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, start, key),
e -> switch (e) {
case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
});
_pendingBundlesVersionLock.readLock().lock();
try {
return new InvalidatableKvIterator<>(new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, start, key),
e -> switch (e) {
case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}), _pendingWritesVersion::get, _pendingBundlesVersionLock.readLock());
} finally {
_pendingBundlesVersionLock.readLock().unlock();
}
}
}

View File

@@ -11,6 +11,8 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@ApplicationScoped
@@ -19,6 +21,8 @@ public class WritebackObjectPersistentStore {
CachingObjectPersistentStore delegate;
@Inject
TxWriteback txWriteback;
private final AtomicLong _commitCounter = new AtomicLong(0);
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@Nonnull
public Collection<JObjectKey> findAllObjects() {
@@ -88,6 +92,7 @@ public class WritebackObjectPersistentStore {
Log.tracef("Committing transaction %d to storage", id);
txWriteback.commitBundle(bundle);
_commitCounter.incrementAndGet();
long bundleId = bundle.getId();
@@ -96,9 +101,16 @@ public class WritebackObjectPersistentStore {
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Should be refreshed after each commit
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
return new MergingKvIterator<>(txWriteback.getIterator(start, key),
new MappingKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new));
_lock.readLock().lock();
try {
return new InvalidatableKvIterator<>(new MergingKvIterator<>(txWriteback.getIterator(start, key),
new MappingKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new)),
_commitCounter::get, _lock.readLock());
} finally {
_lock.readLock().unlock();
}
}
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(JObjectKey key) {

View File

@@ -113,6 +113,7 @@ public class CachingObjectPersistentStore {
}
}
delegate.commitTx(names);
// Now, reading from the backing store should return the new data
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {