mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
skip iterator
This commit is contained in:
@@ -7,4 +7,6 @@ import java.util.Iterator;
|
||||
|
||||
public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
|
||||
K peekNextKey();
|
||||
|
||||
void skip();
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ public class InconsistentKvIteratorWrapper<K extends Comparable<K>, V> implement
|
||||
}
|
||||
|
||||
private void refresh() {
|
||||
Log.tracev("Refreshing iterator: {0}", _backing);
|
||||
_backing.close();
|
||||
if (_peekedKey != null) {
|
||||
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
|
||||
@@ -69,6 +70,25 @@ public class InconsistentKvIteratorWrapper<K extends Comparable<K>, V> implement
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
while (true) {
|
||||
try {
|
||||
_lastReturnedKey = _backing.peekNextKey();
|
||||
_backing.skip();
|
||||
_peekedNext = false;
|
||||
_peekedKey = null;
|
||||
return;
|
||||
} catch (NoSuchElementException ignored) {
|
||||
assert !_peekedNext;
|
||||
throw ignored;
|
||||
} catch (StaleIteratorException ignored) {
|
||||
refresh();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -91,6 +91,20 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_lock.lock();
|
||||
try {
|
||||
maybeRefresh();
|
||||
_lastReturnedKey = _backing.peekNextKey();
|
||||
_backing.skip();
|
||||
_peekedNext = false;
|
||||
_peekedKey = null;
|
||||
} finally {
|
||||
_lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -37,6 +37,17 @@ public class InvalidatableKvIterator<K extends Comparable<K>, V> implements Clos
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_lock.lock();
|
||||
try {
|
||||
checkVersion();
|
||||
_backing.skip();
|
||||
} finally {
|
||||
_lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -18,6 +18,11 @@ public class MappingKvIterator<K extends Comparable<K>, V, V_T> implements Close
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -49,7 +49,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
|
||||
_sortedIterators.put(key, iterator);
|
||||
advanceIterator(them);
|
||||
} else {
|
||||
iterator.next();
|
||||
iterator.skip();
|
||||
advanceIterator(iterator);
|
||||
}
|
||||
}
|
||||
@@ -61,6 +61,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
|
||||
return _sortedIterators.firstKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
var cur = _sortedIterators.pollFirstEntry();
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
cur.getValue().skip();
|
||||
advanceIterator(cur.getValue());
|
||||
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
@@ -81,7 +92,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
|
||||
}
|
||||
var curVal = cur.getValue().next();
|
||||
advanceIterator(cur.getValue());
|
||||
Log.tracev("{0} Read: {1}, next: {2}", _name, curVal, _sortedIterators);
|
||||
Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators);
|
||||
return curVal;
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,15 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> implements Close
|
||||
return _next.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
if (_next == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
_next = null;
|
||||
fillNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@@ -33,6 +33,14 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> implements Clo
|
||||
return _next.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
if (_next == null)
|
||||
throw new NoSuchElementException();
|
||||
_next = null;
|
||||
fillNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -91,6 +91,14 @@ public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements Clo
|
||||
return _next.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
if (_next == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
prepareNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -280,6 +280,14 @@ public class SnapshotManager {
|
||||
return _next.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
if (_next == null)
|
||||
throw new NoSuchElementException();
|
||||
_next = null;
|
||||
fillNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
@@ -321,6 +329,11 @@ public class SnapshotManager {
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -41,6 +41,11 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -155,6 +155,11 @@ public class CachingObjectPersistentStore {
|
||||
return _delegate.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_delegate.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_delegate.close();
|
||||
@@ -179,6 +184,7 @@ public class CachingObjectPersistentStore {
|
||||
// Warning: it has a nasty side effect of global caching, so in this case don't even call next on it,
|
||||
// if some objects are still in writeback
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
Log.tracev("Getting cache iterator: {0}, {1}", start, key);
|
||||
_cacheVersionLock.readLock().lock();
|
||||
try {
|
||||
return new InconsistentSelfRefreshingKvIterator<>(
|
||||
|
||||
@@ -171,6 +171,11 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_hasNext = _cursor.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey peekNextKey() {
|
||||
if (!_hasNext) {
|
||||
|
||||
@@ -43,6 +43,11 @@ public class SerializingObjectPersistentStore {
|
||||
return _delegate.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_delegate.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_delegate.close();
|
||||
|
||||
@@ -83,6 +83,11 @@ public class ReadTrackingObjectSourceFactory {
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
|
||||
@@ -33,6 +33,15 @@ public class MergingKvIteratorTest {
|
||||
return _next.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
if (_next == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
_next = null;
|
||||
fillNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user