Objects: lazier iterators

This commit is contained in:
2025-03-13 10:07:49 +01:00
parent 1d22465e4a
commit 3470ce8690
3 changed files with 166 additions and 14 deletions

View File

@@ -8,25 +8,62 @@ import java.util.*;
import java.util.stream.Collectors;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final IteratorStart _initialStartType;
private final K _initialStartKey;
private interface FirstMatchState<K extends Comparable<K>, V> {
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
private record FirstMatchFound<K extends Comparable<K>, V>(
CloseableKvIterator<K, V> iterator) implements FirstMatchState<K, V> {
}
private record FirstMatchConsumed<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
// Fast path for the first element
private FirstMatchState<K, V> _firstMatchState;
private final List<IterProdFn<K, V>> _pendingIterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
_initialStartType = startType;
_initialStartKey = startKey;
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : iterators) {
var iterator = iteratorFn.get(startType, startKey);
if ((counter == 0) // Not really a requirement but simplifies some things for now
&& (startType == IteratorStart.GE || startType == IteratorStart.LE)
&& iterator.hasNext()
&& iterator.peekNextKey().equals(startKey)) {
_firstMatchState = new FirstMatchFound<>(iterator);
_pendingIterators = iterators;
Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState);
return;
}
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
_pendingIterators = null;
}
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
_firstMatchState = new FirstMatchNone<>();
doInitialAdvance();
}
private void doInitialAdvance() {
if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
// We have a bunch of iterators that have given us theirs "greatest LT/LE key"
// now we need to pick the greatest of those to start with
@@ -37,7 +74,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
var peeked = i.peekNextKey();
// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
return peeked;
}).distinct().collect(Collectors.partitioningBy(e -> startType == IteratorStart.LE ? e.compareTo(startKey) <= 0 : e.compareTo(startKey) < 0));
}).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0));
K initialMaxValue;
if (!found.get(true).isEmpty())
initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
@@ -55,8 +92,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
advanceIterator(iterator);
}
Log.tracev("{0} Created: {1}", _name, _sortedIterators);
switch (startType) {
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
switch (_initialStartType) {
// case LT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
// }
@@ -64,20 +101,44 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
// }
case GT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0;
}
case GE -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0;
}
}
}
private void doHydrate() {
if (_firstMatchState instanceof FirstMatchNone) {
return;
}
boolean consumed = _firstMatchState instanceof FirstMatchConsumed;
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
_firstMatchState = new FirstMatchNone<>();
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : _pendingIterators) {
var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey);
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
}
doInitialAdvance();
}
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
if (!iterator.hasNext()) {
return;
@@ -106,6 +167,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void reverse() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
doHydrate();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
@@ -129,6 +201,18 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected K peekImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return firstMatchFound.iterator.peekNextKey();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
if (_sortedIterators.isEmpty())
throw new NoSuchElementException();
return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
@@ -136,6 +220,22 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void skipImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -147,11 +247,38 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected boolean hasImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return true;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
return !_sortedIterators.isEmpty();
}
@Override
protected Pair<K, V> nextImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return curVal;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -165,6 +292,9 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public void close() {
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
}

View File

@@ -10,12 +10,17 @@ import java.util.function.Function;
public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends ReversibleKvIterator<K, V_T> {
private final CloseableKvIterator<K, V> _backing;
private final Function<V, V_T> _transformer;
private Pair<K, V_T> _next;
private Pair<K, V_T> _next = null;
private boolean _checkedNext = false;
public PredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<V, V_T> transformer) {
_goingForward = true;
_backing = backing;
_transformer = transformer;
if (start == IteratorStart.GE || start == IteratorStart.GT)
return;
fillNext();
boolean shouldGoBack = false;
@@ -64,6 +69,7 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
continue;
_next = Pair.of(next.getKey(), transformed);
}
_checkedNext = true;
}
@Override
@@ -80,12 +86,14 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
Log.tracev("Skipped in reverse: {0}", _next);
_next = null;
fillNext();
_checkedNext = false;
}
@Override
protected K peekImpl() {
if (!_checkedNext)
fillNext();
if (_next == null)
throw new NoSuchElementException();
return _next.getKey();
@@ -93,24 +101,33 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
@Override
protected void skipImpl() {
if (!_checkedNext)
fillNext();
if (_next == null)
throw new NoSuchElementException();
_next = null;
fillNext();
_checkedNext = false;
}
@Override
protected boolean hasImpl() {
if (!_checkedNext)
fillNext();
return _next != null;
}
@Override
protected Pair<K, V_T> nextImpl() {
if (!_checkedNext)
fillNext();
if (_next == null)
throw new NoSuchElementException("No more elements");
var ret = _next;
_next = null;
fillNext();
_checkedNext = false;
return ret;
}

View File

@@ -20,7 +20,12 @@ public class SnapshotKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTo
_objects = objects;
_version = version;
_goingForward = true;
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE));
if (start == IteratorStart.LT || start == IteratorStart.GE)
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE));
else if (start == IteratorStart.GT || start == IteratorStart.LE)
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MAX_VALUE));
else
throw new UnsupportedOperationException();
fill();
boolean shouldGoBack = false;