mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
3 Commits
df00584367
...
cb8c50000a
| Author | SHA1 | Date | |
|---|---|---|---|
| cb8c50000a | |||
| 4c5cbfb5bf | |||
| 6bcec4a260 |
@@ -4,71 +4,56 @@ import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private final String _name;
|
||||
private final IteratorStart _initialStartType;
|
||||
private final K _initialStartKey;
|
||||
private final List<IterProdFn<K, V>> _pendingIterators;
|
||||
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
|
||||
// Fast path for the first element
|
||||
private FirstMatchState<K, V> _firstMatchState;
|
||||
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
|
||||
|
||||
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
|
||||
_goingForward = true;
|
||||
_name = name;
|
||||
_initialStartType = startType;
|
||||
_initialStartKey = startKey;
|
||||
|
||||
{
|
||||
int counter = 0;
|
||||
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
|
||||
for (var iteratorFn : iterators) {
|
||||
var iterator = iteratorFn.get(startType, startKey);
|
||||
if ((counter == 0) // Not really a requirement but simplifies some things for now
|
||||
&& (startType == IteratorStart.GE || startType == IteratorStart.LE)
|
||||
&& iterator.hasNext()
|
||||
&& iterator.peekNextKey().equals(startKey)) {
|
||||
_firstMatchState = new FirstMatchFound<>(iterator);
|
||||
_pendingIterators = iterators;
|
||||
Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState);
|
||||
return;
|
||||
}
|
||||
iteratorsTmp.put(iterator, counter++);
|
||||
}
|
||||
_iterators = Map.copyOf(iteratorsTmp);
|
||||
_pendingIterators = null;
|
||||
}
|
||||
_iterators = Map.ofEntries(
|
||||
IntStream.range(0, iterators.size())
|
||||
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
|
||||
.toArray(Pair[]::new)
|
||||
);
|
||||
|
||||
_firstMatchState = new FirstMatchNone<>();
|
||||
doInitialAdvance();
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
|
||||
this(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
|
||||
private void doInitialAdvance() {
|
||||
if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
|
||||
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
|
||||
// Starting at a greatest key less than/less or equal than:
|
||||
// We have a bunch of iterators that have given us theirs "greatest LT/LE key"
|
||||
// now we need to pick the greatest of those to start with
|
||||
// But if some of them don't have a lesser key, we need to pick the smallest of those
|
||||
var found = _iterators.keySet().stream()
|
||||
.filter(CloseableKvIterator::hasNext)
|
||||
.map((i) -> {
|
||||
var peeked = i.peekNextKey();
|
||||
// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
|
||||
return peeked;
|
||||
}).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0));
|
||||
|
||||
K greatestLess = null;
|
||||
K smallestMore = null;
|
||||
|
||||
for (var it : _iterators.keySet()) {
|
||||
if (it.hasNext()) {
|
||||
var peeked = it.peekNextKey();
|
||||
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
|
||||
if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
|
||||
greatestLess = peeked;
|
||||
}
|
||||
} else {
|
||||
if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
|
||||
smallestMore = peeked;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
K initialMaxValue;
|
||||
if (!found.get(true).isEmpty())
|
||||
initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
|
||||
if (greatestLess != null)
|
||||
initialMaxValue = greatestLess;
|
||||
else
|
||||
initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
|
||||
initialMaxValue = smallestMore;
|
||||
|
||||
if (initialMaxValue == null) {
|
||||
// Empty iterators
|
||||
}
|
||||
|
||||
for (var iterator : _iterators.keySet()) {
|
||||
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
|
||||
@@ -82,7 +67,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
}
|
||||
|
||||
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
|
||||
switch (_initialStartType) {
|
||||
switch (startType) {
|
||||
// case LT -> {
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
|
||||
// }
|
||||
@@ -90,37 +75,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
|
||||
// }
|
||||
case GT -> {
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0;
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
|
||||
}
|
||||
case GE -> {
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0;
|
||||
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doHydrate() {
|
||||
if (_firstMatchState instanceof FirstMatchNone) {
|
||||
return;
|
||||
}
|
||||
|
||||
boolean consumed = _firstMatchState instanceof FirstMatchConsumed;
|
||||
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
|
||||
iterator.close();
|
||||
}
|
||||
|
||||
_firstMatchState = new FirstMatchNone<>();
|
||||
|
||||
{
|
||||
int counter = 0;
|
||||
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
|
||||
for (var iteratorFn : _pendingIterators) {
|
||||
var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey);
|
||||
iteratorsTmp.put(iterator, counter++);
|
||||
}
|
||||
_iterators = Map.copyOf(iteratorsTmp);
|
||||
}
|
||||
|
||||
doInitialAdvance();
|
||||
@SafeVarargs
|
||||
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
|
||||
this(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
|
||||
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
|
||||
@@ -151,17 +116,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
protected void reverse() {
|
||||
switch (_firstMatchState) {
|
||||
case FirstMatchFound<K, V> firstMatchFound -> {
|
||||
doHydrate();
|
||||
}
|
||||
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
|
||||
doHydrate();
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
||||
Log.tracev("{0} Reversing from {1}", _name, cur);
|
||||
_goingForward = !_goingForward;
|
||||
@@ -185,18 +139,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
protected K peekImpl() {
|
||||
switch (_firstMatchState) {
|
||||
case FirstMatchFound<K, V> firstMatchFound -> {
|
||||
return firstMatchFound.iterator.peekNextKey();
|
||||
}
|
||||
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
|
||||
doHydrate();
|
||||
break;
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
if (_sortedIterators.isEmpty())
|
||||
throw new NoSuchElementException();
|
||||
return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
|
||||
@@ -204,22 +146,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
protected void skipImpl() {
|
||||
switch (_firstMatchState) {
|
||||
case FirstMatchFound<K, V> firstMatchFound -> {
|
||||
var curVal = firstMatchFound.iterator.next();
|
||||
firstMatchFound.iterator.close();
|
||||
_firstMatchState = new FirstMatchConsumed<>();
|
||||
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
|
||||
return;
|
||||
}
|
||||
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
|
||||
doHydrate();
|
||||
break;
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
@@ -231,38 +157,11 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
protected boolean hasImpl() {
|
||||
switch (_firstMatchState) {
|
||||
case FirstMatchFound<K, V> firstMatchFound -> {
|
||||
return true;
|
||||
}
|
||||
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
|
||||
doHydrate();
|
||||
break;
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
return !_sortedIterators.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Pair<K, V> nextImpl() {
|
||||
switch (_firstMatchState) {
|
||||
case FirstMatchFound<K, V> firstMatchFound -> {
|
||||
var curVal = firstMatchFound.iterator.next();
|
||||
firstMatchFound.iterator.close();
|
||||
_firstMatchState = new FirstMatchConsumed<>();
|
||||
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
|
||||
return curVal;
|
||||
}
|
||||
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
|
||||
doHydrate();
|
||||
break;
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
@@ -275,9 +174,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
|
||||
iterator.close();
|
||||
}
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
iterator.close();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user