This commit is contained in:
2025-04-28 19:24:18 +02:00
parent 81703a9406
commit ddf87d2125
12 changed files with 165 additions and 47 deletions

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -3,26 +3,30 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import java.io.IOException;
import java.nio.ByteBuffer;
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {
private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
// Allow to deserialize objects unknown types,
// more flexible but less secure.
.requireClassRegistration(false)
.buildThreadSafeFury();
@Override
public ByteString serialize(JData obj) {
return SerializationHelper.serialize(obj);
return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
}
@Override
public JData deserialize(ByteBuffer data) {
try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
return (JData) fury.deserialize(data);
}
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.objects.iterators;
import java.util.stream.Stream;
@FunctionalInterface
public interface IterProdFn2<K extends Comparable<K>, V> {
Stream<CloseableKvIterator<K, MaybeTombstone<V>>> get(IteratorStart start, K key);
}

View File

@@ -13,6 +13,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
@@ -73,6 +74,84 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
advanceIterator(iterator);
}
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
// switch (startType) {
//// case LT -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
//// }
//// case LE -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
//// }
// case GT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
// }
// case GE -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
// }
// }
}
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> iteratorsProd) {
_goingForward = true;
_name = name;
// Why streams are so slow?
{
var iterators = iteratorsProd.get(startType, startKey).toList();
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
for (int i = 0; i < iterators.size(); i++) {
iteratorEntries[i] = new IteratorEntry<>(i, (CloseableKvIterator<K, V>) iterators.get(i));
}
_iterators = List.of(iteratorEntries);
}
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
// We have a bunch of iterators that have given us theirs "greatest LT/LE key"
// now we need to pick the greatest of those to start with
// But if some of them don't have a lesser key, we need to pick the smallest of those
K greatestLess = null;
K smallestMore = null;
for (var ite : _iterators) {
var it = ite.iterator();
if (it.hasNext()) {
var peeked = it.peekNextKey();
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
greatestLess = peeked;
}
} else {
if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
smallestMore = peeked;
}
}
}
}
K initialMaxValue;
if (greatestLess != null)
initialMaxValue = greatestLess;
else
initialMaxValue = smallestMore;
if (initialMaxValue == null) {
// Empty iterators
}
for (var ite : _iterators) {
var iterator = ite.iterator();
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
iterator.skip();
}
}
}
for (IteratorEntry<K, V> iterator : _iterators) {
advanceIterator(iterator);
}
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
// switch (startType) {
//// case LT -> {

View File

@@ -19,4 +19,18 @@ public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> itProd) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, (IterProdFn2<K, MaybeTombstone<V>>) itProd),
startType, startKey,
pair -> {
// Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
return ((Data<V>) pair).value();
});
}
}

View File

@@ -2,13 +2,16 @@ package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.Optional;
import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
CloseableKvIterator<K, V> getIterator(IteratorStart start, K key);
Stream<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull
Optional<V> readObject(K name);

View File

@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
@@ -33,6 +34,7 @@ public class CachingObjectPersistentStore {
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
@@ -150,10 +152,11 @@ public class CachingObjectPersistentStore {
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("cache", start, key,
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.concat(
Stream.of(new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key)),
_backing.getIterator(start, key).map(i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i))
);
}
@Nonnull

View File

@@ -3,10 +3,7 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
import com.usatiuk.objects.iterators.ReversibleKvIterator;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
@@ -28,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create;
@@ -112,9 +110,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
return Stream.of(new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)));
}
@Nonnull
@@ -195,7 +193,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
private final Cursor<ByteBuffer> _cursor;
@@ -350,13 +348,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
protected Pair<JObjectKey, ByteBuffer> nextImpl() {
protected Pair<JObjectKey, MaybeTombstone<ByteBuffer>> nextImpl() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val();
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
Pair<JObjectKey, MaybeTombstone<ByteBuffer>> ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), new DataWrapper<>(val.asReadOnlyBuffer()));
if (_goingForward)
_hasNext = _cursor.next();
else

View File

@@ -2,10 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MappingKvIterator;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
import jakarta.enterprise.context.ApplicationScoped;
@@ -15,6 +12,7 @@ import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
@@ -31,8 +29,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), s -> new DataWrapper<>(s.asReadOnlyByteBuffer())));
}
@Nonnull

View File

@@ -1,13 +1,9 @@
package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MappingKvIterator;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -16,6 +12,7 @@ import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.stream.Stream;
@ApplicationScoped
public class SerializingObjectPersistentStore {
@@ -30,8 +27,9 @@ public class SerializingObjectPersistentStore {
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return _backing.getIterator(start, key).map(i -> new MappingKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>, MaybeTombstone<JDataVersionedWrapper>>(i,
d -> serializer.deserialize(((DataWrapper<ByteBuffer>) d).value())));
}
@Nonnull

View File

@@ -3,7 +3,10 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
@@ -27,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@@ -341,10 +345,8 @@ public class WritebackObjectPersistentStore {
private final long txId = finalPw.lastCommittedId();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.concat(Stream.of(new NavigableMapKvIterator<>(_pendingWrites, start, key)), _cache.getIterator(start, key));
}
@Nonnull

View File

@@ -13,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.stream.Stream;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@@ -158,16 +159,21 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key, (tS, tK) ->
Stream.concat(Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
})), _snapshot.getIterator(tS, tK).map(itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
})))));
}
@Override