From 3470ce869086dd5a6c3e17d8fd9a9f2b40254e3f Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Thu, 13 Mar 2025 10:07:49 +0100 Subject: [PATCH] Objects: lazier iterators --- .../dhfs/objects/MergingKvIterator.java | 146 +++++++++++++++++- .../dhfs/objects/PredicateKvIterator.java | 27 +++- .../objects/snapshot/SnapshotKvIterator.java | 7 +- 3 files changed, 166 insertions(+), 14 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java index 16ec6a87..b6aa55d4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java @@ -8,25 +8,62 @@ import java.util.*; import java.util.stream.Collectors; public class MergingKvIterator, V> extends ReversibleKvIterator { - private final Map, Integer> _iterators; private final NavigableMap> _sortedIterators = new TreeMap<>(); private final String _name; + private Map, Integer> _iterators; + + private final IteratorStart _initialStartType; + private final K _initialStartKey; + + private interface FirstMatchState, V> { + } + + private record FirstMatchNone, V>() implements FirstMatchState { + } + + private record FirstMatchFound, V>( + CloseableKvIterator iterator) implements FirstMatchState { + } + + private record FirstMatchConsumed, V>() implements FirstMatchState { + } + + // Fast path for the first element + private FirstMatchState _firstMatchState; + private final List> _pendingIterators; public MergingKvIterator(String name, IteratorStart startType, K startKey, List> iterators) { _goingForward = true; _name = name; + _initialStartType = startType; + _initialStartKey = startKey; { int counter = 0; var iteratorsTmp = new HashMap, Integer>(); for (var iteratorFn : iterators) { var iterator = iteratorFn.get(startType, startKey); + if ((counter == 0) // Not really a requirement but simplifies some things for now + && (startType == IteratorStart.GE || startType == IteratorStart.LE) + && iterator.hasNext() + && iterator.peekNextKey().equals(startKey)) { + _firstMatchState = new FirstMatchFound<>(iterator); + _pendingIterators = iterators; + Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState); + return; + } iteratorsTmp.put(iterator, counter++); } _iterators = Map.copyOf(iteratorsTmp); + _pendingIterators = null; } - if (startType == IteratorStart.LT || startType == IteratorStart.LE) { + _firstMatchState = new FirstMatchNone<>(); + doInitialAdvance(); + } + + private void doInitialAdvance() { + if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) { // Starting at a greatest key less than/less or equal than: // We have a bunch of iterators that have given us theirs "greatest LT/LE key" // now we need to pick the greatest of those to start with @@ -37,7 +74,7 @@ public class MergingKvIterator, V> extends ReversibleKvI var peeked = i.peekNextKey(); // Log.warnv("peeked: {0}, from {1}", peeked, i.getClass()); return peeked; - }).distinct().collect(Collectors.partitioningBy(e -> startType == IteratorStart.LE ? e.compareTo(startKey) <= 0 : e.compareTo(startKey) < 0)); + }).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0)); K initialMaxValue; if (!found.get(true).isEmpty()) initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null); @@ -55,8 +92,8 @@ public class MergingKvIterator, V> extends ReversibleKvI advanceIterator(iterator); } - Log.tracev("{0} Created: {1}", _name, _sortedIterators); - switch (startType) { + Log.tracev("{0} Initialized: {1}", _name, _sortedIterators); + switch (_initialStartType) { // case LT -> { // assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0; // } @@ -64,20 +101,44 @@ public class MergingKvIterator, V> extends ReversibleKvI // assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0; // } case GT -> { - assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0; + assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0; } case GE -> { - assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0; + assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0; } } } + private void doHydrate() { + if (_firstMatchState instanceof FirstMatchNone) { + return; + } + + boolean consumed = _firstMatchState instanceof FirstMatchConsumed; + if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) { + iterator.close(); + } + + _firstMatchState = new FirstMatchNone<>(); + + { + int counter = 0; + var iteratorsTmp = new HashMap, Integer>(); + for (var iteratorFn : _pendingIterators) { + var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey); + iteratorsTmp.put(iterator, counter++); + } + _iterators = Map.copyOf(iteratorsTmp); + } + + doInitialAdvance(); + } + @SafeVarargs public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn... iterators) { this(name, startType, startKey, List.of(iterators)); } - private void advanceIterator(CloseableKvIterator iterator) { if (!iterator.hasNext()) { return; @@ -106,6 +167,17 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override protected void reverse() { + switch (_firstMatchState) { + case FirstMatchFound firstMatchFound -> { + doHydrate(); + } + case FirstMatchConsumed firstMatchConsumed -> { + doHydrate(); + } + default -> { + } + } + var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry(); Log.tracev("{0} Reversing from {1}", _name, cur); _goingForward = !_goingForward; @@ -129,6 +201,18 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override protected K peekImpl() { + switch (_firstMatchState) { + case FirstMatchFound firstMatchFound -> { + return firstMatchFound.iterator.peekNextKey(); + } + case FirstMatchConsumed firstMatchConsumed -> { + doHydrate(); + break; + } + default -> { + } + } + if (_sortedIterators.isEmpty()) throw new NoSuchElementException(); return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey(); @@ -136,6 +220,22 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override protected void skipImpl() { + switch (_firstMatchState) { + case FirstMatchFound firstMatchFound -> { + var curVal = firstMatchFound.iterator.next(); + firstMatchFound.iterator.close(); + _firstMatchState = new FirstMatchConsumed<>(); +// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet()); + return; + } + case FirstMatchConsumed firstMatchConsumed -> { + doHydrate(); + break; + } + default -> { + } + } + var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry(); if (cur == null) { throw new NoSuchElementException(); @@ -147,11 +247,38 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override protected boolean hasImpl() { + switch (_firstMatchState) { + case FirstMatchFound firstMatchFound -> { + return true; + } + case FirstMatchConsumed firstMatchConsumed -> { + doHydrate(); + break; + } + default -> { + } + } return !_sortedIterators.isEmpty(); } @Override protected Pair nextImpl() { + switch (_firstMatchState) { + case FirstMatchFound firstMatchFound -> { + var curVal = firstMatchFound.iterator.next(); + firstMatchFound.iterator.close(); + _firstMatchState = new FirstMatchConsumed<>(); +// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet()); + return curVal; + } + case FirstMatchConsumed firstMatchConsumed -> { + doHydrate(); + break; + } + default -> { + } + } + var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry(); if (cur == null) { throw new NoSuchElementException(); @@ -165,6 +292,9 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override public void close() { + if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) { + iterator.close(); + } for (CloseableKvIterator iterator : _iterators.keySet()) { iterator.close(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java index cfe85ffa..a39bfad4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java @@ -10,12 +10,17 @@ import java.util.function.Function; public class PredicateKvIterator, V, V_T> extends ReversibleKvIterator { private final CloseableKvIterator _backing; private final Function _transformer; - private Pair _next; + private Pair _next = null; + private boolean _checkedNext = false; public PredicateKvIterator(CloseableKvIterator backing, IteratorStart start, K startKey, Function transformer) { _goingForward = true; _backing = backing; _transformer = transformer; + + if (start == IteratorStart.GE || start == IteratorStart.GT) + return; + fillNext(); boolean shouldGoBack = false; @@ -64,6 +69,7 @@ public class PredicateKvIterator, V, V_T> extends Revers continue; _next = Pair.of(next.getKey(), transformed); } + _checkedNext = true; } @Override @@ -80,12 +86,14 @@ public class PredicateKvIterator, V, V_T> extends Revers Log.tracev("Skipped in reverse: {0}", _next); _next = null; - - fillNext(); + _checkedNext = false; } @Override protected K peekImpl() { + if (!_checkedNext) + fillNext(); + if (_next == null) throw new NoSuchElementException(); return _next.getKey(); @@ -93,24 +101,33 @@ public class PredicateKvIterator, V, V_T> extends Revers @Override protected void skipImpl() { + if (!_checkedNext) + fillNext(); + if (_next == null) throw new NoSuchElementException(); _next = null; - fillNext(); + _checkedNext = false; } @Override protected boolean hasImpl() { + if (!_checkedNext) + fillNext(); + return _next != null; } @Override protected Pair nextImpl() { + if (!_checkedNext) + fillNext(); + if (_next == null) throw new NoSuchElementException("No more elements"); var ret = _next; _next = null; - fillNext(); + _checkedNext = false; return ret; } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java index 1d045665..f4db3043 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java @@ -20,7 +20,12 @@ public class SnapshotKvIterator extends ReversibleKvIterator(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE)); + if (start == IteratorStart.LT || start == IteratorStart.GE) + _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE)); + else if (start == IteratorStart.GT || start == IteratorStart.LE) + _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MAX_VALUE)); + else + throw new UnsupportedOperationException(); fill(); boolean shouldGoBack = false;