mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
1 Commits
1025e6b246
...
cursed-ite
| Author | SHA1 | Date | |
|---|---|---|---|
| 5cd0e5f045 |
@@ -1,6 +1,12 @@
|
|||||||
package com.usatiuk.objects.iterators;
|
package com.usatiuk.objects.iterators;
|
||||||
|
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface IterProdFn<K extends Comparable<K>, V> {
|
public interface IterProdFn<K extends Comparable<K>, V> {
|
||||||
CloseableKvIterator<K, V> get(IteratorStart start, K key);
|
CloseableKvIterator<K, V> get(IteratorStart start, K key);
|
||||||
|
|
||||||
|
default Stream<CloseableKvIterator<K, MaybeTombstone<V>>> getFlat(IteratorStart start, K key) {
|
||||||
|
return Stream.of(new MappingKvIterator<>(get(start, key), Data::new));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +1,15 @@
|
|||||||
package com.usatiuk.objects.iterators;
|
package com.usatiuk.objects.iterators;
|
||||||
|
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.commons.lang3.mutable.MutableObject;
|
import org.apache.commons.lang3.mutable.MutableObject;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.*;
|
||||||
import java.util.NavigableMap;
|
|
||||||
import java.util.NoSuchElementException;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
|
|
||||||
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, MaybeTombstone<V>> {
|
||||||
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
|
private record IteratorEntry<K extends Comparable<K>, V>(int priority,
|
||||||
|
CloseableKvIterator<K, MaybeTombstone<V>> iterator) {
|
||||||
public IteratorEntry<K, V> reversed() {
|
public IteratorEntry<K, V> reversed() {
|
||||||
return new IteratorEntry<>(priority, iterator.reversed());
|
return new IteratorEntry<>(priority, iterator.reversed());
|
||||||
}
|
}
|
||||||
@@ -26,11 +25,13 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
|||||||
|
|
||||||
// Why streams are so slow?
|
// Why streams are so slow?
|
||||||
{
|
{
|
||||||
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
|
var iteratorsTmp = iterators.stream().flatMap(i -> i.getFlat(startType, startKey));
|
||||||
for (int i = 0; i < iterators.size(); i++) {
|
MutableInt i = new MutableInt(0);
|
||||||
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
|
ArrayList<IteratorEntry<K, V>> tmp = new ArrayList<>(16);
|
||||||
}
|
iteratorsTmp.forEach(i2 -> {
|
||||||
_iterators = List.of(iteratorEntries);
|
tmp.add(new IteratorEntry<>(i.getAndIncrement(), i2));
|
||||||
|
});
|
||||||
|
_iterators = List.copyOf(tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
|
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
|
||||||
@@ -185,7 +186,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Pair<K, V> nextImpl() {
|
protected Pair<K, MaybeTombstone<V>> nextImpl() {
|
||||||
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
|
||||||
if (cur == null) {
|
if (cur == null) {
|
||||||
throw new NoSuchElementException();
|
throw new NoSuchElementException();
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
|
|||||||
private final CloseableKvIterator<K, V> _backing;
|
private final CloseableKvIterator<K, V> _backing;
|
||||||
private final String _name;
|
private final String _name;
|
||||||
|
|
||||||
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
|
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
|
||||||
_name = name;
|
_name = name;
|
||||||
_backing = new PredicateKvIterator<>(
|
_backing = new PredicateKvIterator<>(
|
||||||
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
|
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
|
||||||
@@ -24,7 +24,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SafeVarargs
|
@SafeVarargs
|
||||||
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
|
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
|
||||||
this(name, startType, startKey, List.of(iterators));
|
this(name, startType, startKey, List.of(iterators));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,19 @@
|
|||||||
package com.usatiuk.objects.snapshot;
|
package com.usatiuk.objects.snapshot;
|
||||||
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
|
||||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||||
|
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||||
|
import com.usatiuk.objects.iterators.IterProdFn;
|
||||||
|
import com.usatiuk.objects.iterators.IteratorStart;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
|
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
|
||||||
CloseableKvIterator<K, V> getIterator(IteratorStart start, K key);
|
IterProdFn<K, V> getIterator();
|
||||||
|
|
||||||
|
default CloseableKvIterator<K, V> getIterator(IteratorStart start, K key) {
|
||||||
|
return getIterator().get(start, key);
|
||||||
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Optional<V> readObject(K name);
|
Optional<V> readObject(K name);
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class CachingObjectPersistentStore {
|
public class CachingObjectPersistentStore {
|
||||||
@@ -186,17 +187,43 @@ public class CachingObjectPersistentStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
public IterProdFn<JObjectKey, JDataVersionedWrapper> getIterator() {
|
||||||
return new TombstoneMergingKvIterator<>("cache", start, key,
|
IterProdFn<JObjectKey, JDataVersionedWrapper> cacheItProdFn = new IterProdFn<JObjectKey, JDataVersionedWrapper>() {
|
||||||
(mS, mK)
|
@Override
|
||||||
-> new MappingKvIterator<>(
|
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> get(IteratorStart start, JObjectKey key) {
|
||||||
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
|
throw new UnsupportedOperationException("Not implemented");
|
||||||
e -> {
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
|
return Stream.of(
|
||||||
|
new MappingKvIterator<>(
|
||||||
|
new NavigableMapKvIterator<>(_curCache.map(), start, key),
|
||||||
|
e -> {
|
||||||
// Log.tracev("Taken from cache: {0}", e);
|
// Log.tracev("Taken from cache: {0}", e);
|
||||||
return e.object();
|
return e.object();
|
||||||
}
|
}
|
||||||
),
|
)
|
||||||
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
IterProdFn<JObjectKey, JDataVersionedWrapper> backingItProdFn = (mS, mK) -> new CachingKvIterator(_backing.getIterator(mS, mK));
|
||||||
|
|
||||||
|
return new IterProdFn<JObjectKey, JDataVersionedWrapper>() {
|
||||||
|
@Override
|
||||||
|
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> get(IteratorStart start, JObjectKey key) {
|
||||||
|
return new TombstoneMergingKvIterator<>("cache", start, key, cacheItProdFn, backingItProdFn);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
|
return Stream.concat(
|
||||||
|
cacheItProdFn.getFlat(start, key),
|
||||||
|
backingItProdFn.getFlat(start, key)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
|||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.JObjectKeyMax;
|
import com.usatiuk.objects.JObjectKeyMax;
|
||||||
import com.usatiuk.objects.JObjectKeyMin;
|
import com.usatiuk.objects.JObjectKeyMin;
|
||||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
import com.usatiuk.objects.iterators.IterProdFn;
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
import com.usatiuk.objects.iterators.IteratorStart;
|
||||||
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
|
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
|
||||||
import com.usatiuk.objects.iterators.ReversibleKvIterator;
|
import com.usatiuk.objects.iterators.ReversibleKvIterator;
|
||||||
@@ -121,9 +121,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
private boolean _closed = false;
|
private boolean _closed = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
public IterProdFn<JObjectKey, ByteString> getIterator() {
|
||||||
assert !_closed;
|
assert !_closed;
|
||||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
|
return (start, key) -> new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
@@ -2,9 +2,7 @@ package com.usatiuk.objects.stores;
|
|||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.JObjectKeyImpl;
|
import com.usatiuk.objects.iterators.IterProdFn;
|
||||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
|
||||||
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
|
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
import io.quarkus.arc.properties.IfBuildProperty;
|
import io.quarkus.arc.properties.IfBuildProperty;
|
||||||
@@ -38,8 +36,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
|
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
public IterProdFn<JObjectKey, ByteString> getIterator() {
|
||||||
return new NavigableMapKvIterator<>(_objects, start, key);
|
return (start, key) -> new NavigableMapKvIterator<>(_objects, start, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
@@ -4,8 +4,7 @@ import com.google.protobuf.ByteString;
|
|||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.ObjectSerializer;
|
import com.usatiuk.objects.ObjectSerializer;
|
||||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
import com.usatiuk.objects.iterators.IterProdFn;
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
|
||||||
import com.usatiuk.objects.iterators.MappingKvIterator;
|
import com.usatiuk.objects.iterators.MappingKvIterator;
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
@@ -33,8 +32,8 @@ public class SerializingObjectPersistentStore {
|
|||||||
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
|
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
public IterProdFn<JObjectKey, JDataVersionedWrapper> getIterator() {
|
||||||
return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d));
|
return (start, key) -> new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class WritebackObjectPersistentStore {
|
public class WritebackObjectPersistentStore {
|
||||||
@@ -349,16 +350,37 @@ public class WritebackObjectPersistentStore {
|
|||||||
private final long txId = finalPw.lastCommittedId();
|
private final long txId = finalPw.lastCommittedId();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
public IterProdFn<JObjectKey, JDataVersionedWrapper> getIterator() {
|
||||||
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
|
IterProdFn<JObjectKey, JDataVersionedWrapper> cacheItProdFn = new IterProdFn<JObjectKey, JDataVersionedWrapper>() {
|
||||||
(tS, tK) -> new MappingKvIterator<>(
|
@Override
|
||||||
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
|
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> get(IteratorStart start, JObjectKey key) {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
|
return Stream.of(new MappingKvIterator<>(
|
||||||
|
new NavigableMapKvIterator<>(_pendingWrites, start, key),
|
||||||
e -> switch (e) {
|
e -> switch (e) {
|
||||||
case PendingWrite pw -> new Data<>(pw.data());
|
case PendingWrite pw -> new Data<>(pw.data());
|
||||||
case PendingDelete d -> new Tombstone<>();
|
case PendingDelete d -> new Tombstone<>();
|
||||||
default -> throw new IllegalStateException("Unexpected value: " + e);
|
default -> throw new IllegalStateException("Unexpected value: " + e);
|
||||||
}),
|
}));
|
||||||
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return new IterProdFn<JObjectKey, JDataVersionedWrapper>() {
|
||||||
|
@Override
|
||||||
|
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> get(IteratorStart start, JObjectKey key) {
|
||||||
|
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
|
||||||
|
cacheItProdFn, _cache.getIterator());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
|
return Stream.concat(cacheItProdFn.getFlat(start, key), _cache.getIterator().getFlat(start, key));
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|||||||
@@ -7,13 +7,13 @@ import com.usatiuk.objects.iterators.*;
|
|||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.inject.Singleton;
|
import jakarta.inject.Singleton;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class TransactionFactoryImpl implements TransactionFactory {
|
public class TransactionFactoryImpl implements TransactionFactory {
|
||||||
@@ -161,17 +161,48 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||||
|
|
||||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||||
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
|
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
|
||||||
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
|
new IterProdFn<JObjectKey, ReadTrackingInternalCrap>() {
|
||||||
t -> switch (t) {
|
@Override
|
||||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
public CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> get(IteratorStart start, JObjectKey key) {
|
||||||
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
|
throw new UnsupportedOperationException("Not implemented");
|
||||||
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
|
}
|
||||||
case null, default -> null;
|
|
||||||
}),
|
@Override
|
||||||
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ReadTrackingInternalCrap>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
|
return Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
||||||
|
t -> switch (t) {
|
||||||
|
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||||
|
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
|
||||||
|
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
|
||||||
|
case null, default -> null;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new IterProdFn<JObjectKey, ReadTrackingInternalCrap>() {
|
||||||
|
@Override
|
||||||
|
public CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> get(IteratorStart start, JObjectKey key) {
|
||||||
|
throw new UnsupportedOperationException("Not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ReadTrackingInternalCrap>>> getFlat(IteratorStart start, JObjectKey key) {
|
||||||
|
return _snapshot.getIterator().getFlat(start, key).<CloseableKvIterator<JObjectKey, MaybeTombstone<ReadTrackingInternalCrap>>>map(
|
||||||
|
i -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(i,
|
||||||
|
d ->
|
||||||
|
switch (d) {
|
||||||
|
case Data<JDataVersionedWrapper> data ->
|
||||||
|
new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(data.value()));
|
||||||
|
case Tombstone<JDataVersionedWrapper> tombstone ->
|
||||||
|
new Tombstone<ReadTrackingInternalCrap>();
|
||||||
|
default ->
|
||||||
|
throw new IllegalStateException("Unexpected value: " + d);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -1,262 +1,262 @@
|
|||||||
package com.usatiuk.objects.iterators;
|
//package com.usatiuk.objects.iterators;
|
||||||
|
//
|
||||||
import net.jqwik.api.*;
|
//import net.jqwik.api.*;
|
||||||
import net.jqwik.api.state.Action;
|
//import net.jqwik.api.state.Action;
|
||||||
import net.jqwik.api.state.ActionChain;
|
//import net.jqwik.api.state.ActionChain;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
//import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.junit.jupiter.api.Assertions;
|
//import org.junit.jupiter.api.Assertions;
|
||||||
|
//
|
||||||
import java.util.*;
|
//import java.util.*;
|
||||||
|
//
|
||||||
public class MergingKvIteratorPbtTest {
|
//public class MergingKvIteratorPbtTest {
|
||||||
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
|
// static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
|
||||||
private final CloseableKvIterator<Integer, Integer> mergedIterator;
|
// private final CloseableKvIterator<Integer, Integer> mergedIterator;
|
||||||
private final CloseableKvIterator<Integer, Integer> mergingIterator;
|
// private final CloseableKvIterator<Integer, Integer> mergingIterator;
|
||||||
|
//
|
||||||
private MergingIteratorModel(List<List<Map.Entry<Integer, Integer>>> pairs, IteratorStart startType, Integer startKey) {
|
// private MergingIteratorModel(List<List<Map.Entry<Integer, Integer>>> pairs, IteratorStart startType, Integer startKey) {
|
||||||
TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
|
// TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
|
||||||
for (List<Map.Entry<Integer, Integer>> list : pairs) {
|
// for (List<Map.Entry<Integer, Integer>> list : pairs) {
|
||||||
for (Map.Entry<Integer, Integer> pair : list) {
|
// for (Map.Entry<Integer, Integer> pair : list) {
|
||||||
perfectMerged.putIfAbsent(pair.getKey(), pair.getValue());
|
// perfectMerged.putIfAbsent(pair.getKey(), pair.getValue());
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
|
// mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
|
||||||
mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
|
// mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
|
||||||
list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
|
// list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
|
||||||
).toList());
|
// ).toList());
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public Integer peekNextKey() {
|
// public Integer peekNextKey() {
|
||||||
var mergedKey = mergedIterator.peekNextKey();
|
// var mergedKey = mergedIterator.peekNextKey();
|
||||||
var mergingKey = mergingIterator.peekNextKey();
|
// var mergingKey = mergingIterator.peekNextKey();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public void skip() {
|
// public void skip() {
|
||||||
mergedIterator.skip();
|
// mergedIterator.skip();
|
||||||
mergingIterator.skip();
|
// mergingIterator.skip();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public Integer peekPrevKey() {
|
// public Integer peekPrevKey() {
|
||||||
var mergedKey = mergedIterator.peekPrevKey();
|
// var mergedKey = mergedIterator.peekPrevKey();
|
||||||
var mergingKey = mergingIterator.peekPrevKey();
|
// var mergingKey = mergingIterator.peekPrevKey();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public Pair<Integer, Integer> prev() {
|
// public Pair<Integer, Integer> prev() {
|
||||||
var mergedKey = mergedIterator.prev();
|
// var mergedKey = mergedIterator.prev();
|
||||||
var mergingKey = mergingIterator.prev();
|
// var mergingKey = mergingIterator.prev();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean hasPrev() {
|
// public boolean hasPrev() {
|
||||||
var mergedKey = mergedIterator.hasPrev();
|
// var mergedKey = mergedIterator.hasPrev();
|
||||||
var mergingKey = mergingIterator.hasPrev();
|
// var mergingKey = mergingIterator.hasPrev();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public void skipPrev() {
|
// public void skipPrev() {
|
||||||
mergedIterator.skipPrev();
|
// mergedIterator.skipPrev();
|
||||||
mergingIterator.skipPrev();
|
// mergingIterator.skipPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public void close() {
|
// public void close() {
|
||||||
mergedIterator.close();
|
// mergedIterator.close();
|
||||||
mergingIterator.close();
|
// mergingIterator.close();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean hasNext() {
|
// public boolean hasNext() {
|
||||||
var mergedKey = mergedIterator.hasNext();
|
// var mergedKey = mergedIterator.hasNext();
|
||||||
var mergingKey = mergingIterator.hasNext();
|
// var mergingKey = mergingIterator.hasNext();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public Pair<Integer, Integer> next() {
|
// public Pair<Integer, Integer> next() {
|
||||||
var mergedKey = mergedIterator.next();
|
// var mergedKey = mergedIterator.next();
|
||||||
var mergingKey = mergingIterator.next();
|
// var mergingKey = mergingIterator.next();
|
||||||
Assertions.assertEquals(mergedKey, mergingKey);
|
// Assertions.assertEquals(mergedKey, mergingKey);
|
||||||
return mergedKey;
|
// return mergedKey;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.peekNextKey();
|
// state.peekNextKey();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasNext();
|
// return state.hasNext();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Peek next key";
|
// return "Peek next key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.skip();
|
// state.skip();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasNext();
|
// return state.hasNext();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Skip next key";
|
// return "Skip next key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.peekPrevKey();
|
// state.peekPrevKey();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasPrev();
|
// return state.hasPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Peek prev key";
|
// return "Peek prev key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.skipPrev();
|
// state.skipPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasPrev();
|
// return state.hasPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Skip prev key";
|
// return "Skip prev key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.prev();
|
// state.prev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasPrev();
|
// return state.hasPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Prev key";
|
// return "Prev key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class NextAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class NextAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.next();
|
// state.next();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return state.hasNext();
|
// return state.hasNext();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Next key";
|
// return "Next key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.hasNext();
|
// state.hasNext();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return true;
|
// return true;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Has next key";
|
// return "Has next key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
// static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
|
||||||
@Override
|
// @Override
|
||||||
public void mutate(MergingIteratorModel state) {
|
// public void mutate(MergingIteratorModel state) {
|
||||||
state.hasPrev();
|
// state.hasPrev();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public boolean precondition(MergingIteratorModel state) {
|
// public boolean precondition(MergingIteratorModel state) {
|
||||||
return true;
|
// return true;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Override
|
// @Override
|
||||||
public String description() {
|
// public String description() {
|
||||||
return "Has prev key";
|
// return "Has prev key";
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Property
|
// @Property
|
||||||
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
|
// public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
|
||||||
actions.run();
|
// actions.run();
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Provide
|
// @Provide
|
||||||
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
|
// Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
|
||||||
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
|
// @ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
|
||||||
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
|
// return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
|
||||||
.withAction(new NextAction())
|
// .withAction(new NextAction())
|
||||||
.withAction(new PeekNextKeyAction())
|
// .withAction(new PeekNextKeyAction())
|
||||||
.withAction(new SkipAction())
|
// .withAction(new SkipAction())
|
||||||
.withAction(new PeekPrevKeyAction())
|
// .withAction(new PeekPrevKeyAction())
|
||||||
.withAction(new SkipPrevAction())
|
// .withAction(new SkipPrevAction())
|
||||||
.withAction(new PrevAction())
|
// .withAction(new PrevAction())
|
||||||
.withAction(new HasNextAction())
|
// .withAction(new HasNextAction())
|
||||||
.withAction(new HasPrevAction());
|
// .withAction(new HasPrevAction());
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Provide
|
// @Provide
|
||||||
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
|
// Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
|
||||||
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
|
// return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
|
||||||
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
|
// .list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
|
||||||
.list().ofMinSize(1).ofMaxSize(5);
|
// .list().ofMinSize(1).ofMaxSize(5);
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
@Provide
|
// @Provide
|
||||||
Arbitrary<Integer> startKey() {
|
// Arbitrary<Integer> startKey() {
|
||||||
return Arbitraries.integers().between(-51, 51);
|
// return Arbitraries.integers().between(-51, 51);
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
Reference in New Issue
Block a user