Objects: get rid of locks in lmdb store

This commit is contained in:
2025-04-02 15:13:58 +02:00
parent 194166109e
commit 735dd605d7
9 changed files with 402 additions and 508 deletions

View File

@@ -30,9 +30,4 @@ public class SnapshotManager {
});
}
}
@Nonnull
public Optional<JDataVersionedWrapper> readObjectDirect(JObjectKey name) {
return writebackStore.readObject(name);
}
}

View File

@@ -4,52 +4,86 @@ import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.LockManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>();
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@Inject
LockManager lockManager;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private TreePMap<JObjectKey, CacheEntry> _sortedCache = TreePMap.empty();
private long _cacheVersion = 0;
private long _curSize = 0;
private long _evict = 0;
private ExecutorService _statusExecutor = null;
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
@Startup
void init() {
int newSize = size() + objSize;
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private final AtomicReference<Cache> _cache;
private ExecutorService _commitExecutor;
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
);
}
void init(@Observes @Priority(110) StartupEvent event) {
try (var s = delegate.getSnapshot()) {
_cache.set(_cache.get().withVersion(s.id()));
}
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
ExecutorService _statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Log.debugv("Cache status: size=" + _cache.get().size() / 1024 / 1024 + "MB");
Thread.sleep(10000);
if (_curSize > 0)
Log.debugv("Cache status: size=" + _curSize / 1024 / 1024 + "MB" + " evicted=" + _evict);
_evict = 0;
}
} catch (InterruptedException ignored) {
}
@@ -57,237 +91,181 @@ public class CachingObjectPersistentStore {
}
}
private void put(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
// Log.tracev("Adding {0} to cache: {1}", key, obj);
_lock.writeLock().lock();
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : objs.deleted()) {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
try {
int size = obj.map(JDataVersionedWrapper::estimateSize).orElse(16);
_curSize += size;
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), size);
var old = _cache.putLast(key, entry);
_sortedCache = _sortedCache.plus(key, entry);
if (old != null)
_curSize -= old.size();
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_sortedCache = _sortedCache.minus(del.getKey());
_curSize -= del.getValue().size();
_evict++;
}
} finally {
_lock.writeLock().unlock();
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
_cache.set(cache);
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
_lock.readLock().lock();
try {
var got = _cache.get(name);
if (got != null) {
return got.object().opt();
}
} finally {
_lock.readLock().unlock();
}
// Global object lock, prevent putting the object into cache
// if its being written right now by another thread
try (var lock = lockManager.tryLockObject(name)) {
var got = delegate.readObject(name);
if (lock == null)
return got;
_lock.writeLock().lock();
try {
put(name, got);
// No need to increase cache version, the objects didn't change
} finally {
_lock.writeLock().unlock();
}
return got;
}
}
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, long txId) {
var serialized = delegate.prepareManifest(names);
Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
// A little complicated locking to minimize write lock holding time
delegate.commitTx(serialized, txId, (commit) -> {
_lock.writeLock().lock();
try {
// Make the changes visible atomically both in cache and in the underlying store
for (var write : names.written()) {
put(write.getLeft(), Optional.of(write.getRight()));
}
for (var del : names.deleted()) {
put(del, Optional.empty());
}
++_cacheVersion;
commit.run();
} finally {
_lock.writeLock().unlock();
}
});
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
TreePMap<JObjectKey, CacheEntry> curSortedCache;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
long cacheVersion;
while (true) {
var cache = _cache.get();
if (cache == null)
return delegate.getSnapshot();
Cache curCache = null;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
try {
// Log.tracev("Getting cache snapshot");
// Decrease the lock time as much as possible
_lock.readLock().lock();
try {
curSortedCache = _sortedCache;
cacheVersion = _cacheVersion;
// TODO: Could this be done without lock?
curCache = _cache.get();
backing = delegate.getSnapshot();
} finally {
_lock.readLock().unlock();
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final TreePMap<JObjectKey, CacheEntry> _curSortedCache = curSortedCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private final long _snapshotCacheVersion = cacheVersion;
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (_snapshotCacheVersion != _cacheVersion)
return;
_lock.writeLock().lock();
try {
if (_snapshotCacheVersion != _cacheVersion) {
// Log.tracev("Not caching: {0}", key);
} else {
// Log.tracev("Caching: {0}", key);
put(key, obj);
}
} finally {
_lock.writeLock().unlock();
}
if (curCache.version() != backing.id()) {
backing.close();
backing = null;
continue;
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private boolean _invalid = false;
private Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curSortedCache, mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (_invalid)
return;
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curSortedCache.get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
yield Optional.empty();
for (int i = 0; i < 10; i++) {
var globalCache = _cache.get();
if (globalCache.version() != _curCache.version()) {
_invalid = true;
return;
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}
@Override
public long id() {
return _backing.id();
}
var newCache = globalCache.withPut(key, obj);
if (!_cache.compareAndSet(globalCache, newCache))
continue;
@Override
public void close() {
_backing.close();
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
_delegate = delegate;
_curCache = newCache;
}
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}
@Override
public void skip() {
_delegate.skip();
public long id() {
return _backing.id();
}
@Override
public void close() {
_delegate.close();
_backing.close();
}
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
_delegate = delegate;
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skip() {
_delegate.skip();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public void close() {
_delegate.close();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
}
}
};
} catch (Throwable ex) {
if (backing != null) {
backing.close();
}
};
} catch (Throwable ex) {
if (backing != null) {
backing.close();
throw ex;
}
throw ex;
}
}
public long getLastTxId() {
return delegate.getLastCommitId();
}
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, long size) {
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) {
}
}

View File

@@ -43,12 +43,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_NAME = "objects";
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
private final Path _root;
private final AtomicReference<RefcountedCloseable<Txn<ByteBuffer>>> _curReadTxn = new AtomicReference<>();
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private Env<ByteBuffer> _env;
private Dbi<ByteBuffer> _db;
private boolean _ready = false;
private long _lastTxId = 0;
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
_root = Path.of(root).resolve("objects");
@@ -65,22 +62,33 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
.open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE);
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
try (Txn<ByteBuffer> txn = _env.txnRead()) {
var value = _db.get(txn, bb);
if (value != null) {
var ver = value.getLong();
Log.infov("Read version: {0}", ver);
_lastTxId = ver;
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
var read = readTxId(txn);
if (read.isPresent()) {
Log.infov("Read tx id {0}", read.get());
} else {
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
bbData.putLong(0);
bbData.flip();
_db.put(txn, bb, bbData);
txn.commit();
}
}
_ready = true;
}
private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var value = _db.get(txn, bb);
return Optional.ofNullable(value).map(ByteBuffer::getLong);
}
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
_ready = false;
_db.close();
@@ -101,76 +109,50 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
private RefcountedCloseable<Txn<ByteBuffer>> getCurTxn() {
_lock.readLock().lock();
try {
var got = _curReadTxn.get();
var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null);
if (refInc != null) {
return got;
} else {
var newTxn = new RefcountedCloseable<>(_env.txnRead());
_curReadTxn.compareAndSet(got, newTxn);
return newTxn;
}
} finally {
_lock.readLock().unlock();
}
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
_lock.readLock().lock();
try {
var txn = new RefcountedCloseable<>(_env.txnRead());
var commitId = getLastCommitId();
return new Snapshot<JObjectKey, ByteString>() {
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
var txn = new RefcountedCloseable<>(_env.txnRead());
long commitId = readTxId(txn.get()).orElseThrow();
return new Snapshot<JObjectKey, ByteString>() {
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
return ret;
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
} finally {
_lock.readLock().unlock();
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
public Runnable prepareTx(TxManifestRaw names, long txId) {
verifyReady();
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
var txn = _env.txnWrite();
try {
for (var written : names.written()) {
// TODO:
var bb = UninitializedByteBuffer.allocateUninitialized(written.getValue().size());
@@ -182,33 +164,26 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
_db.delete(txn, key.toByteBuffer());
}
assert txId > readTxId(txn).orElseThrow();
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
var realTxId = txId;
if (realTxId == -1)
realTxId = _lastTxId + 1;
assert realTxId > _lastTxId;
_lastTxId = realTxId;
bbData.putLong(realTxId);
bbData.flip();
_db.put(txn, bb, bbData);
_curReadTxn.set(null);
txn.commit();
} finally {
_lock.writeLock().unlock();
}
});
bbData.putLong(txId);
bbData.flip();
_db.put(txn, bb, bbData);
} catch (Throwable t) {
txn.close();
throw t;
}
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
}
@Override
@@ -229,16 +204,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace();
}
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastTxId;
} finally {
_lock.readLock().unlock();
}
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
private static final Cleaner CLEANER = Cleaner.create();
private final RefcountedCloseable<Txn<ByteBuffer>> _txn;
@@ -326,10 +291,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
}
LmdbKvIterator(IteratorStart start, JObjectKey key) {
this(getCurTxn(), start, key);
}
@Override
public void close() {
if (_closed.getValue()) {

View File

@@ -30,11 +30,6 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key);
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
synchronized (this) {
@@ -66,25 +61,19 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
assert txId > _lastCommitId;
_lastCommitId = txId;
} finally {
_lock.writeLock().unlock();
public Runnable prepareTx(TxManifestRaw names, long txId) {
return () -> {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
});
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
};
}
@Override
@@ -101,14 +90,4 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
public long getUsableSpace() {
return 0;
}
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastCommitId;
} finally {
_lock.readLock().unlock();
}
}
}

View File

@@ -16,23 +16,13 @@ public interface ObjectPersistentStore {
@Nonnull
Optional<ByteString> readObject(JObjectKey name);
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key);
Snapshot<JObjectKey, ByteString> getSnapshot();
/**
* @param commitLocked - a function that will be called with a Runnable that will commit the transaction
* the changes in the store will be visible to new transactions only after the runnable is called
*/
void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked);
Runnable prepareTx(TxManifestRaw names, long txId);
long getTotalSpace();
long getFreeSpace();
long getUsableSpace();
long getLastCommitId();
}

View File

