Objects: simplify merging iterator

remove the first match "optimization", as it doesn't really
matter with the separate read object methods
This commit is contained in:
2025-04-03 22:13:34 +02:00
parent df00584367
commit 6bcec4a260

View File

@@ -9,50 +9,21 @@ import java.util.stream.Collectors;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final IteratorStart _initialStartType;
private final K _initialStartKey;
private final List<IterProdFn<K, V>> _pendingIterators;
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
// Fast path for the first element
private FirstMatchState<K, V> _firstMatchState;
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
_initialStartType = startType;
_initialStartKey = startKey;
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : iterators) {
var iterator = iteratorFn.get(startType, startKey);
if ((counter == 0) // Not really a requirement but simplifies some things for now
&& (startType == IteratorStart.GE || startType == IteratorStart.LE)
&& iterator.hasNext()
&& iterator.peekNextKey().equals(startKey)) {
_firstMatchState = new FirstMatchFound<>(iterator);
_pendingIterators = iterators;
Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState);
return;
}
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
_pendingIterators = null;
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : iterators) {
var iterator = iteratorFn.get(startType, startKey);
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
_firstMatchState = new FirstMatchNone<>();
doInitialAdvance();
}
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void doInitialAdvance() {
if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
// We have a bunch of iterators that have given us theirs "greatest LT/LE key"
// now we need to pick the greatest of those to start with
@@ -63,7 +34,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
var peeked = i.peekNextKey();
// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
return peeked;
}).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0));
}).distinct().collect(Collectors.partitioningBy(e -> startType == IteratorStart.LE ? e.compareTo(startKey) <= 0 : e.compareTo(startKey) < 0));
K initialMaxValue;
if (!found.get(true).isEmpty())
initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
@@ -82,7 +53,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
}
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
switch (_initialStartType) {
switch (startType) {
// case LT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
// }
@@ -90,37 +61,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
// }
case GT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
}
case GE -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
}
}
}
private void doHydrate() {
if (_firstMatchState instanceof FirstMatchNone) {
return;
}
boolean consumed = _firstMatchState instanceof FirstMatchConsumed;
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
_firstMatchState = new FirstMatchNone<>();
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : _pendingIterators) {
var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey);
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
}
doInitialAdvance();
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
@@ -151,17 +102,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void reverse() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
doHydrate();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
@@ -185,18 +125,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected K peekImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return firstMatchFound.iterator.peekNextKey();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
if (_sortedIterators.isEmpty())
throw new NoSuchElementException();
return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
@@ -204,22 +132,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void skipImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -231,38 +143,11 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected boolean hasImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return true;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
return !_sortedIterators.isEmpty();
}
@Override
protected Pair<K, V> nextImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return curVal;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -275,9 +160,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public void close() {
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
}