1 Commits

Author SHA1 Message Date
6a962022bb dump 2025-04-28 09:03:13 +02:00
11 changed files with 65 additions and 46 deletions

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -8,11 +8,10 @@ import jakarta.inject.Singleton;
import java.nio.ByteBuffer;
@Singleton
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
public class JDataVersionedWrapperSerializer {
@Inject
ObjectSerializer<JData> dataSerializer;
@Override
public ByteString serialize(JDataVersionedWrapper obj) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(obj.version());
@@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
}
@Override
public JDataVersionedWrapper deserialize(ByteString data) {
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
var rawData = data.substring(Long.BYTES);
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
public JDataVersionedWrapper deserialize(ByteBuffer data) {
var version = data.getLong();
return new JDataVersionedWrapperLazy(version, data.remaining(),
() -> dataSerializer.deserialize(data)
);
}
}

View File

@@ -2,26 +2,31 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.utils.SerializationHelper;
import com.google.protobuf.UnsafeByteOperations;
import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import java.io.IOException;
import java.nio.ByteBuffer;
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {
private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
// Allow to deserialize objects unknown types,
// more flexible but less secure.
.requireClassRegistration(false)
.buildThreadSafeFury();
@Override
public ByteString serialize(JData obj) {
return SerializationHelper.serialize(obj);
return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
}
@Override
public JData deserialize(ByteString data) {
try (var is = data.newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
public JData deserialize(ByteBuffer data) {
return (JData) fury.deserialize(data);
}
}

View File

@@ -2,8 +2,10 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
public interface ObjectSerializer<T> {
ByteString serialize(T obj);
T deserialize(ByteString data);
T deserialize(ByteBuffer data);
}

View File

@@ -1,7 +1,5 @@
package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
@@ -104,27 +102,27 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
var txn = _env.txnRead();
try {
long commitId = readTxId(txn).orElseThrow();
return new Snapshot<JObjectKey, ByteString>() {
return new Snapshot<JObjectKey, ByteBuffer>() {
private final Txn<ByteBuffer> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
public Optional<ByteBuffer> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn, name.toByteBuffer());
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer);
return ret;
}
@@ -197,7 +195,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
private final Cursor<ByteBuffer> _cursor;
@@ -352,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
protected Pair<JObjectKey, ByteString> nextImpl() {
protected Pair<JObjectKey, ByteBuffer> nextImpl() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val();
var bs = UnsafeByteOperations.unsafeWrap(val);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
if (_goingForward)
_hasNext = _cursor.next();
else

View File

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MappingKvIterator;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
@@ -11,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -22,21 +24,21 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private long _lastCommitId = 0;
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
synchronized (this) {
return new Snapshot<JObjectKey, ByteString>() {
return new Snapshot<JObjectKey, ByteBuffer>() {
private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects;
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key);
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name));
public Optional<ByteBuffer> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer);
}
@Override

View File

@@ -5,12 +5,13 @@ import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
// Persistent storage of objects
// All changes are written as sequential transactions
public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteString> getSnapshot();
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId);

View File

@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator;
@@ -13,19 +14,20 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
@ApplicationScoped
public class SerializingObjectPersistentStore {
@Inject
ObjectSerializer<JDataVersionedWrapper> serializer;
JDataVersionedWrapperSerializer serializer;
@Inject
ObjectPersistentStore delegateStore;
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {

View File

@@ -11,7 +11,7 @@ import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
@@ -22,7 +22,7 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
@@ -30,9 +30,13 @@ public class JObjectManager {
@Inject
LockManager lockManager;
private boolean _ready = false;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
static {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager() {
Log.infov("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
private void verifyReady() {

View File

@@ -54,11 +54,11 @@ public class JKleppmannTreeManager {
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(rootNode));
curTx.put(new JKleppmannTreeNodeHolder(rootNode, true));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(trashNode));
curTx.put(new JKleppmannTreeNodeHolder(trashNode, true));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(lf_node));
curTx.put(new JKleppmannTreeNodeHolder(lf_node, true));
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);

View File

@@ -18,6 +18,10 @@ public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean f
this(TreePSet.empty(), false, node);
}
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node, boolean frozen) {
this(TreePSet.empty(), frozen, node);
}
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
Objects.requireNonNull(node, "node");
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);