From 6a962022bb93311ef5bf1e6f0a7a80dec647ffc6 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Mon, 28 Apr 2025 09:03:13 +0200 Subject: [PATCH] dump --- dhfs-parent/objects/pom.xml | 5 ++++ .../JDataVersionedWrapperSerializer.java | 13 ++++------- .../usatiuk/objects/JavaDataSerializer.java | 23 +++++++++++-------- .../com/usatiuk/objects/ObjectSerializer.java | 4 +++- .../stores/LmdbObjectPersistentStore.java | 19 +++++++-------- .../stores/MemoryObjectPersistentStore.java | 14 ++++++----- .../objects/stores/ObjectPersistentStore.java | 3 ++- .../SerializingObjectPersistentStore.java | 6 +++-- .../objects/transaction/JObjectManager.java | 14 +++++++---- .../jkleppmanntree/JKleppmannTreeManager.java | 6 ++--- .../structs/JKleppmannTreeNodeHolder.java | 4 ++++ 11 files changed, 65 insertions(+), 46 deletions(-) diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml index f79b8e9b..534d8cc0 100644 --- a/dhfs-parent/objects/pom.xml +++ b/dhfs-parent/objects/pom.xml @@ -18,6 +18,11 @@ + + org.apache.fury + fury-core + 0.10.1 + net.jqwik jqwik diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java index fa7b4156..b5a9238a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java @@ -8,11 +8,10 @@ import jakarta.inject.Singleton; import java.nio.ByteBuffer; @Singleton -public class JDataVersionedWrapperSerializer implements ObjectSerializer { +public class JDataVersionedWrapperSerializer { @Inject ObjectSerializer dataSerializer; - @Override public ByteString serialize(JDataVersionedWrapper obj) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); buffer.putLong(obj.version()); @@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer dataSerializer.deserialize(rawData) + public JDataVersionedWrapper deserialize(ByteBuffer data) { + var version = data.getLong(); + return new JDataVersionedWrapperLazy(version, data.remaining(), + () -> dataSerializer.deserialize(data) ); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java index b519521f..75bde6af 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java @@ -2,26 +2,31 @@ package com.usatiuk.objects; import com.google.protobuf.ByteString; -import com.usatiuk.utils.SerializationHelper; +import com.google.protobuf.UnsafeByteOperations; import io.quarkus.arc.DefaultBean; import jakarta.enterprise.context.ApplicationScoped; +import org.apache.fury.Fury; +import org.apache.fury.ThreadSafeFury; +import org.apache.fury.config.Language; -import java.io.IOException; +import java.nio.ByteBuffer; @ApplicationScoped @DefaultBean public class JavaDataSerializer implements ObjectSerializer { + private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA) + // Allow to deserialize objects unknown types, + // more flexible but less secure. + .requireClassRegistration(false) + .buildThreadSafeFury(); + @Override public ByteString serialize(JData obj) { - return SerializationHelper.serialize(obj); + return UnsafeByteOperations.unsafeWrap(fury.serialize(obj)); } @Override - public JData deserialize(ByteString data) { - try (var is = data.newInput()) { - return SerializationHelper.deserialize(is); - } catch (IOException e) { - throw new RuntimeException(e); - } + public JData deserialize(ByteBuffer data) { + return (JData) fury.deserialize(data); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java index 788f5a52..d53d482f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java @@ -2,8 +2,10 @@ package com.usatiuk.objects; import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; + public interface ObjectSerializer { ByteString serialize(T obj); - T deserialize(ByteString data); + T deserialize(ByteBuffer data); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java index 86eff923..c922360b 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java @@ -1,7 +1,5 @@ package com.usatiuk.objects.stores; -import com.google.protobuf.ByteString; -import com.google.protobuf.UnsafeByteOperations; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKeyMax; import com.usatiuk.objects.JObjectKeyMin; @@ -104,27 +102,27 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } @Override - public Snapshot getSnapshot() { + public Snapshot getSnapshot() { var txn = _env.txnRead(); try { long commitId = readTxId(txn).orElseThrow(); - return new Snapshot() { + return new Snapshot() { private final Txn _txn = txn; private final long _id = commitId; private boolean _closed = false; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { assert !_closed; return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); } @Nonnull @Override - public Optional readObject(JObjectKey name) { + public Optional readObject(JObjectKey name) { assert !_closed; var got = _db.get(_txn, name.toByteBuffer()); - var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap); + var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer); return ret; } @@ -197,7 +195,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { return _root.toFile().getUsableSpace(); } - private class LmdbKvIterator extends ReversibleKvIterator { + private class LmdbKvIterator extends ReversibleKvIterator { private static final Cleaner CLEANER = Cleaner.create(); private final Txn _txn; // Managed by the snapshot private final Cursor _cursor; @@ -352,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } @Override - protected Pair nextImpl() { + protected Pair nextImpl() { if (!_hasNext) { throw new NoSuchElementException("No more elements"); } // TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway var val = _cursor.val(); - var bs = UnsafeByteOperations.unsafeWrap(val); - var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs); + var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer()); if (_goingForward) _hasNext = _cursor.next(); else diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java index b15944f5..79765b4f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java @@ -4,6 +4,7 @@ import com.google.protobuf.ByteString; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MappingKvIterator; import com.usatiuk.objects.iterators.NavigableMapKvIterator; import com.usatiuk.objects.snapshot.Snapshot; import io.quarkus.arc.properties.IfBuildProperty; @@ -11,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped; import org.pcollections.TreePMap; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -22,21 +24,21 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { private long _lastCommitId = 0; @Override - public Snapshot getSnapshot() { + public Snapshot getSnapshot() { synchronized (this) { - return new Snapshot() { + return new Snapshot() { private final TreePMap _objects = MemoryObjectPersistentStore.this._objects; private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new NavigableMapKvIterator<>(_objects, start, key); + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer); } @Nonnull @Override - public Optional readObject(JObjectKey name) { - return Optional.ofNullable(_objects.get(name)); + public Optional readObject(JObjectKey name) { + return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java index 0600af3c..d1918de2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java @@ -5,12 +5,13 @@ import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.snapshot.Snapshot; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; // Persistent storage of objects // All changes are written as sequential transactions public interface ObjectPersistentStore { - Snapshot getSnapshot(); + Snapshot getSnapshot(); Runnable prepareTx(TxManifestRaw names, long txId); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java index 963c8e20..4af76bde 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java @@ -2,6 +2,7 @@ package com.usatiuk.objects.stores; import com.google.protobuf.ByteString; import com.usatiuk.objects.JDataVersionedWrapper; +import com.usatiuk.objects.JDataVersionedWrapperSerializer; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.ObjectSerializer; import com.usatiuk.objects.iterators.CloseableKvIterator; @@ -13,19 +14,20 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; @ApplicationScoped public class SerializingObjectPersistentStore { @Inject - ObjectSerializer serializer; + JDataVersionedWrapperSerializer serializer; @Inject ObjectPersistentStore delegateStore; public Snapshot getSnapshot() { return new Snapshot() { - private final Snapshot _backing = delegateStore.getSnapshot(); + private final Snapshot _backing = delegateStore.getSnapshot(); @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java index 0093ab3f..cdc47cac 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java @@ -11,7 +11,7 @@ import io.quarkus.runtime.StartupEvent; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; -import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.spi.CDI; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; @@ -22,7 +22,7 @@ import java.util.stream.Stream; @ApplicationScoped public class JObjectManager { - private final List _preCommitTxHooks; + private static final List _preCommitTxHooks; @Inject WritebackObjectPersistentStore writebackObjectPersistentStore; @Inject @@ -30,9 +30,13 @@ public class JObjectManager { @Inject LockManager lockManager; private boolean _ready = false; - JObjectManager(Instance preCommitTxHooks) { - _preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); - Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); + + static { + _preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); + } + + JObjectManager() { + Log.infov("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); } private void verifyReady() { diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java index e01d894c..c7a938e4 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java @@ -54,11 +54,11 @@ public class JKleppmannTreeManager { ); curTx.put(data); var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get()); - curTx.put(new JKleppmannTreeNodeHolder(rootNode)); + curTx.put(new JKleppmannTreeNodeHolder(rootNode, true)); var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get()); - curTx.put(new JKleppmannTreeNodeHolder(trashNode)); + curTx.put(new JKleppmannTreeNodeHolder(trashNode, true)); var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get()); - curTx.put(new JKleppmannTreeNodeHolder(lf_node)); + curTx.put(new JKleppmannTreeNodeHolder(lf_node, true)); } return new JKleppmannTree(data); // opObjectRegistry.registerObject(tree); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java index 9efc2e22..fb3ac05c 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java @@ -18,6 +18,10 @@ public record JKleppmannTreeNodeHolder(PCollection refsFrom, boolean f this(TreePSet.empty(), false, node); } + public JKleppmannTreeNodeHolder(JKleppmannTreeNode node, boolean frozen) { + this(TreePSet.empty(), frozen, node); + } + public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) { Objects.requireNonNull(node, "node"); return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);