mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
cache lmdb transactions
This commit is contained in:
@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.KeyPredicateKvIterator;
|
||||
import com.usatiuk.dhfs.objects.ReversibleKvIterator;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import io.quarkus.arc.properties.IfBuildProperty;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -25,6 +26,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@@ -38,6 +40,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
private Env<ByteBuffer> _env;
|
||||
private Dbi<ByteBuffer> _db;
|
||||
private boolean _ready = false;
|
||||
private final AtomicReference<RefcountedCloseable<Txn<ByteBuffer>>> _curReadTxn = new AtomicReference<>();
|
||||
|
||||
private long _lastTxId = 0;
|
||||
|
||||
@@ -114,8 +117,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
|
||||
private final Txn<ByteBuffer> _txn = _env.txnRead();
|
||||
private final Cursor<ByteBuffer> _cursor = _db.openCursor(_txn);
|
||||
private final RefcountedCloseable<Txn<ByteBuffer>> _txn;
|
||||
private final Cursor<ByteBuffer> _cursor;
|
||||
private boolean _hasNext = false;
|
||||
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
@@ -124,6 +127,23 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
LmdbKvIterator(IteratorStart start, JObjectKey key) {
|
||||
_goingForward = true;
|
||||
|
||||
_lock.readLock().lock();
|
||||
try {
|
||||
var got = _curReadTxn.get();
|
||||
var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null);
|
||||
if (refInc != null) {
|
||||
_txn = got;
|
||||
} else {
|
||||
var newTxn = new RefcountedCloseable<>(_env.txnRead());
|
||||
_curReadTxn.compareAndSet(got, newTxn);
|
||||
_txn = newTxn;
|
||||
}
|
||||
} finally {
|
||||
_lock.readLock().unlock();
|
||||
}
|
||||
_cursor = _db.openCursor(_txn.get());
|
||||
|
||||
var closedRef = _closed;
|
||||
var bt = _allocationStacktrace;
|
||||
CLEANER.register(this, () -> {
|
||||
@@ -200,7 +220,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
_closed.setValue(true);
|
||||
_cursor.close();
|
||||
_txn.close();
|
||||
_txn.unref();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -298,6 +318,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
bbData.flip();
|
||||
_db.put(txn, bb, bbData);
|
||||
|
||||
_curReadTxn.set(null);
|
||||
|
||||
txn.commit();
|
||||
} finally {
|
||||
_lock.writeLock().unlock();
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.usatiuk.dhfs.utils;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
|
||||
public class RefcountedCloseable<T extends AutoCloseable> {
|
||||
private final T _closeable;
|
||||
private int _refCount = 1;
|
||||
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
public RefcountedCloseable(T closeable) {
|
||||
_closeable = closeable;
|
||||
var closedRef = _closed;
|
||||
CLEANER.register(this, () -> {
|
||||
if (!closedRef.getValue()) {
|
||||
Log.error("RefcountedCloseable was not closed before GC");
|
||||
System.exit(-1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public RefcountedCloseable<T> ref() {
|
||||
synchronized (this) {
|
||||
if (_closed.getValue()) {
|
||||
return null;
|
||||
}
|
||||
_refCount++;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
public void unref() {
|
||||
synchronized (this) {
|
||||
_refCount--;
|
||||
if (_refCount == 0) {
|
||||
try {
|
||||
assert !_closed.getValue();
|
||||
_closed.setValue(true);
|
||||
_closeable.close();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public T get() {
|
||||
return _closeable;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user