diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java index 01798da9..f39ad962 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java @@ -1,8 +1,13 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; @FunctionalInterface -public interface IterProdFn, V> { +public interface IterProdFn, V> extends AutoCloseableNoThrow { CloseableKvIterator get(IteratorStart start, K key); + + @Override + default void close() { + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java index b6aa55d4..af9684e4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java @@ -54,6 +54,9 @@ public class MergingKvIterator, V> extends ReversibleKvI } iteratorsTmp.put(iterator, counter++); } + for (var prodFn : iterators) { + prodFn.close(); + } _iterators = Map.copyOf(iteratorsTmp); _pendingIterators = null; } @@ -129,6 +132,9 @@ public class MergingKvIterator, V> extends ReversibleKvI iteratorsTmp.put(iterator, counter++); } _iterators = Map.copyOf(iteratorsTmp); + for (var prodFn : _pendingIterators) { + prodFn.close(); + } } doInitialAdvance(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index 8a843407..def86174 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore; import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.persistence.TxManifestObj; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.objects.transaction.TxRecord; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; @@ -457,25 +458,72 @@ public class WritebackObjectPersistentStore { return r -> asyncFence(bundleId, r); } - // Returns an iterator with a view of all commited objects - // Does not have to guarantee consistent view, snapshots are handled by upper layers - // Invalidated by commitBundle, but might return data after it has been really committed - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - Log.tracev("Getting writeback iterator: {0}, {1}", start, key); - _pendingWritesVersionLock.readLock().lock(); + + public Snapshot getSnapshot() { + PSortedMap pendingWrites; + Snapshot cache = null; + long lastTxId; + try { - var curPending = _pendingWrites.get(); - return new TombstoneMergingKvIterator<>("writeback-ps", start, key, - (tS, tK) -> new MappingKvIterator<>( - new NavigableMapKvIterator<>(curPending, tS, tK), - e -> switch (e) { - case PendingWrite pw -> new Data<>(pw.data()); - case PendingDelete d -> new Tombstone<>(); - default -> throw new IllegalStateException("Unexpected value: " + e); - }), - (tS, tK) -> cachedStore.getIterator(tS, tK)); - } finally { - _pendingWritesVersionLock.readLock().unlock(); + _pendingWritesVersionLock.readLock().lock(); + try { + pendingWrites = _pendingWrites.get(); + cache = cachedStore.getSnapshot(); + lastTxId = getLastTxId(); + } finally { + _pendingWritesVersionLock.readLock().unlock(); + } + + Snapshot finalCache = cache; + return new Snapshot() { + private final PSortedMap _pendingWrites = pendingWrites; + private final Snapshot _cache = finalCache; + private final long txId = lastTxId; + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new TombstoneMergingKvIterator<>("writeback-ps", start, key, + (tS, tK) -> new MappingKvIterator<>( + new NavigableMapKvIterator<>(_pendingWrites, tS, tK), + e -> switch (e) { + case PendingWrite pw -> new Data<>(pw.data()); + case PendingDelete d -> new Tombstone<>(); + default -> throw new IllegalStateException("Unexpected value: " + e); + }), + (tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new)); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + var cached = _pendingWrites.get(name); + if (cached != null) { + return switch (cached) { + case PendingWrite c -> Optional.of(c.data()); + case PendingDelete d -> { + yield Optional.empty(); + } + default -> throw new IllegalStateException("Unexpected value: " + cached); + }; + } + return _cache.readObject(name); + } + + @Override + public long id() { + assert lastTxId >= _cache.id(); + return lastTxId; + } + + @Override + public void close() { + _cache.close(); + } + }; + } catch (Throwable e) { + if (cache != null) + cache.close(); + throw e; } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java index b8404162..ad378606 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects.persistence; import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.utils.DataLocker; import io.quarkus.logging.Log; import io.quarkus.runtime.Startup; @@ -102,7 +103,9 @@ public class CachingObjectPersistentStore { public void commitTx(TxManifestObj names, long txId) { var serialized = delegate.prepareManifest(names); + Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); + // A little complicated locking to minimize write lock holding time delegate.commitTx(serialized, txId, (commit) -> { _lock.writeLock().lock(); try { @@ -122,14 +125,13 @@ public class CachingObjectPersistentStore { Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); } - private class CachingKvIterator implements CloseableKvIterator { private final CloseableKvIterator _delegate; - // This should be created under lock - private final long _curCacheVersion = _cacheVersion; + private final long _curCacheVersion; - private CachingKvIterator(CloseableKvIterator delegate) { + private CachingKvIterator(CloseableKvIterator delegate, long cacheVersion) { _delegate = delegate; + _curCacheVersion = cacheVersion; } @Override @@ -196,28 +198,74 @@ public class CachingObjectPersistentStore { } } - // Returns an iterator with a view of all commited objects - // Does not have to guarantee consistent view, snapshots are handled by upper layers - // Warning: it has a nasty side effect of global caching, so in this case don't even call next on it, - // if some objects are still in writeback - public CloseableKvIterator> getIterator(IteratorStart start, JObjectKey key) { - _lock.readLock().lock(); + public Snapshot getSnapshot() { + TreePMap curSortedCache; + Snapshot backing = null; + long cacheVersion; + try { - Log.tracev("Getting cache iterator: {0}, {1}", start, key); - var curSortedCache = _sortedCache; - return new MergingKvIterator<>("cache", start, key, - (mS, mK) - -> new MappingKvIterator<>( - new NavigableMapKvIterator<>(curSortedCache, mS, mK), - e -> { - Log.tracev("Taken from cache: {0}", e); - return e.object(); + Log.tracev("Getting cache snapshot"); + // Decrease the lock time as much as possible + _lock.readLock().lock(); + try { + curSortedCache = _sortedCache; + cacheVersion = _cacheVersion; + backing = delegate.getSnapshot(); + } finally { + _lock.readLock().unlock(); + } + + Snapshot finalBacking = backing; + return new Snapshot() { + private final TreePMap _curSortedCache = curSortedCache; + private final Snapshot _backing = finalBacking; + private final long _cacheVersion = cacheVersion; + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new TombstoneMergingKvIterator<>("cache", start, key, + (mS, mK) + -> new MappingKvIterator<>( + new NavigableMapKvIterator<>(_curSortedCache, mS, mK), + e -> { + Log.tracev("Taken from cache: {0}", e); + return e.object(); + } + ), + (mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new)); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + var cached = _curSortedCache.get(name); + if (cached != null) { + return switch (cached.object()) { + case Data data -> Optional.of(data.value()); + case Tombstone tombstone -> { + yield Optional.empty(); } - ), - (mS, mK) - -> new MappingKvIterator<>(new CachingKvIterator(delegate.getIterator(mS, mK)), Data::new)); - } finally { - _lock.readLock().unlock(); + default -> throw new IllegalStateException("Unexpected value: " + cached.object()); + }; + } + return _backing.readObject(name); + } + + @Override + public long id() { + return _backing.id(); + } + + @Override + public void close() { + _backing.close(); + } + }; + } catch (Throwable ex) { + if (backing != null) { + backing.close(); + } + throw ex; } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java index 17ee7360..a2bd921e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -1,10 +1,8 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; -import com.usatiuk.dhfs.objects.CloseableKvIterator; -import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.KeyPredicateKvIterator; -import com.usatiuk.dhfs.objects.ReversibleKvIterator; +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; import com.usatiuk.dhfs.utils.RefcountedCloseable; import io.quarkus.arc.properties.IfBuildProperty; @@ -116,6 +114,23 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } } + private RefcountedCloseable> getCurTxn() { + _lock.readLock().lock(); + try { + var got = _curReadTxn.get(); + var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null); + if (refInc != null) { + return got; + } else { + var newTxn = new RefcountedCloseable<>(_env.txnRead()); + _curReadTxn.compareAndSet(got, newTxn); + return newTxn; + } + } finally { + _lock.readLock().unlock(); + } + } + private class LmdbKvIterator extends ReversibleKvIterator { private final RefcountedCloseable> _txn; private final Cursor _cursor; @@ -126,23 +141,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { // private final Exception _allocationStacktrace = new Exception(); private final Exception _allocationStacktrace = null; - LmdbKvIterator(IteratorStart start, JObjectKey key) { + LmdbKvIterator(RefcountedCloseable> txn, IteratorStart start, JObjectKey key) { + _txn = txn; _goingForward = true; - _lock.readLock().lock(); - try { - var got = _curReadTxn.get(); - var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null); - if (refInc != null) { - _txn = got; - } else { - var newTxn = new RefcountedCloseable<>(_env.txnRead()); - _curReadTxn.compareAndSet(got, newTxn); - _txn = newTxn; - } - } finally { - _lock.readLock().unlock(); - } _cursor = _db.openCursor(_txn.get()); var closedRef = _closed; @@ -214,6 +216,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext); } + LmdbKvIterator(IteratorStart start, JObjectKey key) { + this(getCurTxn(), start, key); + } + @Override public void close() { if (_closed.getValue()) { @@ -292,6 +298,50 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME)); } + @Override + public Snapshot getSnapshot() { + _lock.readLock().lock(); + try { + var txn = new RefcountedCloseable<>(_env.txnRead()); + var commitId = getLastCommitId(); + return new Snapshot() { + private boolean _closed = false; + private final RefcountedCloseable> _txn = txn; + private final long _id = commitId; + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + assert !_closed; + return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME)); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + assert !_closed; + var got = _db.get(_txn.get(), name.toByteBuffer()); + var ret = Optional.ofNullable(got).map(ByteString::copyFrom); + return ret; + } + + @Override + public long id() { + assert !_closed; + return _id; + } + + @Override + public void close() { + assert !_closed; + _closed = true; + _txn.unref(); + } + }; + } finally { + _lock.readLock().unlock(); + } + } + @Override public void commitTx(TxManifestRaw names, long txId, Consumer commitLocked) { verifyReady(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java index 0cf640bf..a5011a17 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java @@ -2,10 +2,13 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; import com.usatiuk.dhfs.objects.CloseableKvIterator; +import com.usatiuk.dhfs.objects.IterProdFn; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.NavigableMapKvIterator; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import io.quarkus.arc.properties.IfBuildProperty; import jakarta.enterprise.context.ApplicationScoped; +import org.pcollections.TreePMap; import javax.annotation.Nonnull; import java.util.Collection; @@ -17,7 +20,7 @@ import java.util.function.Consumer; @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") public class MemoryObjectPersistentStore implements ObjectPersistentStore { - private final ConcurrentSkipListMap _objects = new ConcurrentSkipListMap<>(); + private TreePMap _objects = TreePMap.empty(); private long _lastCommitId = 0; private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); @@ -42,14 +45,45 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { return new NavigableMapKvIterator<>(_objects, start, key); } + @Override + public Snapshot getSnapshot() { + synchronized (this) { + return new Snapshot() { + private final TreePMap _objects = MemoryObjectPersistentStore.this._objects; + private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId; + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new NavigableMapKvIterator<>(_objects, start, key); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + return Optional.ofNullable(_objects.get(name)); + } + + @Override + public long id() { + return _lastCommitId; + } + + @Override + public void close() { + + } + }; + } + } + @Override public void commitTx(TxManifestRaw names, long txId, Consumer commitLocked) { synchronized (this) { for (var written : names.written()) { - _objects.put(written.getKey(), written.getValue()); + _objects = _objects.plus(written.getKey(), written.getValue()); } for (JObjectKey key : names.deleted()) { - _objects.remove(key); + _objects = _objects.minus(key); } commitLocked.accept(() -> { _lock.writeLock().lock(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java index bcb08401..877a360e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java @@ -2,7 +2,9 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; import com.usatiuk.dhfs.objects.CloseableKvIterator; +import com.usatiuk.dhfs.objects.IterProdFn; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import javax.annotation.Nonnull; import java.util.Collection; @@ -22,6 +24,8 @@ public interface ObjectPersistentStore { // Does not have to guarantee consistent view, snapshots are handled by upper layers CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); + Snapshot getSnapshot(); + /** * @param commitLocked - a function that will be called with a Runnable that will commit the transaction * the changes in the store will be visible to new transactions only after the runnable is called diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java index f439731e..0dff59ee 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java @@ -1,11 +1,18 @@ package com.usatiuk.dhfs.objects.persistence; +import com.google.protobuf.ByteString; import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; +import com.usatiuk.dhfs.utils.RefcountedCloseable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; +import org.lmdbjava.Txn; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; import java.util.Optional; import java.util.function.Consumer; @@ -28,12 +35,6 @@ public class SerializingObjectPersistentStore { return delegateStore.readObject(name).map(serializer::deserialize); } - // Returns an iterator with a view of all commited objects - // Does not have to guarantee consistent view, snapshots are handled by upper layers - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new MappingKvIterator<>(delegateStore.getIterator(start, key), d -> serializer.deserialize(d)); - } - public TxManifestRaw prepareManifest(TxManifestObj names) { return new TxManifestRaw( names.written().stream() @@ -42,6 +43,35 @@ public class SerializingObjectPersistentStore { , names.deleted()); } + public Snapshot getSnapshot() { + return new Snapshot() { + private final Snapshot _backing = delegateStore.getSnapshot(); + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d)); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + return _backing.readObject(name).map(serializer::deserialize); + } + + @Override + public long id() { + return _backing.id(); + } + + @Override + public void close() { + _backing.close(); + } + }; + + } + + // void commitTx(TxManifestObj names, Consumer commitLocked) { // delegateStore.commitTx(prepareManifest(names), commitLocked); // } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/Snapshot.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/Snapshot.java new file mode 100644 index 00000000..718e7413 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/Snapshot.java @@ -0,0 +1,18 @@ +package com.usatiuk.dhfs.objects.snapshot; + +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; +import io.quarkus.logging.Log; + +import javax.annotation.Nonnull; +import java.util.Optional; + +public interface Snapshot, V> extends AutoCloseableNoThrow { + CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); + + @Nonnull + Optional readObject(K name); + + long id(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java index 77b36f46..203cbe90 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java @@ -23,315 +23,17 @@ public class SnapshotManager { @Inject WritebackObjectPersistentStore writebackStore; - private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); - - @ConfigProperty(name = "dhfs.objects.persistence.snapshot-extra-checks") - boolean extraChecks; - - private long _lastSnapshotId = 0; - private long _lastAliveSnapshotId = -1; - - private final Queue _snapshotIds = new ArrayDeque<>(); - private TreePMap _objects = TreePMap.empty(); - private final TreeMap> _snapshotBounds = new TreeMap<>(); - private final HashMap _snapshotRefCounts = new HashMap<>(); - - private void verify() { - assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1); - assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId; + public Snapshot createSnapshot() { + return writebackStore.getSnapshot(); } // This should not be called for the same objects concurrently public Consumer commitTx(Collection> writes) { -// _lock.writeLock().lock(); -// try { -// if (!_snapshotIds.isEmpty()) { -// verify(); - HashMap newEntries = new HashMap<>(); - for (var action : writes) { - var current = writebackStore.readObjectVerbose(action.key()); - // Add to snapshot the previous visible version of the replaced object - // I.e. should be visible to all transactions with id <= id - // and at least as its corresponding version - Pair newSnapshotEntry = switch (current) { - case WritebackObjectPersistentStore.VerboseReadResultPersisted( - Optional data - ) -> Pair.of(new SnapshotKey(action.key(), data.map(JDataVersionedWrapper::version).orElse(-1L)), - data.map(o -> new SnapshotEntryObject(o, -1)).orElse(new SnapshotEntryDeleted(-1))); - case WritebackObjectPersistentStore.VerboseReadResultPending( - PendingWriteEntry pending - ) -> { - yield switch (pending) { - case PendingWrite write -> - Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), -1)); - case PendingDelete delete -> - Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(-1)); - default -> throw new IllegalStateException("Unexpected value: " + pending); - }; - } - default -> throw new IllegalStateException("Unexpected value: " + current); - }; - - - Log.tracev("Adding snapshot entry {0}", newSnapshotEntry); - - newEntries.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight()); - } - - _lock.writeLock().lock(); - try { + // TODO: FIXME: + synchronized (this) { return writebackStore.commitTx(writes, (id, commit) -> { - if (!_snapshotIds.isEmpty()) { - assert id > _lastSnapshotId; - for (var newSnapshotEntry : newEntries.entrySet()) { - assert newSnapshotEntry.getKey().version() < id; - var realNewSnapshotEntry = newSnapshotEntry.getValue().withWhenToRemove(id); - if (realNewSnapshotEntry instanceof SnapshotEntryObject re) { - assert re.data().version() <= newSnapshotEntry.getKey().version(); - } - _objects = _objects.plus(newSnapshotEntry.getKey(), realNewSnapshotEntry); -// assert val == null; - _snapshotBounds.merge(newSnapshotEntry.getKey().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getKey())), - (a, b) -> { - a.addAll(b); - return a; - }); - } - } commit.run(); }); - } finally { - _lock.writeLock().unlock(); - } - -// } - -// verify(); - // Commit under lock, iterators will see new version after the lock is released and writeback - // cache is updated - // TODO: Maybe writeback iterator being invalidated wouldn't be a problem? -// } finally { -// _lock.writeLock().unlock(); -// } - } - - private void unrefSnapshot(long id) { - Log.tracev("Unref snapshot {0}", id); - _lock.writeLock().lock(); - try { - verify(); - var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b); - if (!(refCount == null && id == _lastAliveSnapshotId)) { - return; - } - - long curCount; - long curId = id; - long nextId; - do { - Log.tracev("Removing snapshot {0}", curId); - _snapshotIds.poll(); - nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek(); - while (nextId == curId) { - _snapshotIds.poll(); - nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek(); - } - - var keys = _snapshotBounds.headMap(curId, true); - - long finalCurId = curId; - long finalNextId = nextId; - ArrayList> toReAdd = new ArrayList<>(); - keys.values().stream().flatMap(Collection::stream).forEach(key -> { - var entry = _objects.get(key); - if (entry == null) { -// Log.warnv("Entry not found for key {0}", key); - return; - } - if (finalNextId == -1) { - Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}", - entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds); - } else if (finalNextId < entry.whenToRemove()) { - _objects = _objects.plus(new SnapshotKey(key.key(), finalNextId), entry); - assert finalNextId > finalCurId; - toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId))); - } - _objects = _objects.minus(key); - }); - - toReAdd.forEach(p -> { - _snapshotBounds.merge(p.getLeft(), new ArrayDeque<>(List.of(p.getRight())), - (a, b) -> { - a.addAll(b); - return a; - }); - }); - - keys.clear(); - - if (_snapshotIds.isEmpty()) { - _lastAliveSnapshotId = -1; - break; - } - - curId = _snapshotIds.peek(); - _lastAliveSnapshotId = curId; - - curCount = _snapshotRefCounts.getOrDefault(curId, 0L); - } while (curCount == 0); - verify(); - } finally { - _lock.writeLock().unlock(); - } - } - - public static class IllegalSnapshotIdException extends IllegalArgumentException { - public IllegalSnapshotIdException(String message) { - super(message); - } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - } - - public class Snapshot implements AutoCloseableNoThrow { - private final long _id; - private static final Cleaner CLEANER = Cleaner.create(); - private final MutableObject _closed = new MutableObject<>(false); - - public long id() { - return _id; - } - - private Snapshot(long id) { - _id = id; - _lock.writeLock().lock(); - try { - verify(); - if (_lastSnapshotId > id) - throw new IllegalSnapshotIdException("Snapshot id " + id + " is less than last snapshot id " + _lastSnapshotId); - _lastSnapshotId = id; - if (_lastAliveSnapshotId == -1) - _lastAliveSnapshotId = id; - if (_snapshotRefCounts.merge(id, 1L, Long::sum) == 1) { - _snapshotIds.add(id); - } - verify(); - } finally { - _lock.writeLock().unlock(); - } - var closedRef = _closed; - var idRef = _id; - CLEANER.register(this, () -> { - if (!closedRef.getValue()) { - Log.error("Snapshot " + idRef + " was not closed before GC"); - } - }); - } - - public class CheckingSnapshotKvIterator implements CloseableKvIterator { - private final CloseableKvIterator _backing; - - public CheckingSnapshotKvIterator(CloseableKvIterator backing) { - _backing = backing; - } - - @Override - public JObjectKey peekNextKey() { - return _backing.peekNextKey(); - } - - @Override - public void skip() { - _backing.skip(); - } - - @Override - public JObjectKey peekPrevKey() { - return _backing.peekPrevKey(); - } - - @Override - public Pair prev() { - var ret = _backing.prev(); - assert ret.getValue().version() <= _id; - return ret; - } - - @Override - public boolean hasPrev() { - return _backing.hasPrev(); - } - - @Override - public void skipPrev() { - _backing.skipPrev(); - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - return _backing.hasNext(); - } - - @Override - public Pair next() { - var ret = _backing.next(); - assert ret.getValue().version() <= _id; - return ret; - } - } - - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - _lock.readLock().lock(); - try { - Log.tracev("Getting snapshot {0} iterator for {1} {2}\n" + - "objects in snapshots: {3}", _id, start, key, _objects); - return new CheckingSnapshotKvIterator(new TombstoneMergingKvIterator<>("snapshot", start, key, - (tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK), - (tS, tK) -> new MappingKvIterator<>( - writebackStore.getIterator(tS, tK), d -> d.version() <= _id ? new Data<>(d) : new Tombstone<>()) - )); - } finally { - _lock.readLock().unlock(); - } - } - - @Nonnull - public Optional readObject(JObjectKey name) { - try (var it = getIterator(IteratorStart.GE, name)) { - if (it.hasNext()) { - if (!it.peekNextKey().equals(name)) { - return Optional.empty(); - } - return Optional.of(it.next().getValue()); - } - } - return Optional.empty(); - } - - @Override - public void close() { - if (_closed.getValue()) { - return; - } - _closed.setValue(true); - unrefSnapshot(_id); - } - } - - public Snapshot createSnapshot() { - _lock.writeLock().lock(); - try { - return new Snapshot(writebackStore.getLastTxId()); - } finally { - _lock.writeLock().unlock(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 9288fb5a..4ec40f74 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; @@ -50,13 +51,15 @@ public class TransactionFactoryImpl implements TransactionFactory { } private class TransactionImpl implements TransactionPrivate { + private boolean _closed = false; + private final Map> _readSet = new HashMap<>(); private final NavigableMap> _writes = new TreeMap<>(); private Map> _newWrites = new HashMap<>(); private final List _onCommit = new ArrayList<>(); private final List _onFlush = new ArrayList<>(); - private final SnapshotManager.Snapshot _snapshot; + private final Snapshot _snapshot; private TransactionImpl() { _snapshot = snapshotManager.createSnapshot(); @@ -139,7 +142,7 @@ public class TransactionFactoryImpl implements TransactionFactory { } @Override - public SnapshotManager.Snapshot snapshot() { + public Snapshot snapshot() { return _snapshot; } @@ -253,6 +256,8 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public void close() { + if (_closed) return; + _closed = true; _snapshot.close(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index be59ad3f..6f009f51 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -1,7 +1,9 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JDataVersionedWrapper; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; @@ -19,5 +21,5 @@ public interface TransactionPrivate extends Transaction, TransactionHandlePrivat Collection getOnCommit(); - SnapshotManager.Snapshot snapshot(); + Snapshot snapshot(); }