diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java index 080b51ab..39f2e289 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.KeyPredicateKvIterator; import com.usatiuk.dhfs.objects.ReversibleKvIterator; import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; +import com.usatiuk.dhfs.utils.RefcountedCloseable; import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; @@ -25,6 +26,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @@ -38,6 +40,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private Env _env; private Dbi _db; private boolean _ready = false; + private final AtomicReference>> _curReadTxn = new AtomicReference<>(); private long _lastTxId = 0; @@ -114,8 +117,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } private class LmdbKvIterator extends ReversibleKvIterator { - private final Txn _txn = _env.txnRead(); - private final Cursor _cursor = _db.openCursor(_txn); + private final RefcountedCloseable> _txn; + private final Cursor _cursor; private boolean _hasNext = false; private static final Cleaner CLEANER = Cleaner.create(); @@ -124,6 +127,23 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { LmdbKvIterator(IteratorStart start, JObjectKey key) { _goingForward = true; + + _lock.readLock().lock(); + try { + var got = _curReadTxn.get(); + var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null); + if (refInc != null) { + _txn = got; + } else { + var newTxn = new RefcountedCloseable<>(_env.txnRead()); + _curReadTxn.compareAndSet(got, newTxn); + _txn = newTxn; + } + } finally { + _lock.readLock().unlock(); + } + _cursor = _db.openCursor(_txn.get()); + var closedRef = _closed; var bt = _allocationStacktrace; CLEANER.register(this, () -> { @@ -200,7 +220,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } _closed.setValue(true); _cursor.close(); - _txn.close(); + _txn.unref(); } @Override @@ -298,6 +318,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { bbData.flip(); _db.put(txn, bb, bbData); + _curReadTxn.set(null); + txn.commit(); } finally { _lock.writeLock().unlock(); diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/RefcountedCloseable.java b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/RefcountedCloseable.java new file mode 100644 index 00000000..8c6d46d8 --- /dev/null +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/RefcountedCloseable.java @@ -0,0 +1,53 @@ +package com.usatiuk.dhfs.utils; + +import io.quarkus.logging.Log; +import org.apache.commons.lang3.mutable.MutableObject; + +import java.lang.ref.Cleaner; + +public class RefcountedCloseable { + private final T _closeable; + private int _refCount = 1; + private final MutableObject _closed = new MutableObject<>(false); + private static final Cleaner CLEANER = Cleaner.create(); + + public RefcountedCloseable(T closeable) { + _closeable = closeable; + var closedRef = _closed; + CLEANER.register(this, () -> { + if (!closedRef.getValue()) { + Log.error("RefcountedCloseable was not closed before GC"); + System.exit(-1); + } + }); + } + + public RefcountedCloseable ref() { + synchronized (this) { + if (_closed.getValue()) { + return null; + } + _refCount++; + return this; + } + } + + public void unref() { + synchronized (this) { + _refCount--; + if (_refCount == 0) { + try { + assert !_closed.getValue(); + _closed.setValue(true); + _closeable.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + public T get() { + return _closeable; + } +}