diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml index f79b8e9b..534d8cc0 100644 --- a/dhfs-parent/objects/pom.xml +++ b/dhfs-parent/objects/pom.xml @@ -18,6 +18,11 @@ + + org.apache.fury + fury-core + 0.10.1 + net.jqwik jqwik diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java index 14581634..75bde6af 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java @@ -3,26 +3,30 @@ package com.usatiuk.objects; import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; -import com.usatiuk.utils.SerializationHelper; import io.quarkus.arc.DefaultBean; import jakarta.enterprise.context.ApplicationScoped; +import org.apache.fury.Fury; +import org.apache.fury.ThreadSafeFury; +import org.apache.fury.config.Language; -import java.io.IOException; import java.nio.ByteBuffer; @ApplicationScoped @DefaultBean public class JavaDataSerializer implements ObjectSerializer { + private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA) + // Allow to deserialize objects unknown types, + // more flexible but less secure. + .requireClassRegistration(false) + .buildThreadSafeFury(); + @Override public ByteString serialize(JData obj) { - return SerializationHelper.serialize(obj); + return UnsafeByteOperations.unsafeWrap(fury.serialize(obj)); } + @Override public JData deserialize(ByteBuffer data) { - try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) { - return SerializationHelper.deserialize(is); - } catch (IOException e) { - throw new RuntimeException(e); - } + return (JData) fury.deserialize(data); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java new file mode 100644 index 00000000..d51afdf4 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java @@ -0,0 +1,8 @@ +package com.usatiuk.objects.iterators; + +import java.util.stream.Stream; + +@FunctionalInterface +public interface IterProdFn2, V> { + Stream>> get(IteratorStart start, K key); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java index 49953199..c144be88 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java @@ -13,6 +13,7 @@ public class MergingKvIterator, V> extends ReversibleKvI private final NavigableMap> _sortedIterators = new TreeMap<>(); private final String _name; private final List> _iterators; + public MergingKvIterator(String name, IteratorStart startType, K startKey, List> iterators) { _goingForward = true; _name = name; @@ -73,6 +74,84 @@ public class MergingKvIterator, V> extends ReversibleKvI advanceIterator(iterator); } +// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators); +// switch (startType) { +//// case LT -> { +//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0; +//// } +//// case LE -> { +//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0; +//// } +// case GT -> { +// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0; +// } +// case GE -> { +// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0; +// } +// } + } + + public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn2 iteratorsProd) { + _goingForward = true; + _name = name; + + // Why streams are so slow? + { + var iterators = iteratorsProd.get(startType, startKey).toList(); + IteratorEntry[] iteratorEntries = new IteratorEntry[iterators.size()]; + for (int i = 0; i < iterators.size(); i++) { + iteratorEntries[i] = new IteratorEntry<>(i, (CloseableKvIterator) iterators.get(i)); + } + _iterators = List.of(iteratorEntries); + } + + if (startType == IteratorStart.LT || startType == IteratorStart.LE) { + // Starting at a greatest key less than/less or equal than: + // We have a bunch of iterators that have given us theirs "greatest LT/LE key" + // now we need to pick the greatest of those to start with + // But if some of them don't have a lesser key, we need to pick the smallest of those + + K greatestLess = null; + K smallestMore = null; + + for (var ite : _iterators) { + var it = ite.iterator(); + if (it.hasNext()) { + var peeked = it.peekNextKey(); + if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) { + if (greatestLess == null || peeked.compareTo(greatestLess) > 0) { + greatestLess = peeked; + } + } else { + if (smallestMore == null || peeked.compareTo(smallestMore) < 0) { + smallestMore = peeked; + } + } + } + } + + K initialMaxValue; + if (greatestLess != null) + initialMaxValue = greatestLess; + else + initialMaxValue = smallestMore; + + if (initialMaxValue == null) { + // Empty iterators + } + + for (var ite : _iterators) { + var iterator = ite.iterator(); + while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) { + iterator.skip(); + } + } + } + + for (IteratorEntry iterator : _iterators) { + advanceIterator(iterator); + } + // Log.tracev("{0} Initialized: {1}", _name, _sortedIterators); // switch (startType) { //// case LT -> { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java index 097b23e8..07290c3f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java @@ -19,4 +19,18 @@ public abstract class TombstoneMergingKvIterator { public static , V> CloseableKvIterator of(String name, IteratorStart startType, K startKey, IterProdFn>... iterators) { return of(name, startType, startKey, List.of(iterators)); } + + public static , V> CloseableKvIterator of(String name, IteratorStart startType, K startKey, IterProdFn2 itProd) { + return new PredicateKvIterator, V>( + new MergingKvIterator>(name + "-merging", startType, startKey, (IterProdFn2>) itProd), + startType, startKey, + pair -> { +// Log.tracev("{0} - Processing pair {1}", name, pair); + if (pair instanceof Tombstone) { + return null; + } + return ((Data) pair).value(); + }); + } + } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java index 6b1a6bdd..7fbcab3a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java @@ -2,13 +2,16 @@ package com.usatiuk.objects.snapshot; import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MaybeTombstone; +import com.usatiuk.objects.iterators.Tombstone; import com.usatiuk.utils.AutoCloseableNoThrow; import javax.annotation.Nonnull; import java.util.Optional; +import java.util.stream.Stream; public interface Snapshot, V> extends AutoCloseableNoThrow { - CloseableKvIterator getIterator(IteratorStart start, K key); + Stream>> getIterator(IteratorStart start, K key); @Nonnull Optional readObject(K name); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index a04c70e8..8c1b92ae 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; @ApplicationScoped public class CachingObjectPersistentStore { @@ -33,6 +34,7 @@ public class CachingObjectPersistentStore { private ExecutorService _statusExecutor; private AtomicLong _cached = new AtomicLong(); private AtomicLong _cacheTries = new AtomicLong(); + public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) { _cache = new AtomicReference<>( new Cache(TreePMap.empty(), 0, -1, sizeLimit) @@ -150,10 +152,11 @@ public class CachingObjectPersistentStore { } @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return TombstoneMergingKvIterator.of("cache", start, key, - (mS, mK) -> new NavigableMapKvIterator>(_curCache.map(), mS, mK), - (mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key))); + public Stream>> getIterator(IteratorStart start, JObjectKey key) { + return Stream.concat( + Stream.of(new NavigableMapKvIterator>(_curCache.map(), start, key)), + _backing.getIterator(start, key).map(i -> new CachingKvIterator((CloseableKvIterator) (CloseableKvIterator) i)) + ); } @Nonnull diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java index c922360b..c50335a2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java @@ -3,10 +3,7 @@ package com.usatiuk.objects.stores; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKeyMax; import com.usatiuk.objects.JObjectKeyMin; -import com.usatiuk.objects.iterators.CloseableKvIterator; -import com.usatiuk.objects.iterators.IteratorStart; -import com.usatiuk.objects.iterators.KeyPredicateKvIterator; -import com.usatiuk.objects.iterators.ReversibleKvIterator; +import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.snapshot.Snapshot; import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.logging.Log; @@ -28,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.stream.Stream; import static org.lmdbjava.DbiFlags.MDB_CREATE; import static org.lmdbjava.Env.create; @@ -112,9 +110,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private boolean _closed = false; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + public Stream>> getIterator(IteratorStart start, JObjectKey key) { assert !_closed; - return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); + return Stream.of(new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR))); } @Nonnull @@ -195,7 +193,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { return _root.toFile().getUsableSpace(); } - private class LmdbKvIterator extends ReversibleKvIterator { + private class LmdbKvIterator extends ReversibleKvIterator> { private static final Cleaner CLEANER = Cleaner.create(); private final Txn _txn; // Managed by the snapshot private final Cursor _cursor; @@ -350,13 +348,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } @Override - protected Pair nextImpl() { + protected Pair> nextImpl() { if (!_hasNext) { throw new NoSuchElementException("No more elements"); } // TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway var val = _cursor.val(); - var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer()); + Pair> ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), new DataWrapper<>(val.asReadOnlyBuffer())); if (_goingForward) _hasNext = _cursor.next(); else diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java index 79765b4f..3cefb2f6 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java @@ -2,10 +2,7 @@ package com.usatiuk.objects.stores; import com.google.protobuf.ByteString; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.iterators.CloseableKvIterator; -import com.usatiuk.objects.iterators.IteratorStart; -import com.usatiuk.objects.iterators.MappingKvIterator; -import com.usatiuk.objects.iterators.NavigableMapKvIterator; +import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.snapshot.Snapshot; import io.quarkus.arc.properties.IfBuildProperty; import jakarta.enterprise.context.ApplicationScoped; @@ -15,6 +12,7 @@ import javax.annotation.Nonnull; import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Stream; @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") @@ -31,8 +29,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer); + public Stream>> getIterator(IteratorStart start, JObjectKey key) { + return Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), s -> new DataWrapper<>(s.asReadOnlyByteBuffer()))); } @Nonnull diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java index 4af76bde..ef27e1e6 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java @@ -1,13 +1,9 @@ package com.usatiuk.objects.stores; -import com.google.protobuf.ByteString; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperSerializer; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.ObjectSerializer; -import com.usatiuk.objects.iterators.CloseableKvIterator; -import com.usatiuk.objects.iterators.IteratorStart; -import com.usatiuk.objects.iterators.MappingKvIterator; +import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.snapshot.Snapshot; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -16,6 +12,7 @@ import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; import java.nio.ByteBuffer; import java.util.Optional; +import java.util.stream.Stream; @ApplicationScoped public class SerializingObjectPersistentStore { @@ -30,8 +27,9 @@ public class SerializingObjectPersistentStore { private final Snapshot _backing = delegateStore.getSnapshot(); @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d)); + public Stream>> getIterator(IteratorStart start, JObjectKey key) { + return _backing.getIterator(start, key).map(i -> new MappingKvIterator, MaybeTombstone>(i, + d -> serializer.deserialize(((DataWrapper) d).value()))); } @Nonnull diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java index 41a449e9..738d01ad 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java @@ -3,7 +3,10 @@ package com.usatiuk.objects.stores; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperImpl; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.iterators.*; +import com.usatiuk.objects.iterators.CloseableKvIterator; +import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MaybeTombstone; +import com.usatiuk.objects.iterators.NavigableMapKvIterator; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.transaction.TxCommitException; import com.usatiuk.objects.transaction.TxRecord; @@ -27,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; @ApplicationScoped public class WritebackObjectPersistentStore { @@ -341,10 +345,8 @@ public class WritebackObjectPersistentStore { private final long txId = finalPw.lastCommittedId(); @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return TombstoneMergingKvIterator.of("writeback-ps", start, key, - (tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK), - (tS, tK) -> (CloseableKvIterator>) (CloseableKvIterator) _cache.getIterator(tS, tK)); + public Stream>> getIterator(IteratorStart start, JObjectKey key) { + return Stream.concat(Stream.of(new NavigableMapKvIterator<>(_pendingWrites, start, key)), _cache.getIterator(start, key)); } @Nonnull diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java index ae11f3d6..66e4e4b8 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java @@ -13,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.*; +import java.util.stream.Stream; @Singleton public class TransactionFactoryImpl implements TransactionFactory { @@ -158,16 +159,21 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { Log.tracev("Getting tx iterator with start={0}, key={1}", start, key); - return new ReadTrackingIterator(TombstoneMergingKvIterator.of("tx", start, key, - (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), + return new ReadTrackingIterator(TombstoneMergingKvIterator.of("tx", start, key, (tS, tK) -> + Stream.concat(Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) { case TxRecord.TxObjectRecordWrite write -> - new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data())); - case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>(); + new DataWrapper(new ReadTrackingInternalCrapTx(write.data())); + case TxRecord.TxObjectRecordDeleted deleted -> + new TombstoneImpl(); case null, default -> null; - }), - (tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK), - d -> new DataWrapper(new ReadTrackingInternalCrapSource(d))))); + })), _snapshot.getIterator(tS, tK).map(itin -> new MappingKvIterator, MaybeTombstone>(itin, + d -> switch (d) { + case Data w -> + new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value())); + case Tombstone t -> new TombstoneImpl<>(); + case null, default -> null; + }))))); } @Override