separate SelfRefreshingKvIterator

This commit is contained in:
2025-02-23 10:39:28 +01:00
parent 70db929051
commit 922bdf226c
2 changed files with 115 additions and 74 deletions

View File

@@ -0,0 +1,100 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private long _lastRefreshed = -1L;
private Pair<K, V> _next;
private final Object _synchronizer;
private final Supplier<CloseableKvIterator<K, V>> _iteratorSupplier;
private final Supplier<Long> _versionSupplier;
public SelfRefreshingKvIterator(Supplier<CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Object synchronizer) {
_iteratorSupplier = iteratorSupplier;
_versionSupplier = versionSupplier;
_synchronizer = synchronizer;
synchronized (_synchronizer) {
long curVersion = _versionSupplier.get();
_backing = _iteratorSupplier.get();
_next = _backing.hasNext() ? _backing.next() : null;
// if (_next != null)
// assert _next.getValue().version() <= _id;
_lastRefreshed = curVersion;
}
}
private void doRefresh() {
long curVersion = _versionSupplier.get();
if (curVersion == _lastRefreshed) {
return;
}
if (_next == null) return;
synchronized (_synchronizer) {
curVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _lastRefreshed, curVersion);
_backing.close();
_backing = _iteratorSupplier.get();
var next = _backing.hasNext() ? _backing.next() : null;
if (next == null) {
Log.errorv("Failed to refresh iterator, null last refreshed {0}," +
" current version {1}, current value {2}", _lastRefreshed, curVersion, next);
assert false;
} else if (!next.equals(_next)) {
Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," +
" current version {1}, current value {2}, read value {3}", _lastRefreshed, curVersion, _next, next);
assert false;
}
_next = next;
_lastRefreshed = curVersion;
}
}
// _next should always be valid, so it's ok to do the refresh "lazily"
private void prepareNext() {
doRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
// assert _next.getValue().version() <= _id;
} else {
_next = null;
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
// assert ret.getValue().version() <= _id;
prepareNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}

View File

@@ -15,7 +15,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class SnapshotManager {
@@ -299,76 +298,16 @@ public class SnapshotManager {
// In case something was added to the snapshot, it is not guaranteed that the iterators will see it,
// so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
// be served instead.
public class AutoRefreshingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
private long _lastRefreshed = -1L;
private Pair<JObjectKey, JDataVersionedWrapper> _next;
public class CheckingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
private final Function<TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _downstreamTombstoneMapper
= d -> switch (d) {
case TombstoneMergingKvIterator.Tombstone<JDataVersionedWrapper>() -> d;
case TombstoneMergingKvIterator.Data<JDataVersionedWrapper> data ->
data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + d);
};
public AutoRefreshingSnapshotKvIterator(IteratorStart start, JObjectKey key) {
synchronized (SnapshotManager.this) {
long curVersion = _snapshotVersion.get();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key),
new MappingKvIterator<>(delegateStore.getIterator(start, key), _downstreamTombstoneMapper));
_next = _backing.hasNext() ? _backing.next() : null;
if (_next != null)
assert _next.getValue().version() <= _id;
_lastRefreshed = curVersion;
}
}
private void doRefresh() {
long curVersion = _snapshotVersion.get();
if (curVersion == _lastRefreshed) {
return;
}
if (_next == null) return;
synchronized (SnapshotManager.this) {
curVersion = _snapshotVersion.get();
Log.tracev("Refreshing snapshot iterator {0}, last refreshed {1}, current version {2}", _id, _lastRefreshed, curVersion);
_backing.close();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()),
new MappingKvIterator<>(delegateStore.getIterator(IteratorStart.GE, _next.getKey()), _downstreamTombstoneMapper));
var next = _backing.hasNext() ? _backing.next() : null;
if (next == null) {
Log.errorv("Failed to refresh snapshot iterator, null {0}, last refreshed {1}," +
" current version {2}, current value {3}", _id, _lastRefreshed, curVersion, next);
assert false;
} else if (!next.equals(_next)) {
Log.errorv("Failed to refresh snapshot iterator, mismatch {0}, last refreshed {1}," +
" current version {2}, current value {3}, read value {4}", _id, _lastRefreshed, curVersion, _next, next);
assert false;
}
_next = next;
_lastRefreshed = curVersion;
}
}
// _next should always be valid, so it's ok to do the refresh "lazily"
private void prepareNext() {
doRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
assert _next.getValue().version() <= _id;
} else {
_next = null;
}
public CheckingSnapshotKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
return _backing.peekNextKey();
}
@Override
@@ -378,24 +317,26 @@ public class SnapshotManager {
@Override
public boolean hasNext() {
return _next != null;
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
var ret = _backing.next();
assert ret.getValue().version() <= _id;
prepareNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new AutoRefreshingSnapshotKvIterator(start, key);
return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>(() ->
new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key),
new MappingKvIterator<>(delegateStore.getIterator(start, key), d -> switch (d) {
case TombstoneMergingKvIterator.Tombstone<JDataVersionedWrapper>() -> d;
case TombstoneMergingKvIterator.Data<JDataVersionedWrapper> data ->
data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + d);
})), _snapshotVersion::get, SnapshotManager.this));
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {