Objects: faster MergingKvIterator

This commit is contained in:
2025-04-16 23:41:30 +02:00
parent 94218330b1
commit c9b0400d50

View File

@@ -3,23 +3,34 @@ package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.stream.IntStream;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
_iterators = Map.ofEntries(
IntStream.range(0, iterators.size())
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
.toArray(Pair[]::new)
);
// Why streams are so slow?
{
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
for (int i = 0; i < iterators.size(); i++) {
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
}
_iterators = List.of(iteratorEntries);
}
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
@@ -30,7 +41,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
K greatestLess = null;
K smallestMore = null;
for (var it : _iterators.keySet()) {
for (var ite : _iterators) {
var it = ite.iterator();
if (it.hasNext()) {
var peeked = it.peekNextKey();
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
@@ -55,14 +67,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// Empty iterators
}
for (var iterator : _iterators.keySet()) {
for (var ite : _iterators) {
var iterator = ite.iterator();
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
iterator.skip();
}
}
}
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
for (IteratorEntry<K, V> iterator : _iterators) {
advanceIterator(iterator);
}
@@ -88,29 +101,28 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
if (!iterator.hasNext()) {
return;
}
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
while (iteratorEntry.iterator().hasNext()) {
K key = iteratorEntry.iterator().peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
K key = iterator.peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
if (!_sortedIterators.containsKey(key)) {
_sortedIterators.put(key, iterator);
return;
}
var them = _sortedIterators.get(key);
if (them == null) {
_sortedIterators.put(key, iteratorEntry);
return;
}
// Expects that reversed iterator returns itself when reversed again
var oursPrio = _iterators.get(_goingForward ? iterator : iterator.reversed());
var them = _sortedIterators.get(key);
var theirsPrio = _iterators.get(_goingForward ? them : them.reversed());
if (oursPrio < theirsPrio) {
_sortedIterators.put(key, iterator);
advanceIterator(them);
} else {
Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
iterator.skip();
advanceIterator(iterator);
// Expects that reversed iterator returns itself when reversed again
var oursPrio = iteratorEntry.priority();
var theirsPrio = them.priority();
if (oursPrio < theirsPrio) {
_sortedIterators.put(key, iteratorEntry);
advanceIterator(them);
return;
} else {
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
iteratorEntry.iterator().skip();
}
}
}
@@ -120,7 +132,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
_sortedIterators.clear();
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
for (IteratorEntry<K, V> iterator : _iterators) {
// _goingForward inverted already
advanceIterator(!_goingForward ? iterator.reversed() : iterator);
}
@@ -150,7 +162,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
if (cur == null) {
throw new NoSuchElementException();
}
cur.getValue().skip();
cur.getValue().iterator().skip();
advanceIterator(cur.getValue());
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
}
@@ -166,7 +178,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
if (cur == null) {
throw new NoSuchElementException();
}
var curVal = cur.getValue().next();
var curVal = cur.getValue().iterator().next();
advanceIterator(cur.getValue());
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
return curVal;
@@ -174,8 +186,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public void close() {
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
for (IteratorEntry<K, V> iterator : _iterators) {
iterator.iterator().close();
}
}