@@ -29,14 +29,6 @@ public class SerializingObjectPersistentStore {
return delegateStore.readObject(name).map(serializer::deserialize);
}
public TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> names) {
return new TxManifestRaw(
names.written().stream()
.map(e -> Pair.of(e.getKey(), serializer.serialize(e.getValue())))
.toList()
, names.deleted());
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
@@ -65,16 +57,15 @@ public class SerializingObjectPersistentStore {
}
// void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, Consumer<Runnable> commitLocked) {
// delegateStore.commitTx(prepareManifest(names), commitLocked);
// }
void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
delegateStore.commitTx(names, txId, commitLocked);
private TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> objs) {
return new TxManifestRaw(
objs.written().stream()
.map(e -> Pair.of(e.getKey(), serializer.serialize(e.getValue())))
.toList()
, objs.deleted());
}
long getLastCommitId() {
return delegateStore.getLastCommitId();
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId);
}
}

View File

@@ -54,7 +54,7 @@ public class WritebackObjectPersistentStore {
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
void init(@Observes @Priority(120) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
@@ -75,8 +75,12 @@ public class WritebackObjectPersistentStore {
} catch (InterruptedException ignored) {
}
});
_counter.set(cachedStore.getLastTxId());
_lastCommittedTx.set(cachedStore.getLastTxId());
long lastTxId;
try (var s = cachedStore.getSnapshot()) {
lastTxId = s.id();
}
_counter.set(lastTxId);
_lastCommittedTx.set(lastTxId);
_ready = true;
}
@@ -313,26 +317,6 @@ public class WritebackObjectPersistentStore {
}
}
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = getPendingWrite(name).orElse(null);
return switch (pending) {
case PendingWrite write -> Optional.of(write.data());
case PendingDelete ignored -> Optional.empty();
case null -> cachedStore.readObject(name);
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
@Nonnull
public VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
}
return new VerboseReadResultPersisted(cachedStore.readObject(key));
}
/**
* @param commitLocked - a function that will be called with a Consumer of a new transaction id,
* that will commit the transaction the changes in the store will be visible to new transactions

View File

@@ -3,6 +3,7 @@ package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
@@ -58,18 +59,10 @@ public class JObjectManager {
verifyReady();
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet;
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
Consumer<JObjectKey> addDependency =
key -> {
dependenciesLocked.computeIfAbsent(key, k -> {
var lock = lockManager.lockObject(k);
toUnlock.add(lock);
return snapshotManager.readObjectDirect(k);
});
};
try {
try {
long pendingCount = 0;
@@ -161,7 +154,12 @@ public class JObjectManager {
if (!writes.isEmpty()) {
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(addDependency);
.forEach(k -> {
var lock = lockManager.lockObject(k);
toUnlock.add(lock);
});
commitSnapshot = snapshotManager.createSnapshot();
}
for (var read : readSet.entrySet()) {
@@ -189,39 +187,47 @@ public class JObjectManager {
Log.trace("Committing transaction start");
var snapshotId = tx.snapshot().id();
for (var read : readSet.entrySet()) {
var dep = dependenciesLocked.get(read.getKey());
if (snapshotId != commitSnapshot.id()) {
for (var read : readSet.entrySet()) {
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
var dep = dependenciesLocked.get(read.getKey());
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
Log.trace("Checking read dependency " + read.getKey() + " - not found");
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
Log.trace("Checking read dependency " + read.getKey() + " - not found");
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
continue;
if (dep.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
}
if (dep.get().version() > snapshotId) {
Log.trace("Checking dependency " + read.getKey() + " - newer than");
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
}
if (dep.get().version() > snapshotId) {
Log.trace("Checking dependency " + read.getKey() + " - newer than");
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
}
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
}
} else {
Log.tracev("Skipped dependency checks: no changes");
}
boolean same = snapshotId == commitSnapshot.id();
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
.filter(r -> {
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
}
}
}
return true;
}).toList());
@@ -244,6 +250,8 @@ public class JObjectManager {
for (var unlock : toUnlock) {
unlock.close();
}
if (commitSnapshot != null)
commitSnapshot.close();
tx.close();
}
}

View File

@@ -27,95 +27,103 @@ public class LmdbKvIteratorTest {
@Inject
LmdbObjectPersistentStore store;
long getNextTxId() {
try (var s = store.getSnapshot()) {
return s.id() + 1;
}
}
@RepeatedTest(100)
public void iteratorTest1() {
store.commitTx(
store.prepareTx(
new TxManifestRaw(
List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))),
List.of()
), -1, Runnable::run
);
), getNextTxId()
).run();
var iterator = store.getIterator(IteratorStart.GE, JObjectKey.of(""));
Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
try (var snapshot = store.getSnapshot()) {
var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(3)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(3)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(4)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(4)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = store.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Assertions.assertTrue(iterator.hasNext());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Assertions.assertTrue(iterator.hasNext());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next());
iterator.close();
}
store.commitTx(new TxManifestRaw(
store.prepareTx(new TxManifestRaw(
List.of(),
List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3)))
),
-1, Runnable::run
);
getNextTxId()
).run();
}
}