From a8cf483eeeb7598785344cb45c8466b0fbc45322 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 26 Apr 2025 11:11:51 +0200 Subject: [PATCH] Simplify ObjectPersistentStore --- .../stores/LmdbObjectPersistentStore.java | 101 ++++++++---------- .../stores/MemoryObjectPersistentStore.java | 8 -- .../objects/stores/ObjectPersistentStore.java | 3 - .../SerializingObjectPersistentStore.java | 5 - .../usatiuk/utils/RefcountedCloseable.java | 53 --------- 5 files changed, 47 insertions(+), 123 deletions(-) delete mode 100644 dhfs-parent/utils/src/main/java/com/usatiuk/utils/RefcountedCloseable.java diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java index 8f786d7a..86eff923 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java @@ -10,7 +10,6 @@ import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.KeyPredicateKvIterator; import com.usatiuk.objects.iterators.ReversibleKvIterator; import com.usatiuk.objects.snapshot.Snapshot; -import com.usatiuk.utils.RefcountedCloseable; import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; @@ -104,53 +103,48 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { if (!_ready) throw new IllegalStateException("Wrong service order!"); } - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - verifyReady(); - try (Txn txn = _env.txnRead()) { - var value = _db.get(txn, name.toByteBuffer()); - return Optional.ofNullable(value).map(ByteString::copyFrom); - } - } - @Override public Snapshot getSnapshot() { - var txn = new RefcountedCloseable<>(_env.txnRead()); - long commitId = readTxId(txn.get()).orElseThrow(); - return new Snapshot() { - private final RefcountedCloseable> _txn = txn; - private final long _id = commitId; - private boolean _closed = false; + var txn = _env.txnRead(); + try { + long commitId = readTxId(txn).orElseThrow(); + return new Snapshot() { + private final Txn _txn = txn; + private final long _id = commitId; + private boolean _closed = false; - @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - assert !_closed; - return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); - } + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + assert !_closed; + return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); + } - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - assert !_closed; - var got = _db.get(_txn.get(), name.toByteBuffer()); - var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap); - return ret; - } + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + assert !_closed; + var got = _db.get(_txn, name.toByteBuffer()); + var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap); + return ret; + } - @Override - public long id() { - assert !_closed; - return _id; - } + @Override + public long id() { + assert !_closed; + return _id; + } - @Override - public void close() { - assert !_closed; - _closed = true; - _txn.unref(); - } - }; + @Override + public void close() { + assert !_closed; + _closed = true; + _txn.close(); + } + }; + } catch (Exception e) { + txn.close(); + throw e; + } } @Override @@ -205,28 +199,28 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private class LmdbKvIterator extends ReversibleKvIterator { private static final Cleaner CLEANER = Cleaner.create(); - private final RefcountedCloseable> _txn; + private final Txn _txn; // Managed by the snapshot private final Cursor _cursor; private final MutableObject _closed = new MutableObject<>(false); // private final Exception _allocationStacktrace = new Exception(); - private final Exception _allocationStacktrace = null; +// private final Exception _allocationStacktrace = null; private boolean _hasNext = false; private JObjectKey _peekedNextKey = null; - LmdbKvIterator(RefcountedCloseable> txn, IteratorStart start, JObjectKey key) { + LmdbKvIterator(Txn txn, IteratorStart start, JObjectKey key) { _txn = txn; _goingForward = true; - _cursor = _db.openCursor(_txn.get()); + _cursor = _db.openCursor(_txn); var closedRef = _closed; - var bt = _allocationStacktrace; - CLEANER.register(this, () -> { - if (!closedRef.getValue()) { - Log.error("Iterator was not closed before GC, allocated at: {0}", bt); - System.exit(-1); - } - }); +// var bt = _allocationStacktrace; +// CLEANER.register(this, () -> { +// if (!closedRef.getValue()) { +// Log.error("Iterator was not closed before GC, allocated at: {0}", bt); +// System.exit(-1); +// } +// }); verifyReady(); @@ -308,7 +302,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } _closed.setValue(true); _cursor.close(); - _txn.unref(); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java index 8e3af9ba..b15944f5 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java @@ -21,14 +21,6 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { private TreePMap _objects = TreePMap.empty(); private long _lastCommitId = 0; - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - synchronized (this) { - return Optional.ofNullable(_objects.get(name)); - } - } - @Override public Snapshot getSnapshot() { synchronized (this) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java index b84adb94..0600af3c 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java @@ -10,9 +10,6 @@ import java.util.Optional; // Persistent storage of objects // All changes are written as sequential transactions public interface ObjectPersistentStore { - @Nonnull - Optional readObject(JObjectKey name); - Snapshot getSnapshot(); Runnable prepareTx(TxManifestRaw names, long txId); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java index 92a6217e..963c8e20 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java @@ -23,11 +23,6 @@ public class SerializingObjectPersistentStore { @Inject ObjectPersistentStore delegateStore; - @Nonnull - Optional readObject(JObjectKey name) { - return delegateStore.readObject(name).map(serializer::deserialize); - } - public Snapshot getSnapshot() { return new Snapshot() { private final Snapshot _backing = delegateStore.getSnapshot(); diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/RefcountedCloseable.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/RefcountedCloseable.java deleted file mode 100644 index 3ed16c88..00000000 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/RefcountedCloseable.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.usatiuk.utils; - -import io.quarkus.logging.Log; -import org.apache.commons.lang3.mutable.MutableObject; - -import java.lang.ref.Cleaner; - -public class RefcountedCloseable { - private static final Cleaner CLEANER = Cleaner.create(); - private final T _closeable; - private final MutableObject _closed = new MutableObject<>(false); - private int _refCount = 1; - - public RefcountedCloseable(T closeable) { - _closeable = closeable; - var closedRef = _closed; - CLEANER.register(this, () -> { - if (!closedRef.getValue()) { - Log.error("RefcountedCloseable was not closed before GC"); - System.exit(-1); - } - }); - } - - public RefcountedCloseable ref() { - synchronized (this) { - if (_closed.getValue()) { - return null; - } - _refCount++; - return this; - } - } - - public void unref() { - synchronized (this) { - _refCount--; - if (_refCount == 0) { - try { - assert !_closed.getValue(); - _closed.setValue(true); - _closeable.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - } - - public T get() { - return _closeable; - } -}