This commit is contained in:
2025-04-28 09:03:13 +02:00
parent f87eb365c3
commit 6a962022bb
11 changed files with 65 additions and 46 deletions

View File

@@ -18,6 +18,11 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.1</version>
</dependency>
<dependency> <dependency>
<groupId>net.jqwik</groupId> <groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId> <artifactId>jqwik</artifactId>

View File

@@ -8,11 +8,10 @@ import jakarta.inject.Singleton;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@Singleton @Singleton
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> { public class JDataVersionedWrapperSerializer {
@Inject @Inject
ObjectSerializer<JData> dataSerializer; ObjectSerializer<JData> dataSerializer;
@Override
public ByteString serialize(JDataVersionedWrapper obj) { public ByteString serialize(JDataVersionedWrapper obj) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(obj.version()); buffer.putLong(obj.version());
@@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data())); return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
} }
@Override public JDataVersionedWrapper deserialize(ByteBuffer data) {
public JDataVersionedWrapper deserialize(ByteString data) { var version = data.getLong();
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong(); return new JDataVersionedWrapperLazy(version, data.remaining(),
var rawData = data.substring(Long.BYTES); () -> dataSerializer.deserialize(data)
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
); );
} }
} }

View File

@@ -2,26 +2,31 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.utils.SerializationHelper; import com.google.protobuf.UnsafeByteOperations;
import io.quarkus.arc.DefaultBean; import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import java.io.IOException; import java.nio.ByteBuffer;
@ApplicationScoped @ApplicationScoped
@DefaultBean @DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> { public class JavaDataSerializer implements ObjectSerializer<JData> {
private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
// Allow to deserialize objects unknown types,
// more flexible but less secure.
.requireClassRegistration(false)
.buildThreadSafeFury();
@Override @Override
public ByteString serialize(JData obj) { public ByteString serialize(JData obj) {
return SerializationHelper.serialize(obj); return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
} }
@Override @Override
public JData deserialize(ByteString data) { public JData deserialize(ByteBuffer data) {
try (var is = data.newInput()) { return (JData) fury.deserialize(data);
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@@ -2,8 +2,10 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
public interface ObjectSerializer<T> { public interface ObjectSerializer<T> {
ByteString serialize(T obj); ByteString serialize(T obj);
T deserialize(ByteString data); T deserialize(ByteBuffer data);
} }

View File

@@ -1,7 +1,5 @@
package com.usatiuk.objects.stores; package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax; import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin; import com.usatiuk.objects.JObjectKeyMin;
@@ -104,27 +102,27 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
@Override @Override
public Snapshot<JObjectKey, ByteString> getSnapshot() { public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
var txn = _env.txnRead(); var txn = _env.txnRead();
try { try {
long commitId = readTxId(txn).orElseThrow(); long commitId = readTxId(txn).orElseThrow();
return new Snapshot<JObjectKey, ByteString>() { return new Snapshot<JObjectKey, ByteBuffer>() {
private final Txn<ByteBuffer> _txn = txn; private final Txn<ByteBuffer> _txn = txn;
private final long _id = commitId; private final long _id = commitId;
private boolean _closed = false; private boolean _closed = false;
@Override @Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed; assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
} }
@Nonnull @Nonnull
@Override @Override
public Optional<ByteString> readObject(JObjectKey name) { public Optional<ByteBuffer> readObject(JObjectKey name) {
assert !_closed; assert !_closed;
var got = _db.get(_txn, name.toByteBuffer()); var got = _db.get(_txn, name.toByteBuffer());
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap); var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer);
return ret; return ret;
} }
@@ -197,7 +195,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace(); return _root.toFile().getUsableSpace();
} }
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> { private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
private static final Cleaner CLEANER = Cleaner.create(); private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot private final Txn<ByteBuffer> _txn; // Managed by the snapshot
private final Cursor<ByteBuffer> _cursor; private final Cursor<ByteBuffer> _cursor;
@@ -352,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
@Override @Override
protected Pair<JObjectKey, ByteString> nextImpl() { protected Pair<JObjectKey, ByteBuffer> nextImpl() {
if (!_hasNext) { if (!_hasNext) {
throw new NoSuchElementException("No more elements"); throw new NoSuchElementException("No more elements");
} }
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway // TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val(); var val = _cursor.val();
var bs = UnsafeByteOperations.unsafeWrap(val); var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
if (_goingForward) if (_goingForward)
_hasNext = _cursor.next(); _hasNext = _cursor.next();
else else

View File

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MappingKvIterator;
import com.usatiuk.objects.iterators.NavigableMapKvIterator; import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.arc.properties.IfBuildProperty;
@@ -11,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.TreePMap; import org.pcollections.TreePMap;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -22,21 +24,21 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private long _lastCommitId = 0; private long _lastCommitId = 0;
@Override @Override
public Snapshot<JObjectKey, ByteString> getSnapshot() { public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
synchronized (this) { synchronized (this) {
return new Snapshot<JObjectKey, ByteString>() { return new Snapshot<JObjectKey, ByteBuffer>() {
private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects; private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects;
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId; private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override @Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key); return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
} }
@Nonnull @Nonnull
@Override @Override
public Optional<ByteString> readObject(JObjectKey name) { public Optional<ByteBuffer> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name)); return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer);
} }
@Override @Override

View File

@@ -5,12 +5,13 @@ import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional; import java.util.Optional;
// Persistent storage of objects // Persistent storage of objects
// All changes are written as sequential transactions // All changes are written as sequential transactions
public interface ObjectPersistentStore { public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteString> getSnapshot(); Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId); Runnable prepareTx(TxManifestRaw names, long txId);

View File

@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.ObjectSerializer; import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
@@ -13,19 +14,20 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional; import java.util.Optional;
@ApplicationScoped @ApplicationScoped
public class SerializingObjectPersistentStore { public class SerializingObjectPersistentStore {
@Inject @Inject
ObjectSerializer<JDataVersionedWrapper> serializer; JDataVersionedWrapperSerializer serializer;
@Inject @Inject
ObjectPersistentStore delegateStore; ObjectPersistentStore delegateStore;
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() { public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() { return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot(); private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override @Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {

View File

@@ -11,7 +11,7 @@ import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority; import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@@ -22,7 +22,7 @@ import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks; private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject @Inject
WritebackObjectPersistentStore writebackObjectPersistentStore; WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject @Inject
@@ -30,9 +30,13 @@ public class JObjectManager {
@Inject @Inject
LockManager lockManager; LockManager lockManager;
private boolean _ready = false; private boolean _ready = false;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); static {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); _preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager() {
Log.infov("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
} }
private void verifyReady() { private void verifyReady() {

View File

@@ -54,11 +54,11 @@ public class JKleppmannTreeManager {
); );
curTx.put(data); curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get()); var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(rootNode)); curTx.put(new JKleppmannTreeNodeHolder(rootNode, true));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get()); var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(trashNode)); curTx.put(new JKleppmannTreeNodeHolder(trashNode, true));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get()); var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(lf_node)); curTx.put(new JKleppmannTreeNodeHolder(lf_node, true));
} }
return new JKleppmannTree(data); return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree); // opObjectRegistry.registerObject(tree);

View File

@@ -18,6 +18,10 @@ public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean f
this(TreePSet.empty(), false, node); this(TreePSet.empty(), false, node);
} }
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node, boolean frozen) {
this(TreePSet.empty(), frozen, node);
}
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) { public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
Objects.requireNonNull(node, "node"); Objects.requireNonNull(node, "node");
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node); return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);