Objects: simplify snapshots

This commit is contained in:
2025-03-13 19:35:19 +01:00
parent 3faab4c324
commit 29fd2826a3
12 changed files with 328 additions and 376 deletions

View File

@@ -1,8 +1,13 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
@FunctionalInterface
public interface IterProdFn<K extends Comparable<K>, V> {
public interface IterProdFn<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
CloseableKvIterator<K, V> get(IteratorStart start, K key);
@Override
default void close() {
}
}

View File

@@ -54,6 +54,9 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
}
iteratorsTmp.put(iterator, counter++);
}
for (var prodFn : iterators) {
prodFn.close();
}
_iterators = Map.copyOf(iteratorsTmp);
_pendingIterators = null;
}
@@ -129,6 +132,9 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
for (var prodFn : _pendingIterators) {
prodFn.close();
}
}
doInitialAdvance();

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -457,25 +458,72 @@ public class WritebackObjectPersistentStore {
return r -> asyncFence(bundleId, r);
}
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Invalidated by commitBundle, but might return data after it has been really committed
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting writeback iterator: {0}, {1}", start, key);
_pendingWritesVersionLock.readLock().lock();
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
PSortedMap<JObjectKey, PendingWriteEntry> pendingWrites;
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
long lastTxId;
try {
var curPending = _pendingWrites.get();
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(curPending, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> cachedStore.getIterator(tS, tK));
} finally {
_pendingWritesVersionLock.readLock().unlock();
_pendingWritesVersionLock.readLock().lock();
try {
pendingWrites = _pendingWrites.get();
cache = cachedStore.getSnapshot();
lastTxId = getLastTxId();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalCache = cache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final PSortedMap<JObjectKey, PendingWriteEntry> _pendingWrites = pendingWrites;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _cache = finalCache;
private final long txId = lastTxId;
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _pendingWrites.get(name);
if (cached != null) {
return switch (cached) {
case PendingWrite c -> Optional.of(c.data());
case PendingDelete d -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
return _cache.readObject(name);
}
@Override
public long id() {
assert lastTxId >= _cache.id();
return lastTxId;
}
@Override
public void close() {
_cache.close();
}
};
} catch (Throwable e) {
if (cache != null)
cache.close();
throw e;
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
@@ -102,7 +103,9 @@ public class CachingObjectPersistentStore {
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, long txId) {
var serialized = delegate.prepareManifest(names);
Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
// A little complicated locking to minimize write lock holding time
delegate.commitTx(serialized, txId, (commit) -> {
_lock.writeLock().lock();
try {
@@ -122,14 +125,13 @@ public class CachingObjectPersistentStore {
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
// This should be created under lock
private final long _curCacheVersion = _cacheVersion;
private final long _curCacheVersion;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate, long cacheVersion) {
_delegate = delegate;
_curCacheVersion = cacheVersion;
}
@Override
@@ -196,28 +198,74 @@ public class CachingObjectPersistentStore {
}
}
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Warning: it has a nasty side effect of global caching, so in this case don't even call next on it,
// if some objects are still in writeback
public CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
_lock.readLock().lock();
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
TreePMap<JObjectKey, CacheEntry> curSortedCache;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
long cacheVersion;
try {
Log.tracev("Getting cache iterator: {0}, {1}", start, key);
var curSortedCache = _sortedCache;
return new MergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(curSortedCache, mS, mK),
e -> {
Log.tracev("Taken from cache: {0}", e);
return e.object();
Log.tracev("Getting cache snapshot");
// Decrease the lock time as much as possible
_lock.readLock().lock();
try {
curSortedCache = _sortedCache;
cacheVersion = _cacheVersion;
backing = delegate.getSnapshot();
} finally {
_lock.readLock().unlock();
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final TreePMap<JObjectKey, CacheEntry> _curSortedCache = curSortedCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private final long _cacheVersion = cacheVersion;
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curSortedCache, mS, mK),
e -> {
Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curSortedCache.get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
yield Optional.empty();
}
),
(mS, mK)
-> new MappingKvIterator<>(new CachingKvIterator(delegate.getIterator(mS, mK)), Data::new));
} finally {
_lock.readLock().unlock();
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
return _backing.readObject(name);
}
@Override
public long id() {
return _backing.id();
}
@Override
public void close() {
_backing.close();
}
};
} catch (Throwable ex) {
if (backing != null) {
backing.close();
}
throw ex;
}
}

View File

@@ -1,10 +1,8 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.KeyPredicateKvIterator;
import com.usatiuk.dhfs.objects.ReversibleKvIterator;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import io.quarkus.arc.properties.IfBuildProperty;
@@ -116,6 +114,23 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
private RefcountedCloseable<Txn<ByteBuffer>> getCurTxn() {
_lock.readLock().lock();
try {
var got = _curReadTxn.get();
var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null);
if (refInc != null) {
return got;
} else {
var newTxn = new RefcountedCloseable<>(_env.txnRead());
_curReadTxn.compareAndSet(got, newTxn);
return newTxn;
}
} finally {
_lock.readLock().unlock();
}
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
private final RefcountedCloseable<Txn<ByteBuffer>> _txn;
private final Cursor<ByteBuffer> _cursor;
@@ -126,23 +141,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
LmdbKvIterator(IteratorStart start, JObjectKey key) {
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
_txn = txn;
_goingForward = true;
_lock.readLock().lock();
try {
var got = _curReadTxn.get();
var refInc = Optional.ofNullable(got).map(RefcountedCloseable::ref).orElse(null);
if (refInc != null) {
_txn = got;
} else {
var newTxn = new RefcountedCloseable<>(_env.txnRead());
_curReadTxn.compareAndSet(got, newTxn);
_txn = newTxn;
}
} finally {
_lock.readLock().unlock();
}
_cursor = _db.openCursor(_txn.get());
var closedRef = _closed;
@@ -214,6 +216,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
}
LmdbKvIterator(IteratorStart start, JObjectKey key) {
this(getCurTxn(), start, key);
}
@Override
public void close() {
if (_closed.getValue()) {
@@ -292,6 +298,50 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
_lock.readLock().lock();
try {
var txn = new RefcountedCloseable<>(_env.txnRead());
var commitId = getLastCommitId();
return new Snapshot<JObjectKey, ByteString>() {
private boolean _closed = false;
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
} finally {
_lock.readLock().unlock();
}
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
verifyReady();

View File

@@ -2,10 +2,13 @@ package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.IterProdFn;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.NavigableMapKvIterator;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.util.Collection;
@@ -17,7 +20,7 @@ import java.util.function.Consumer;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final ConcurrentSkipListMap<JObjectKey, ByteString> _objects = new ConcurrentSkipListMap<>();
private TreePMap<JObjectKey, ByteString> _objects = TreePMap.empty();
private long _lastCommitId = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@@ -42,14 +45,45 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
return new NavigableMapKvIterator<>(_objects, start, key);
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
synchronized (this) {
return new Snapshot<JObjectKey, ByteString>() {
private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects;
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key);
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name));
}
@Override
public long id() {
return _lastCommitId;
}
@Override
public void close() {
}
};
}
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
synchronized (this) {
for (var written : names.written()) {
_objects.put(written.getKey(), written.getValue());
_objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects.remove(key);
_objects = _objects.minus(key);
}
commitLocked.accept(() -> {
_lock.writeLock().lock();

View File

@@ -2,7 +2,9 @@ package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.IterProdFn;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import javax.annotation.Nonnull;
import java.util.Collection;
@@ -22,6 +24,8 @@ public interface ObjectPersistentStore {
// Does not have to guarantee consistent view, snapshots are handled by upper layers
CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key);
Snapshot<JObjectKey, ByteString> getSnapshot();
/**
* @param commitLocked - a function that will be called with a Runnable that will commit the transaction
* the changes in the store will be visible to new transactions only after the runnable is called

View File

@@ -1,11 +1,18 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.lmdbjava.Txn;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@@ -28,12 +35,6 @@ public class SerializingObjectPersistentStore {
return delegateStore.readObject(name).map(serializer::deserialize);
}
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(delegateStore.getIterator(start, key), d -> serializer.deserialize(d));
}
public TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> names) {
return new TxManifestRaw(
names.written().stream()
@@ -42,6 +43,35 @@ public class SerializingObjectPersistentStore {
, names.deleted());
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
return _backing.readObject(name).map(serializer::deserialize);
}
@Override
public long id() {
return _backing.id();
}
@Override
public void close() {
_backing.close();
}
};
}
// void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, Consumer<Runnable> commitLocked) {
// delegateStore.commitTx(prepareManifest(names), commitLocked);
// }

View File

@@ -0,0 +1,18 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import javax.annotation.Nonnull;
import java.util.Optional;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
CloseableKvIterator<K, V> getIterator(IteratorStart start, JObjectKey key);
@Nonnull
Optional<V> readObject(K name);
long id();
}

View File

@@ -23,315 +23,17 @@ public class SnapshotManager {
@Inject
WritebackObjectPersistentStore writebackStore;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@ConfigProperty(name = "dhfs.objects.persistence.snapshot-extra-checks")
boolean extraChecks;
private long _lastSnapshotId = 0;
private long _lastAliveSnapshotId = -1;
private final Queue<Long> _snapshotIds = new ArrayDeque<>();
private TreePMap<SnapshotKey, SnapshotEntry> _objects = TreePMap.empty();
private final TreeMap<Long, ArrayDeque<SnapshotKey>> _snapshotBounds = new TreeMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
private void verify() {
assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1);
assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId;
public Snapshot<JObjectKey, JDataVersionedWrapper> createSnapshot() {
return writebackStore.getSnapshot();
}
// This should not be called for the same objects concurrently
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
// _lock.writeLock().lock();
// try {
// if (!_snapshotIds.isEmpty()) {
// verify();
HashMap<SnapshotKey, SnapshotEntry> newEntries = new HashMap<>();
for (var action : writes) {
var current = writebackStore.readObjectVerbose(action.key());
// Add to snapshot the previous visible version of the replaced object
// I.e. should be visible to all transactions with id <= id
// and at least as its corresponding version
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> Pair.of(new SnapshotKey(action.key(), data.map(JDataVersionedWrapper::version).orElse(-1L)),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, -1)).orElse(new SnapshotEntryDeleted(-1)));
case WritebackObjectPersistentStore.VerboseReadResultPending(
PendingWriteEntry pending
) -> {
yield switch (pending) {
case PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), -1));
case PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(-1));
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
default -> throw new IllegalStateException("Unexpected value: " + current);
};
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
newEntries.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
}
_lock.writeLock().lock();
try {
// TODO: FIXME:
synchronized (this) {
return writebackStore.commitTx(writes, (id, commit) -> {
if (!_snapshotIds.isEmpty()) {
assert id > _lastSnapshotId;
for (var newSnapshotEntry : newEntries.entrySet()) {
assert newSnapshotEntry.getKey().version() < id;
var realNewSnapshotEntry = newSnapshotEntry.getValue().withWhenToRemove(id);
if (realNewSnapshotEntry instanceof SnapshotEntryObject re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
_objects = _objects.plus(newSnapshotEntry.getKey(), realNewSnapshotEntry);
// assert val == null;
_snapshotBounds.merge(newSnapshotEntry.getKey().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getKey())),
(a, b) -> {
a.addAll(b);
return a;
});
}
}
commit.run();
});
} finally {
_lock.writeLock().unlock();
}
// }
// verify();
// Commit under lock, iterators will see new version after the lock is released and writeback
// cache is updated
// TODO: Maybe writeback iterator being invalidated wouldn't be a problem?
// } finally {
// _lock.writeLock().unlock();
// }
}
private void unrefSnapshot(long id) {
Log.tracev("Unref snapshot {0}", id);
_lock.writeLock().lock();
try {
verify();
var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b);
if (!(refCount == null && id == _lastAliveSnapshotId)) {
return;
}
long curCount;
long curId = id;
long nextId;
do {
Log.tracev("Removing snapshot {0}", curId);
_snapshotIds.poll();
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
while (nextId == curId) {
_snapshotIds.poll();
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
}
var keys = _snapshotBounds.headMap(curId, true);
long finalCurId = curId;
long finalNextId = nextId;
ArrayList<Pair<Long, SnapshotKey>> toReAdd = new ArrayList<>();
keys.values().stream().flatMap(Collection::stream).forEach(key -> {
var entry = _objects.get(key);
if (entry == null) {
// Log.warnv("Entry not found for key {0}", key);
return;
}
if (finalNextId == -1) {
Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}",
entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds);
} else if (finalNextId < entry.whenToRemove()) {
_objects = _objects.plus(new SnapshotKey(key.key(), finalNextId), entry);
assert finalNextId > finalCurId;
toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId)));
}
_objects = _objects.minus(key);
});
toReAdd.forEach(p -> {
_snapshotBounds.merge(p.getLeft(), new ArrayDeque<>(List.of(p.getRight())),
(a, b) -> {
a.addAll(b);
return a;
});
});
keys.clear();
if (_snapshotIds.isEmpty()) {
_lastAliveSnapshotId = -1;
break;
}
curId = _snapshotIds.peek();
_lastAliveSnapshotId = curId;
curCount = _snapshotRefCounts.getOrDefault(curId, 0L);
} while (curCount == 0);
verify();
} finally {
_lock.writeLock().unlock();
}
}
public static class IllegalSnapshotIdException extends IllegalArgumentException {
public IllegalSnapshotIdException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
public class Snapshot implements AutoCloseableNoThrow {
private final long _id;
private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
public long id() {
return _id;
}
private Snapshot(long id) {
_id = id;
_lock.writeLock().lock();
try {
verify();
if (_lastSnapshotId > id)
throw new IllegalSnapshotIdException("Snapshot id " + id + " is less than last snapshot id " + _lastSnapshotId);
_lastSnapshotId = id;
if (_lastAliveSnapshotId == -1)
_lastAliveSnapshotId = id;
if (_snapshotRefCounts.merge(id, 1L, Long::sum) == 1) {
_snapshotIds.add(id);
}
verify();
} finally {
_lock.writeLock().unlock();
}
var closedRef = _closed;
var idRef = _id;
CLEANER.register(this, () -> {
if (!closedRef.getValue()) {
Log.error("Snapshot " + idRef + " was not closed before GC");
}
});
}
public class CheckingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
public CheckingSnapshotKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var ret = _backing.prev();
assert ret.getValue().version() <= _id;
return ret;
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var ret = _backing.next();
assert ret.getValue().version() <= _id;
return ret;
}
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
_lock.readLock().lock();
try {
Log.tracev("Getting snapshot {0} iterator for {1} {2}\n" +
"objects in snapshots: {3}", _id, start, key, _objects);
return new CheckingSnapshotKvIterator(new TombstoneMergingKvIterator<>("snapshot", start, key,
(tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK),
(tS, tK) -> new MappingKvIterator<>(
writebackStore.getIterator(tS, tK), d -> d.version() <= _id ? new Data<>(d) : new Tombstone<>())
));
} finally {
_lock.readLock().unlock();
}
}
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
try (var it = getIterator(IteratorStart.GE, name)) {
if (it.hasNext()) {
if (!it.peekNextKey().equals(name)) {
return Optional.empty();
}
return Optional.of(it.next().getValue());
}
}
return Optional.empty();
}
@Override
public void close() {
if (_closed.getValue()) {
return;
}
_closed.setValue(true);
unrefSnapshot(_id);
}
}
public Snapshot createSnapshot() {
_lock.writeLock().lock();
try {
return new Snapshot(writebackStore.getLastTxId());
} finally {
_lock.writeLock().unlock();
}
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -50,13 +51,15 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
private class TransactionImpl implements TransactionPrivate {
private boolean _closed = false;
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final SnapshotManager.Snapshot _snapshot;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private TransactionImpl() {
_snapshot = snapshotManager.createSnapshot();
@@ -139,7 +142,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public SnapshotManager.Snapshot snapshot() {
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@@ -253,6 +256,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
@@ -19,5 +21,5 @@ public interface TransactionPrivate extends Transaction, TransactionHandlePrivat
Collection<Runnable> getOnCommit();
SnapshotManager.Snapshot snapshot();
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
}