From 75fec73b39445d6016cc8689549f2daa035af7f2 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 8 Mar 2025 00:35:56 +0100 Subject: [PATCH] reverse iterators and a bunch of fixes with lt/gt iterator start --- .../dhfs/objects/CloseableKvIterator.java | 16 +- .../java/com/usatiuk/dhfs/objects/Data.java | 10 + .../InconsistentKvIteratorWrapper.java | 134 ---------- .../InconsistentSelfRefreshingKvIterator.java | 148 ----------- .../objects/InvalidIteratorException.java | 11 - .../dhfs/objects/InvalidatableKvIterator.java | 77 ------ .../com/usatiuk/dhfs/objects/JDataDummy.java | 28 --- .../usatiuk/dhfs/objects/JObjectManager.java | 35 +-- .../dhfs/objects/KeyPredicateKvIterator.java | 129 ++++++++++ .../usatiuk/dhfs/objects/MaybeTombstone.java | 7 + .../dhfs/objects/MergingKvIterator.java | 28 ++- .../dhfs/objects/PredicateKvIterator.java | 12 +- .../objects/SelfRefreshingKvIterator.java | 125 ---------- .../dhfs/objects/StaleIteratorException.java | 11 - .../com/usatiuk/dhfs/objects/Tombstone.java | 10 + .../objects/TombstoneMergingKvIterator.java | 35 ++- .../com/usatiuk/dhfs/objects/TxBundle.java | 9 - .../WritebackObjectPersistentStore.java | 97 ++++---- .../CachingObjectPersistentStore.java | 150 ++++++------ .../LmdbObjectPersistentStore.java | 73 +++++- .../MemoryObjectPersistentStore.java | 25 +- .../persistence/ObjectPersistentStore.java | 13 +- .../SerializingObjectPersistentStore.java | 13 +- .../dhfs/objects/snapshot/SnapshotEntry.java | 2 + .../snapshot/SnapshotEntryDeleted.java | 4 + .../objects/snapshot/SnapshotEntryObject.java | 4 + .../objects/snapshot/SnapshotKvIterator.java | 120 +++++++-- .../objects/snapshot/SnapshotManager.java | 231 +++++++++--------- .../ReadTrackingObjectSourceFactory.java | 22 ++ .../transaction/TransactionFactory.java | 2 +- .../transaction/TransactionFactoryImpl.java | 17 +- .../objects/KeyPredicateKvIteratorTest.java | 154 ++++++++++++ .../dhfs/objects/MergingKvIteratorTest.java | 28 ++- .../dhfs/objects/ObjectsTestExtraChecks.java | 2 +- .../usatiuk/dhfs/objects/ObjectsTestImpl.java | 53 +++- .../dhfs/objects/PreCommitTxHookTest.java | 2 + .../dhfs/objects/PredicateKvIteratorTest.java | 7 +- .../persistence/LmdbKvIteratorTest.java | 21 +- .../dhfs/objects/jmap/JMapIterator.java | 21 ++ 39 files changed, 967 insertions(+), 919 deletions(-) create mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentKvIteratorWrapper.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentSelfRefreshingKvIterator.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataDummy.java create mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java create mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/StaleIteratorException.java create mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java create mode 100644 dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/KeyPredicateKvIteratorTest.java diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java index 13b439af..7014f8a2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java @@ -10,21 +10,13 @@ public interface CloseableKvIterator, V> extends Iterato void skip(); - default K peekPrevKey() { - throw new UnsupportedOperationException(); - } + K peekPrevKey(); - default Pair prev() { - throw new UnsupportedOperationException(); - } + Pair prev(); - default boolean hasPrev() { - throw new UnsupportedOperationException(); - } + boolean hasPrev(); - default void skipPrev() { - throw new UnsupportedOperationException(); - } + void skipPrev(); default CloseableKvIterator reversed() { return new ReversedKvIterator<>(this); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java new file mode 100644 index 00000000..b1f7bcb7 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java @@ -0,0 +1,10 @@ +package com.usatiuk.dhfs.objects; + +import java.util.Optional; + +public record Data(V value) implements MaybeTombstone { + @Override + public Optional opt() { + return Optional.of(value); + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentKvIteratorWrapper.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentKvIteratorWrapper.java deleted file mode 100644 index d3da5bfa..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentKvIteratorWrapper.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.usatiuk.dhfs.objects; - -import com.usatiuk.dhfs.objects.persistence.IteratorStart; -import io.quarkus.logging.Log; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.NoSuchElementException; -import java.util.function.Function; - -// Also checks that the next provided item is always consistent after a refresh -public class InconsistentKvIteratorWrapper, V> implements CloseableKvIterator { - private CloseableKvIterator _backing; - private final Function, CloseableKvIterator> _iteratorSupplier; - private K _lastReturnedKey = null; - private K _peekedKey = null; - private boolean _peekedNext = false; - private final Pair _initialStart; - - public InconsistentKvIteratorWrapper(Function, CloseableKvIterator> iteratorSupplier, IteratorStart start, K key) { - _iteratorSupplier = iteratorSupplier; - _initialStart = Pair.of(start, key); - while (true) { - try { - _backing = _iteratorSupplier.apply(Pair.of(start, key)); - break; - } catch (StaleIteratorException ignored) { - continue; - } - } - } - - private void refresh() { - Log.tracev("Refreshing iterator: {0}", _backing); - _backing.close(); - if (_peekedKey != null) { - _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey)); - if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) { - assert false; - } - } else if (_lastReturnedKey != null) { - _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey)); - } else { - _backing = _iteratorSupplier.apply(_initialStart); - } - - if (_peekedNext && !_backing.hasNext()) { - assert false; - } - } - - @Override - public K peekNextKey() { - while (true) { - if (_peekedKey != null) { - return _peekedKey; - } - try { - _peekedKey = _backing.peekNextKey(); - assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0; - } catch (NoSuchElementException ignored) { - assert !_peekedNext; - throw ignored; - } catch (StaleIteratorException ignored) { - refresh(); - continue; - } - _peekedNext = true; - Log.tracev("Peeked key: {0}", _peekedKey); - return _peekedKey; - } - } - - @Override - public void skip() { - while (true) { - try { - _lastReturnedKey = _backing.peekNextKey(); - _backing.skip(); - _peekedNext = false; - _peekedKey = null; - return; - } catch (NoSuchElementException ignored) { - assert !_peekedNext; - throw ignored; - } catch (StaleIteratorException ignored) { - refresh(); - continue; - } - } - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - while (true) { - if (_peekedNext) { - return true; - } - try { - _peekedNext = _backing.hasNext(); - Log.tracev("Peeked next: {0}", _peekedNext); - return _peekedNext; - } catch (StaleIteratorException ignored) { - refresh(); - continue; - } - } - } - - @Override - public Pair next() { - while (true) { - try { - var got = _backing.next(); - assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0; - _peekedNext = false; - _peekedKey = null; - _lastReturnedKey = got.getKey(); - return got; - } catch (NoSuchElementException ignored) { - assert !_peekedNext; - throw ignored; - } catch (StaleIteratorException ignored) { - refresh(); - continue; - } - } - } - -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentSelfRefreshingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentSelfRefreshingKvIterator.java deleted file mode 100644 index c503ad1a..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InconsistentSelfRefreshingKvIterator.java +++ /dev/null @@ -1,148 +0,0 @@ -package com.usatiuk.dhfs.objects; - -import com.usatiuk.dhfs.objects.persistence.IteratorStart; -import io.quarkus.logging.Log; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.concurrent.locks.Lock; -import java.util.function.Function; -import java.util.function.Supplier; - -// Also checks that the next provided item is always consistent after a refresh -public class InconsistentSelfRefreshingKvIterator, V> implements CloseableKvIterator { - private CloseableKvIterator _backing; - private long _curVersion = -1L; - private final Lock _lock; - private final Function, CloseableKvIterator> _iteratorSupplier; - private final Supplier _versionSupplier; - private K _lastReturnedKey = null; - private K _peekedKey = null; - private boolean _peekedNext = false; - private final Pair _initialStart; - - public InconsistentSelfRefreshingKvIterator(Function, CloseableKvIterator> iteratorSupplier, Supplier versionSupplier, Lock lock, - IteratorStart start, K key) { - _iteratorSupplier = iteratorSupplier; - _versionSupplier = versionSupplier; - _lock = lock; - _initialStart = Pair.of(start, key); - - _lock.lock(); - try { - long curVersion = _versionSupplier.get(); - _backing = _iteratorSupplier.apply(Pair.of(start, key)); - _curVersion = curVersion; - } finally { - _lock.unlock(); - } - } - - private void maybeRefresh() { - _lock.lock(); - CloseableKvIterator oldBacking = null; - try { - if (_versionSupplier.get() == _curVersion) { - return; - } - long newVersion = _versionSupplier.get(); - oldBacking = _backing; - if (_peekedKey != null) { - _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey)); - if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) { - throw new StaleIteratorException(); - } - } else if (_lastReturnedKey != null) { - _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey)); - if (_backing.hasNext() && !(_backing.peekNextKey().compareTo(_lastReturnedKey) > 0)) { - throw new StaleIteratorException(); - } - } else { - _backing = _iteratorSupplier.apply(_initialStart); - } - - if (_peekedNext && !_backing.hasNext()) { - throw new StaleIteratorException(); - } - - Log.tracev("Refreshed iterator last refreshed {0}, current version {1}", - _curVersion, newVersion); - - _curVersion = newVersion; - } finally { - _lock.unlock(); - if (oldBacking != null) { - oldBacking.close(); - } - } - } - - @Override - public K peekNextKey() { - if (_peekedKey != null) { - return _peekedKey; - } - _lock.lock(); - try { - maybeRefresh(); - _peekedKey = _backing.peekNextKey(); - assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0; - _peekedNext = true; - Log.tracev("Peeked key: {0}", _peekedKey); - return _peekedKey; - } finally { - _lock.unlock(); - } - } - - @Override - public void skip() { - _lock.lock(); - try { - maybeRefresh(); - _lastReturnedKey = _backing.peekNextKey(); - _backing.skip(); - _peekedNext = false; - _peekedKey = null; - } finally { - _lock.unlock(); - } - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - if (_peekedNext) { - return true; - } - _lock.lock(); - try { - maybeRefresh(); - _peekedNext = _backing.hasNext(); - Log.tracev("Peeked next: {0}", _peekedNext); - return _peekedNext; - } finally { - _lock.unlock(); - } - } - - @Override - public Pair next() { - _lock.lock(); - try { - maybeRefresh(); - var got = _backing.next(); - assert _lastReturnedKey == null || got.getKey().compareTo(_lastReturnedKey) > 0; - _peekedNext = false; - _peekedKey = null; - _lastReturnedKey = got.getKey(); - return got; - } finally { - _lock.unlock(); - } - } - -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java deleted file mode 100644 index fa2bb988..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.usatiuk.dhfs.objects; - -public class InvalidIteratorException extends RuntimeException { - public InvalidIteratorException() { - super(); - } - - public InvalidIteratorException(String message) { - super(message); - } -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java deleted file mode 100644 index a83b36a4..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.usatiuk.dhfs.objects; - -import io.quarkus.logging.Log; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.concurrent.locks.Lock; -import java.util.function.Supplier; - -public class InvalidatableKvIterator, V> implements CloseableKvIterator { - private final CloseableKvIterator _backing; - private final Supplier _versionSupplier; - private final long _version; - private final Lock _lock; - - public InvalidatableKvIterator(CloseableKvIterator backing, Supplier versionSupplier, Lock lock) { - _backing = backing; - _versionSupplier = versionSupplier; - _lock = lock; - _version = _versionSupplier.get(); - } - - private void checkVersion() { - if (_versionSupplier.get() != _version) { - Log.errorv("Version mismatch: {0} != {1}", _versionSupplier.get(), _version); - throw new InvalidIteratorException(); - } - } - - @Override - public K peekNextKey() { - _lock.lock(); - try { - checkVersion(); - return _backing.peekNextKey(); - } finally { - _lock.unlock(); - } - } - - @Override - public void skip() { - _lock.lock(); - try { - checkVersion(); - _backing.skip(); - } finally { - _lock.unlock(); - } - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - _lock.lock(); - try { - checkVersion(); - return _backing.hasNext(); - } finally { - _lock.unlock(); - } - } - - @Override - public Pair next() { - _lock.lock(); - try { - checkVersion(); - return _backing.next(); - } finally { - _lock.unlock(); - } - } -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataDummy.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataDummy.java deleted file mode 100644 index cbc3dc29..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataDummy.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.usatiuk.dhfs.objects; - -public class JDataDummy implements JData { - public static final JObjectKey TX_ID_OBJ_NAME = JObjectKey.of("tx_id"); - private static final JDataDummy INSTANCE = new JDataDummy(); - - public static JDataDummy getInstance() { - return INSTANCE; - } - - @Override - public JObjectKey key() { - return TX_ID_OBJ_NAME; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - return true; - } - - // hashCode - @Override - public int hashCode() { - return 0; - } -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 6aedd59a..37e6798d 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -12,7 +12,6 @@ import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import java.util.*; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -24,7 +23,6 @@ import java.util.stream.Stream; @ApplicationScoped public class JObjectManager { private final List _preCommitTxHooks; - private final AtomicLong _txCounter = new AtomicLong(); private boolean _ready = false; @Inject SnapshotManager snapshotManager; @@ -38,10 +36,6 @@ public class JObjectManager { } void init(@Observes @Priority(200) StartupEvent event) { - var read = snapshotManager.readObjectDirect(JDataDummy.TX_ID_OBJ_NAME).orElse(null); - if (read != null) { - _txCounter.set(read.version()); - } _ready = true; } @@ -51,14 +45,9 @@ public class JObjectManager { public TransactionPrivate createTransaction() { verifyReady(); - while (true) { - try { - var tx = transactionFactory.createTransaction(_txCounter.get()); - Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id()); - return tx; - } catch (SnapshotManager.IllegalSnapshotIdException ignored) { - } - } + var tx = transactionFactory.createTransaction(); + Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id()); + return tx; } public TransactionHandle commit(TransactionPrivate tx) { @@ -102,10 +91,6 @@ public class JObjectManager { Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass()); for (var entry : currentIteration.entrySet()) { - // FIXME: Kinda hack? - if (entry.getKey().equals(JDataDummy.TX_ID_OBJ_NAME)) { - continue; - } somethingChanged = true; Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); var oldObj = getCurrent.apply(entry.getKey()); @@ -150,14 +135,9 @@ public class JObjectManager { } } } - - Log.trace("Committing transaction start"); - // FIXME: Better way? - addDependency.accept(JDataDummy.TX_ID_OBJ_NAME); - writes.put(JDataDummy.TX_ID_OBJ_NAME, new TxRecord.TxObjectRecordWrite<>(JDataDummy.getInstance())); + Log.trace("Committing transaction start"); var snapshotId = tx.snapshot().id(); - var newId = _txCounter.get() + 1; for (var read : readSet.entrySet()) { var dep = dependenciesLocked.get(read.getKey()); @@ -182,7 +162,6 @@ public class JObjectManager { Log.trace("Checking dependency " + read.getKey() + " - ok with read"); } - Log.tracef("Committing transaction %d to storage", newId); var addFlushCallback = snapshotManager.commitTx( writes.values().stream() .filter(r -> { @@ -194,11 +173,7 @@ public class JObjectManager { } } return true; - }).toList(), - newId); - - var realNewId = _txCounter.getAndIncrement() + 1; - assert realNewId == newId; + }).toList()); for (var callback : tx.getOnCommit()) { callback.run(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java new file mode 100644 index 00000000..b43308d2 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java @@ -0,0 +1,129 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.NoSuchElementException; +import java.util.function.Function; + +public class KeyPredicateKvIterator, V> extends ReversibleKvIterator { + private final CloseableKvIterator _backing; + private final Function _filter; + private K _next; + + public KeyPredicateKvIterator(CloseableKvIterator backing, IteratorStart start, K startKey, Function filter) { + _goingForward = true; + _backing = backing; + _filter = filter; + fillNext(); + + boolean shouldGoBack = false; + if (start == IteratorStart.LE) { + if (_next == null || _next.compareTo(startKey) > 0) { + shouldGoBack = true; + } + } else if (start == IteratorStart.LT) { + if (_next == null || _next.compareTo(startKey) >= 0) { + shouldGoBack = true; + } + } + + if (shouldGoBack && _backing.hasPrev()) { + _goingForward = false; + _next = null; + fillNext(); + if (_next != null) + _backing.skipPrev(); + _goingForward = true; +// _backing.skip(); + fillNext(); + } + + + switch (start) { + case LT -> { +// assert _next == null || _next.getKey().compareTo(startKey) < 0; + } + case LE -> { +// assert _next == null || _next.getKey().compareTo(startKey) <= 0; + } + case GT -> { + assert _next == null || _next.compareTo(startKey) > 0; + } + case GE -> { + assert _next == null || _next.compareTo(startKey) >= 0; + } + } + } + + private void fillNext() { + while ((_goingForward ? _backing.hasNext() : _backing.hasPrev()) && _next == null) { + var next = _goingForward ? _backing.peekNextKey() : _backing.peekPrevKey(); + if (!_filter.apply(next)) { + if (_goingForward) + _backing.skip(); + else + _backing.skipPrev(); + continue; + } + _next = next; + } + } + + @Override + protected void reverse() { + _goingForward = !_goingForward; + _next = null; + + fillNext(); + } + + @Override + protected K peekImpl() { + if (_next == null) + throw new NoSuchElementException(); + return _next; + } + + @Override + protected void skipImpl() { + if (_next == null) + throw new NoSuchElementException(); + _next = null; + if (_goingForward) + _backing.skip(); + else + _backing.skipPrev(); + fillNext(); + } + + @Override + protected boolean hasImpl() { + return _next != null; + } + + @Override + protected Pair nextImpl() { + if (_next == null) + throw new NoSuchElementException("No more elements"); + var retKey = _next; + _next = null; + var got = _goingForward ? _backing.next() : _backing.prev(); + assert got.getKey().equals(retKey); + fillNext(); + return got; + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public String toString() { + return "KeyPredicateKvIterator{" + + "_backing=" + _backing + + ", _next=" + _next + + '}'; + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java new file mode 100644 index 00000000..f6d47c71 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java @@ -0,0 +1,7 @@ +package com.usatiuk.dhfs.objects; + +import java.util.Optional; + +public interface MaybeTombstone { + Optional opt(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java index a4f193c6..78c8e482 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java @@ -5,6 +5,7 @@ import io.quarkus.logging.Log; import org.apache.commons.lang3.tuple.Pair; import java.util.*; +import java.util.stream.Collectors; public class MergingKvIterator, V> extends ReversibleKvIterator { private final Map, Integer> _iterators; @@ -22,16 +23,22 @@ public class MergingKvIterator, V> extends ReversibleKvI // Starting at a greatest key less than/less or equal than: // We have a bunch of iterators that have given us theirs "greatest LT/LE key" // now we need to pick the greatest of those to start with + // But if some of them don't have a lesser key, we need to pick the smallest of those var initialIterators = iterators.stream().map(p -> p.get(initialStartType, initialStartKey)).toList(); try { - K initialMaxValue = initialIterators.stream() + IteratorStart finalStartType = startType; + var found = initialIterators.stream() .filter(CloseableKvIterator::hasNext) .map((i) -> { var peeked = i.peekNextKey(); // Log.warnv("peeked: {0}, from {1}", peeked, i.getClass()); return peeked; - }) - .max(Comparator.naturalOrder()).orElse(null); + }).distinct().collect(Collectors.partitioningBy(e -> finalStartType == IteratorStart.LE ? e.compareTo(initialStartKey) <= 0 : e.compareTo(initialStartKey) < 0)); + K initialMaxValue; + if (!found.get(true).isEmpty()) + initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null); + else + initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null); if (initialMaxValue == null) { fail = true; } @@ -61,12 +68,12 @@ public class MergingKvIterator, V> extends ReversibleKvI Log.tracev("{0} Created: {1}", _name, _sortedIterators); switch (initialStartType) { - case LT -> { - assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0; - } - case LE -> { - assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0; - } +// case LT -> { +// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0; +// } +// case LE -> { +// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0; +// } case GT -> { assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) > 0; } @@ -88,6 +95,7 @@ public class MergingKvIterator, V> extends ReversibleKvI } K key = iterator.peekNextKey(); + Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key); if (!_sortedIterators.containsKey(key)) { _sortedIterators.put(key, iterator); return; @@ -110,6 +118,7 @@ public class MergingKvIterator, V> extends ReversibleKvI @Override protected void reverse() { var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry(); + Log.tracev("{0} Reversing from {1}", _name, cur); _goingForward = !_goingForward; _sortedIterators.clear(); for (CloseableKvIterator iterator : _iterators.keySet()) { @@ -126,6 +135,7 @@ public class MergingKvIterator, V> extends ReversibleKvI || (!_goingForward && peekImpl().compareTo(cur.getKey()) >= 0))) { skipImpl(); } + Log.tracev("{0} Reversed to {1}", _name, _sortedIterators); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java index 22b6dbe8..cfe85ffa 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import io.quarkus.logging.Log; import org.apache.commons.lang3.tuple.Pair; import java.util.NoSuchElementException; @@ -68,13 +69,18 @@ public class PredicateKvIterator, V, V_T> extends Revers @Override protected void reverse() { _goingForward = !_goingForward; - _next = null; + boolean wasAtEnd = _next == null; - if (_goingForward && _backing.hasNext()) + if (_goingForward && !wasAtEnd) _backing.skip(); - else if (!_goingForward && _backing.hasPrev()) + else if (!_goingForward && !wasAtEnd) _backing.skipPrev(); + if (!wasAtEnd) + Log.tracev("Skipped in reverse: {0}", _next); + + _next = null; + fillNext(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java deleted file mode 100644 index 1ce8dd05..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java +++ /dev/null @@ -1,125 +0,0 @@ -package com.usatiuk.dhfs.objects; - -import com.usatiuk.dhfs.objects.persistence.IteratorStart; -import io.quarkus.logging.Log; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.NoSuchElementException; -import java.util.concurrent.locks.Lock; -import java.util.function.Function; -import java.util.function.Supplier; - -// Also checks that the next provided item is always consistent after a refresh -public class SelfRefreshingKvIterator, V> implements CloseableKvIterator { - private CloseableKvIterator _backing; - private long _curVersion = -1L; - private final Lock _lock; - private final Function, CloseableKvIterator> _iteratorSupplier; - private final Supplier _versionSupplier; - private Pair _next; - - public SelfRefreshingKvIterator(Function, CloseableKvIterator> iteratorSupplier, Supplier versionSupplier, Lock lock, - IteratorStart start, K key) { - _iteratorSupplier = iteratorSupplier; - _versionSupplier = versionSupplier; - _lock = lock; - - _lock.lock(); - try { - long curVersion = _versionSupplier.get(); - _backing = _iteratorSupplier.apply(Pair.of(start, key)); - _next = _backing.hasNext() ? _backing.next() : null; - _curVersion = curVersion; - } finally { - _lock.unlock(); - } - } - - private void maybeRefresh() { - _lock.lock(); - CloseableKvIterator oldBacking = null; - try { - if (_versionSupplier.get() == _curVersion) { - return; - } - Log.tracev("Refreshing iterator last refreshed {0}, current version {1}, current value {2}", - _curVersion, _versionSupplier.get(), _next); - long newVersion = _versionSupplier.get(); - oldBacking = _backing; - _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey())); - var next = _backing.hasNext() ? _backing.next() : null; - if (next == null) { - Log.errorv("Failed to refresh iterator, null last refreshed {0}," + - " current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next); - assert false; - } else if (!next.equals(_next)) { - Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," + - " current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next); - assert false; - } - Log.tracev("Refreshed iterator last refreshed {0}, current version {1}, old value {2}, new value {3}", - _curVersion, newVersion, _next, next); - - _next = next; - _curVersion = newVersion; - } finally { - _lock.unlock(); - if (oldBacking != null) { - oldBacking.close(); - } - } - } - - // _next should always be valid, so it's ok to do the refresh "lazily" - private void prepareNext() { - _lock.lock(); - try { - maybeRefresh(); - if (_backing.hasNext()) { - _next = _backing.next(); - } else { - _next = null; - } - } finally { - _lock.unlock(); - } - } - - @Override - public K peekNextKey() { - if (_next == null) { - throw new NoSuchElementException(); - } - return _next.getKey(); - } - - @Override - public void skip() { - if (_next == null) { - throw new NoSuchElementException(); - } - prepareNext(); - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - return _next != null; - } - - @Override - public Pair next() { - if (_next == null) { - throw new NoSuchElementException("No more elements"); - } - var ret = _next; - prepareNext(); - Log.tracev("Read: {0}, next: {1}", ret, _next); - return ret; - } - -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/StaleIteratorException.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/StaleIteratorException.java deleted file mode 100644 index 249f1c2f..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/StaleIteratorException.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.usatiuk.dhfs.objects; - -public class StaleIteratorException extends RuntimeException { - public StaleIteratorException() { - super(); - } - - public StaleIteratorException(String message) { - super(message); - } -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java new file mode 100644 index 00000000..62a7ca1c --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java @@ -0,0 +1,10 @@ +package com.usatiuk.dhfs.objects; + +import java.util.Optional; + +public record Tombstone() implements MaybeTombstone { + @Override + public Optional opt() { + return Optional.empty(); + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java index d84bdd79..e8e01e27 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java @@ -10,7 +10,7 @@ public class TombstoneMergingKvIterator, V> implements C private final CloseableKvIterator _backing; private final String _name; - public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List>> iterators) { + public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List>> iterators) { _name = name; _backing = new PredicateKvIterator<>( new MergingKvIterator<>(name + "-merging", startType, startKey, iterators), @@ -20,24 +20,15 @@ public class TombstoneMergingKvIterator, V> implements C if (pair instanceof Tombstone) { return null; } - return ((Data) pair).value; + return ((Data) pair).value(); }); } @SafeVarargs - public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn>... iterators) { + public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn>... iterators) { this(name, startType, startKey, List.of(iterators)); } - public interface DataType { - } - - public record Tombstone() implements DataType { - } - - public record Data(V value) implements DataType { - } - @Override public K peekNextKey() { return _backing.peekNextKey(); @@ -48,6 +39,26 @@ public class TombstoneMergingKvIterator, V> implements C _backing.skip(); } + @Override + public K peekPrevKey() { + return _backing.peekPrevKey(); + } + + @Override + public Pair prev() { + return _backing.prev(); + } + + @Override + public boolean hasPrev() { + return _backing.hasPrev(); + } + + @Override + public void skipPrev() { + _backing.skipPrev(); + } + @Override public void close() { _backing.close(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java deleted file mode 100644 index 8068e262..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.usatiuk.dhfs.objects; - -public interface TxBundle { - long getId(); - - void commit(JDataVersionedWrapper obj); - - void delete(JObjectKey obj); -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index 56c84aed..2fb14558 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -25,20 +25,21 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import java.util.function.Consumer; @ApplicationScoped public class WritebackObjectPersistentStore { - private final LinkedList _pendingBundles = new LinkedList<>(); + private final LinkedList _pendingBundles = new LinkedList<>(); private final AtomicReference> _pendingWrites = new AtomicReference<>(TreePMap.empty()); private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock(); - private final AtomicLong _pendingWritesVersion = new AtomicLong(); - private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); + private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); private final Object _flushWaitSynchronizer = new Object(); private final AtomicLong _lastWrittenTx = new AtomicLong(-1); private final AtomicLong _counter = new AtomicLong(); + private final AtomicLong _lastCommittedTx = new AtomicLong(-1); private final AtomicLong _waitedTotal = new AtomicLong(0); @Inject CachingObjectPersistentStore cachedStore; @@ -70,6 +71,8 @@ public class WritebackObjectPersistentStore { } catch (InterruptedException ignored) { } }); + _counter.set(cachedStore.getLastTxId()); + _lastCommittedTx.set(cachedStore.getLastTxId()); _ready = true; } @@ -94,7 +97,7 @@ public class WritebackObjectPersistentStore { private void writeback() { while (!Thread.interrupted()) { try { - TxBundleImpl bundle = new TxBundleImpl(0); + TxBundle bundle = new TxBundle(0); synchronized (_pendingBundles) { while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready) _pendingBundles.wait(); @@ -116,11 +119,11 @@ public class WritebackObjectPersistentStore { for (var e : bundle._entries.values()) { switch (e) { - case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> { + case TxBundle.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> { Log.trace("Writing new " + key); toWrite.add(Pair.of(key, data)); } - case TxBundleImpl.DeletedEntry(JObjectKey key) -> { + case TxBundle.DeletedEntry(JObjectKey key) -> { Log.trace("Deleting from persistent storage " + key); toDelete.add(key); } @@ -132,11 +135,13 @@ public class WritebackObjectPersistentStore { new TxManifestObj<>( Collections.unmodifiableList(toWrite), Collections.unmodifiableList(toDelete) - )); + ), bundle.getId()); Log.trace("Bundle " + bundle.getId() + " committed"); // Remove from pending writes, after real commit + // As we are the only writers to _pendingWrites, no need to synchronize with iterator creation + // if they get the older version, as it will still contain all the new changes synchronized (_pendingBundles) { var curPw = _pendingWrites.get(); for (var e : bundle._entries.values()) { @@ -219,7 +224,7 @@ public class WritebackObjectPersistentStore { } } synchronized (_notFlushedBundles) { - var bundle = new TxBundleImpl(_counter.incrementAndGet()); + var bundle = new TxBundle(_counter.incrementAndGet()); _pendingBundles.addLast(bundle); _notFlushedBundles.put(bundle.getId(), bundle); return bundle; @@ -234,26 +239,28 @@ public class WritebackObjectPersistentStore { try { synchronized (_pendingBundles) { var curPw = _pendingWrites.get(); - for (var e : ((TxBundleImpl) bundle)._entries.values()) { + for (var e : ((TxBundle) bundle)._entries.values()) { switch (e) { - case TxBundleImpl.CommittedEntry c -> { + case TxBundle.CommittedEntry c -> { curPw = curPw.plus(c.key(), new PendingWrite(c.data, bundle.getId())); } - case TxBundleImpl.DeletedEntry d -> { + case TxBundle.DeletedEntry d -> { curPw = curPw.plus(d.key(), new PendingDelete(d.key, bundle.getId())); } default -> throw new IllegalStateException("Unexpected value: " + e); } } + // Now, make the changes visible to new iterators _pendingWrites.set(curPw); - ((TxBundleImpl) bundle).setReady(); - _pendingWritesVersion.incrementAndGet(); + ((TxBundle) bundle).setReady(); if (_pendingBundles.peek() == bundle) _pendingBundles.notify(); synchronized (_flushWaitSynchronizer) { - currentSize += ((TxBundleImpl) bundle).calculateTotalSize(); + currentSize += ((TxBundle) bundle).calculateTotalSize(); } } + assert bundle.getId() > _lastCommittedTx.get(); + _lastCommittedTx.set(bundle.getId()); } finally { _pendingWritesVersionLock.writeLock().unlock(); } @@ -263,9 +270,9 @@ public class WritebackObjectPersistentStore { verifyReady(); synchronized (_pendingBundles) { Log.warn("Dropped bundle: " + bundle); - _pendingBundles.remove((TxBundleImpl) bundle); + _pendingBundles.remove((TxBundle) bundle); synchronized (_flushWaitSynchronizer) { - currentSize -= ((TxBundleImpl) bundle).calculateTotalSize(); + currentSize -= ((TxBundle) bundle).calculateTotalSize(); } } } @@ -296,7 +303,7 @@ public class WritebackObjectPersistentStore { } } - private static class TxBundleImpl implements TxBundle { + private static class TxBundle { private final LinkedHashMap _entries = new LinkedHashMap<>(); private final ArrayList _callbacks = new ArrayList<>(); private long _txId; @@ -304,7 +311,7 @@ public class WritebackObjectPersistentStore { private long _size = -1; private boolean _wasCommitted = false; - private TxBundleImpl(long txId) { + private TxBundle(long txId) { _txId = txId; } @@ -348,7 +355,7 @@ public class WritebackObjectPersistentStore { return _size; } - public void compress(TxBundleImpl other) { + public void compress(TxBundle other) { if (_txId >= other._txId) throw new IllegalArgumentException("Compressing an older bundle into newer"); @@ -412,14 +419,20 @@ public class WritebackObjectPersistentStore { return new VerboseReadResultPersisted(cachedStore.readObject(key)); } - public Consumer commitTx(Collection> writes, long id) { + /** + * @param commitLocked - a function that will be called with a Consumer of a new transaction id, + * that will commit the transaction the changes in the store will be visible to new transactions + * only after the runnable is called + */ + public Consumer commitTx(Collection> writes, BiConsumer commitLocked) { var bundle = createBundle(); + long bundleId = bundle.getId(); try { for (var action : writes) { switch (action) { case TxRecord.TxObjectRecordWrite write -> { Log.trace("Flushing object " + write.key()); - bundle.commit(new JDataVersionedWrapper(write.data(), id)); + bundle.commit(new JDataVersionedWrapper(write.data(), bundleId)); } case TxRecord.TxObjectRecordDeleted deleted -> { Log.trace("Deleting object " + deleted.key()); @@ -435,10 +448,11 @@ public class WritebackObjectPersistentStore { throw new TxCommitException(t.getMessage(), t); } - Log.tracef("Committing transaction %d to storage", id); - commitBundle(bundle); - long bundleId = bundle.getId(); + Log.tracef("Committing transaction %d to storage", bundleId); + commitLocked.accept(bundleId, () -> { + commitBundle(bundle); + }); return r -> asyncFence(bundleId, r); } @@ -451,29 +465,26 @@ public class WritebackObjectPersistentStore { _pendingWritesVersionLock.readLock().lock(); try { var curPending = _pendingWrites.get(); - - return new InvalidatableKvIterator<>( - new InconsistentKvIteratorWrapper<>( - p -> - new TombstoneMergingKvIterator<>("writeback-ps", p.getLeft(), p.getRight(), - (tS, tK) -> new MappingKvIterator<>( - new NavigableMapKvIterator<>(curPending, tS, tK), - e -> switch (e) { - case PendingWrite pw -> - new TombstoneMergingKvIterator.Data<>(pw.data()); - case PendingDelete d -> - new TombstoneMergingKvIterator.Tombstone<>(); - default -> - throw new IllegalStateException("Unexpected value: " + e); - }), - (tS, tK) -> new MappingKvIterator<>(cachedStore.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new)), start, key), - _pendingWritesVersion::get, _pendingWritesVersionLock.readLock()); + return new TombstoneMergingKvIterator<>("writeback-ps", start, key, + (tS, tK) -> new MappingKvIterator<>( + new NavigableMapKvIterator<>(curPending, tS, tK), + e -> switch (e) { + case PendingWrite pw -> new Data<>(pw.data()); + case PendingDelete d -> new Tombstone<>(); + default -> throw new IllegalStateException("Unexpected value: " + e); + }), + (tS, tK) -> cachedStore.getIterator(tS, tK)); } finally { _pendingWritesVersionLock.readLock().unlock(); } } - public CloseableKvIterator getIterator(JObjectKey key) { - return getIterator(IteratorStart.GE, key); + public long getLastTxId() { + _pendingWritesVersionLock.readLock().lock(); + try { + return _lastCommittedTx.get(); + } finally { + _pendingWritesVersionLock.readLock().unlock(); + } } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java index 1d0c1b98..c3bd22dd 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java @@ -8,29 +8,24 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.pcollections.TreePMap; import javax.annotation.Nonnull; -import java.util.Collection; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Optional; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Stream; @ApplicationScoped public class CachingObjectPersistentStore { private final LinkedHashMap _cache = new LinkedHashMap<>(8, 0.75f, true); - private final ConcurrentSkipListMap _sortedCache = new ConcurrentSkipListMap<>(); + private TreePMap _sortedCache = TreePMap.empty(); + private long _cacheVersion = 0; - private final AtomicLong _cacheVersion = new AtomicLong(0); - private final ReentrantReadWriteLock _cacheVersionLock = new ReentrantReadWriteLock(); - - private final HashSet _pendingWrites = new HashSet<>(); + private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); private final DataLocker _locker = new DataLocker(); + @Inject SerializingObjectPersistentStore delegate; @ConfigProperty(name = "dhfs.objects.lru.limit") @@ -61,90 +56,78 @@ public class CachingObjectPersistentStore { } } - @Nonnull - public Collection findAllObjects() { - return delegate.findAllObjects(); - } - private void put(JObjectKey key, Optional obj) { // Log.tracev("Adding {0} to cache: {1}", key, obj); - synchronized (_cache) { - assert !_pendingWrites.contains(key); - int size = obj.map(o -> o.data().estimateSize()).orElse(0); + _lock.writeLock().lock(); + try { + int size = obj.map(o -> o.data().estimateSize()).orElse(16); _curSize += size; - var entry = new CacheEntry(obj, size); + var entry = new CacheEntry(obj.>map(Data::new).orElse(new Tombstone<>()), size); var old = _cache.putLast(key, entry); - _sortedCache.put(key, entry); + + _sortedCache = _sortedCache.plus(key, entry); if (old != null) _curSize -= old.size(); while (_curSize >= sizeLimit) { var del = _cache.pollFirstEntry(); - _sortedCache.remove(del.getKey(), del.getValue()); + _sortedCache = _sortedCache.minus(del.getKey()); _curSize -= del.getValue().size(); _evict++; } + } finally { + _lock.writeLock().unlock(); } } @Nonnull public Optional readObject(JObjectKey name) { try (var lock = _locker.lock(name)) { - synchronized (_cache) { + _lock.readLock().lock(); + try { var got = _cache.get(name); if (got != null) { - return got.object(); + return got.object().opt(); } + } finally { + _lock.readLock().unlock(); } - var got = delegate.readObject(name); - put(name, got); - return got; + // TODO: This is possibly racy +// var got = delegate.readObject(name); +// put(name, got); + return delegate.readObject(name); } } - public void commitTx(TxManifestObj names) { + public void commitTx(TxManifestObj names, long txId) { var serialized = delegate.prepareManifest(names); - _cacheVersionLock.writeLock().lock(); - try { - // During commit, readObject shouldn't be called for these items, - // it should be handled by the upstream store - synchronized (_cache) { + Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); + delegate.commitTx(serialized, txId, (commit) -> { + _lock.writeLock().lock(); + try { + // Make the changes visible atomically both in cache and in the underlying store for (var write : names.written()) { put(write.getLeft(), Optional.of(write.getRight())); - var added = _pendingWrites.add(write.getLeft()); - assert added; } for (var del : names.deleted()) { - // TODO: tombstone cache? - _curSize -= Optional.ofNullable(_cache.get(del)).map(CacheEntry::size).orElse(0L); - _cache.remove(del); - _sortedCache.remove(del); - var added = _pendingWrites.add(del); - assert added; + put(del, Optional.empty()); } + ++_cacheVersion; + commit.run(); + } finally { + _lock.writeLock().unlock(); } - Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); - delegate.commitTx(serialized); - // Now, reading from the backing store should return the new data - synchronized (_cache) { - for (var key : Stream.concat(names.written().stream().map(Pair::getLeft), - names.deleted().stream()).toList()) { - var removed = _pendingWrites.remove(key); - assert removed; - } - } - _cacheVersion.incrementAndGet(); - Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); - } finally { - _cacheVersionLock.writeLock().unlock(); - } + }); + Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); } private class CachingKvIterator implements CloseableKvIterator { private final CloseableKvIterator _delegate; + // This should be created under lock + private final long _curCacheVersion = _cacheVersion; private CachingKvIterator(CloseableKvIterator delegate) { _delegate = delegate; @@ -175,11 +158,24 @@ public class CachingObjectPersistentStore { return _delegate.peekPrevKey(); } + private void maybeCache(Pair prev) { + _lock.writeLock().lock(); + try { + if (_cacheVersion != _curCacheVersion) { + Log.tracev("Not caching: {0}", prev); + } else { + Log.tracev("Caching: {0}", prev); + put(prev.getKey(), Optional.of(prev.getValue())); + } + } finally { + _lock.writeLock().unlock(); + } + } + @Override public Pair prev() { var prev = _delegate.prev(); - Log.tracev("Caching: {0}", prev); - put(prev.getKey(), Optional.of(prev.getValue())); + maybeCache(prev); return prev; } @@ -196,8 +192,7 @@ public class CachingObjectPersistentStore { @Override public Pair next() { var next = _delegate.next(); - Log.tracev("Caching: {0}", next); - put(next.getKey(), Optional.of(next.getValue())); + maybeCache(next); return next; } } @@ -206,30 +201,31 @@ public class CachingObjectPersistentStore { // Does not have to guarantee consistent view, snapshots are handled by upper layers // Warning: it has a nasty side effect of global caching, so in this case don't even call next on it, // if some objects are still in writeback - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - Log.tracev("Getting cache iterator: {0}, {1}", start, key); - _cacheVersionLock.readLock().lock(); + public CloseableKvIterator> getIterator(IteratorStart start, JObjectKey key) { + _lock.readLock().lock(); try { - return new InconsistentSelfRefreshingKvIterator<>( - p -> new MergingKvIterator<>("cache", p.getLeft(), p.getRight(), - (mS, mK) -> new PredicateKvIterator<>( - new NavigableMapKvIterator<>(_sortedCache, mS, mK), - mS, mK, - e -> { - Log.tracev("Taken from cache: {0}", e); - return e.object().orElse(null); - } - ), (mS, mK) -> new CachingKvIterator(delegate.getIterator(mS, mK))), _cacheVersion::get, - _cacheVersionLock.readLock(), start, key); + Log.tracev("Getting cache iterator: {0}, {1}", start, key); + var curSortedCache = _sortedCache; + return new MergingKvIterator<>("cache", start, key, + (mS, mK) + -> new MappingKvIterator<>( + new NavigableMapKvIterator<>(curSortedCache, mS, mK), + e -> { + Log.tracev("Taken from cache: {0}", e); + return e.object(); + } + ), + (mS, mK) + -> new MappingKvIterator<>(new CachingKvIterator(delegate.getIterator(mS, mK)), Data::new)); } finally { - _cacheVersionLock.readLock().unlock(); + _lock.readLock().unlock(); } } - public CloseableKvIterator getIterator(JObjectKey key) { - return getIterator(IteratorStart.GE, key); + private record CacheEntry(MaybeTombstone object, long size) { } - private record CacheEntry(Optional object, long size) { + public long getLastTxId() { + return delegate.getLastCommitId(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java index a38f964c..080b51ab 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.KeyPredicateKvIterator; import com.usatiuk.dhfs.objects.ReversibleKvIterator; import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; import io.quarkus.arc.properties.IfBuildProperty; @@ -21,11 +22,11 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.lang.ref.Cleaner; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.Collection; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Optional; +import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import static org.lmdbjava.DbiFlags.MDB_CREATE; import static org.lmdbjava.Env.create; @@ -38,7 +39,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private Dbi _db; private boolean _ready = false; + private long _lastTxId = 0; + + private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); + private static final String DB_NAME = "objects"; + private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8); public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) { _root = Path.of(root).resolve("objects"); @@ -54,6 +60,20 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { .setMaxDbs(1) .open(_root.toFile(), EnvFlags.MDB_NOTLS); _db = _env.openDbi(DB_NAME, MDB_CREATE); + + var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length); + bb.put(DB_VER_OBJ_NAME); + bb.flip(); + + try (Txn txn = _env.txnRead()) { + var value = _db.get(txn, bb); + if (value != null) { + var ver = value.getLong(); + Log.infov("Read version: {0}", ver); + _lastTxId = ver; + } + } + _ready = true; } @@ -100,13 +120,16 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private static final Cleaner CLEANER = Cleaner.create(); private final MutableObject _closed = new MutableObject<>(false); + private final Exception _allocationStacktrace = new Exception(); LmdbKvIterator(IteratorStart start, JObjectKey key) { _goingForward = true; var closedRef = _closed; + var bt = _allocationStacktrace; CLEANER.register(this, () -> { if (!closedRef.getValue()) { - Log.error("Iterator was not closed before GC"); + Log.error("Iterator was not closed before GC, allocated at: {0}", bt); + System.exit(-1); } }); @@ -238,11 +261,11 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new LmdbKvIterator(start, key); + return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME)); } @Override - public void commitTx(TxManifestRaw names) { + public void commitTx(TxManifestRaw names, long txId, Consumer commitLocked) { verifyReady(); try (Txn txn = _env.txnWrite()) { for (var written : names.written()) { @@ -255,7 +278,31 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { for (JObjectKey key : names.deleted()) { _db.delete(txn, key.toByteBuffer()); } - txn.commit(); + + var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length); + bb.put(DB_VER_OBJ_NAME); + bb.flip(); + var bbData = ByteBuffer.allocateDirect(8); + + commitLocked.accept(() -> { + _lock.writeLock().lock(); + try { + var realTxId = txId; + if (realTxId == -1) + realTxId = _lastTxId + 1; + + assert realTxId > _lastTxId; + _lastTxId = realTxId; + + bbData.putLong(realTxId); + bbData.flip(); + _db.put(txn, bb, bbData); + + txn.commit(); + } finally { + _lock.writeLock().unlock(); + } + }); } } @@ -277,4 +324,14 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { return _root.toFile().getUsableSpace(); } + @Override + public long getLastCommitId() { + _lock.readLock().lock(); + try { + return _lastTxId; + } finally { + _lock.readLock().unlock(); + } + } + } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java index 7bba672a..0cf640bf 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java @@ -11,11 +11,15 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") public class MemoryObjectPersistentStore implements ObjectPersistentStore { private final ConcurrentSkipListMap _objects = new ConcurrentSkipListMap<>(); + private long _lastCommitId = 0; + private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); @Nonnull @Override @@ -39,7 +43,7 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { } @Override - public void commitTx(TxManifestRaw names) { + public void commitTx(TxManifestRaw names, long txId, Consumer commitLocked) { synchronized (this) { for (var written : names.written()) { _objects.put(written.getKey(), written.getValue()); @@ -47,6 +51,15 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { for (JObjectKey key : names.deleted()) { _objects.remove(key); } + commitLocked.accept(() -> { + _lock.writeLock().lock(); + try { + assert txId > _lastCommitId; + _lastCommitId = txId; + } finally { + _lock.writeLock().unlock(); + } + }); } } @@ -64,4 +77,14 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { public long getUsableSpace() { return 0; } + + @Override + public long getLastCommitId() { + _lock.readLock().lock(); + try { + return _lastCommitId; + } finally { + _lock.readLock().unlock(); + } + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java index 3467007b..bcb08401 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java @@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.JObjectKey; import javax.annotation.Nonnull; import java.util.Collection; import java.util.Optional; +import java.util.function.Consumer; // Persistent storage of objects // All changes are written as sequential transactions @@ -21,15 +22,17 @@ public interface ObjectPersistentStore { // Does not have to guarantee consistent view, snapshots are handled by upper layers CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); - default CloseableKvIterator getIterator(JObjectKey key) { - return getIterator(IteratorStart.GE, key); - } - - void commitTx(TxManifestRaw names); + /** + * @param commitLocked - a function that will be called with a Runnable that will commit the transaction + * the changes in the store will be visible to new transactions only after the runnable is called + */ + void commitTx(TxManifestRaw names, long txId, Consumer commitLocked); long getTotalSpace(); long getFreeSpace(); long getUsableSpace(); + + long getLastCommitId(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java index a38604db..f439731e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java @@ -8,6 +8,7 @@ import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; import java.util.Collection; import java.util.Optional; +import java.util.function.Consumer; @ApplicationScoped public class SerializingObjectPersistentStore { @@ -41,11 +42,15 @@ public class SerializingObjectPersistentStore { , names.deleted()); } - void commitTx(TxManifestObj names) { - delegateStore.commitTx(prepareManifest(names)); +// void commitTx(TxManifestObj names, Consumer commitLocked) { +// delegateStore.commitTx(prepareManifest(names), commitLocked); +// } + + void commitTx(TxManifestRaw names, long txId, Consumer commitLocked) { + delegateStore.commitTx(names, txId, commitLocked); } - void commitTx(TxManifestRaw names) { - delegateStore.commitTx(names); + long getLastCommitId() { + return delegateStore.getLastCommitId(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java index 1cdefc96..e783a2cf 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java @@ -2,4 +2,6 @@ package com.usatiuk.dhfs.objects.snapshot; public interface SnapshotEntry { long whenToRemove(); + + SnapshotEntry withWhenToRemove(long whenToRemove); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java index 3b0dbd6f..71113d45 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java @@ -1,4 +1,8 @@ package com.usatiuk.dhfs.objects.snapshot; public record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry { + @Override + public SnapshotEntryDeleted withWhenToRemove(long whenToRemove) { + return new SnapshotEntryDeleted(whenToRemove); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java index 78036e17..98cfbefc 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java @@ -3,4 +3,8 @@ package com.usatiuk.dhfs.objects.snapshot; import com.usatiuk.dhfs.objects.JDataVersionedWrapper; public record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry { + @Override + public SnapshotEntryObject withWhenToRemove(long whenToRemove) { + return new SnapshotEntryObject(data, whenToRemove); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java index 2b52fae4..1d045665 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java @@ -7,42 +7,98 @@ import org.apache.commons.lang3.tuple.Pair; import java.util.NavigableMap; import java.util.NoSuchElementException; +import java.util.Optional; -public class SnapshotKvIterator implements CloseableKvIterator> { +// TODO: test me +public class SnapshotKvIterator extends ReversibleKvIterator> { private final NavigableMap _objects; private final long _version; private final CloseableKvIterator _backing; - private Pair> _next = null; + private Pair> _next = null; public SnapshotKvIterator(NavigableMap objects, long version, IteratorStart start, JObjectKey startKey) { _objects = objects; _version = version; - _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L)); - fillNext(); - if (_next == null) { - return; + _goingForward = true; + _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE)); + fill(); + + boolean shouldGoBack = false; + if (start == IteratorStart.LE) { + if (_next == null || _next.getKey().compareTo(startKey) > 0) { + shouldGoBack = true; + } + } else if (start == IteratorStart.LT) { + if (_next == null || _next.getKey().compareTo(startKey) >= 0) { + shouldGoBack = true; + } } + + if (shouldGoBack && _backing.hasPrev()) { + _goingForward = false; + _backing.skipPrev(); + fill(); + _goingForward = true; + _backing.skip(); + fill(); + } + + switch (start) { case LT -> { - assert _next.getKey().compareTo(startKey) < 0; +// assert _next == null || _next.getKey().compareTo(startKey) < 0; } case LE -> { - assert _next.getKey().compareTo(startKey) <= 0; +// assert _next == null || _next.getKey().compareTo(startKey) <= 0; } case GT -> { - assert _next.getKey().compareTo(startKey) > 0; + assert _next == null || _next.getKey().compareTo(startKey) > 0; } case GE -> { - assert _next.getKey().compareTo(startKey) >= 0; + assert _next == null || _next.getKey().compareTo(startKey) >= 0; + } + } + + } + + private void fillPrev(JObjectKey ltKey) { + if (ltKey != null) + while (_backing.hasPrev() && _backing.peekPrevKey().key().equals(ltKey)) { + Log.tracev("Snapshot skipping prev: {0}", _backing.peekPrevKey()); + _backing.skipPrev(); + } + + _next = null; + + while (_backing.hasPrev() && _next == null) { + var prev = _backing.prev(); + if (prev.getKey().version() <= _version && prev.getValue().whenToRemove() > _version) { + Log.tracev("Snapshot skipping prev: {0} (too new)", prev); + _next = switch (prev.getValue()) { + case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> + Pair.of(prev.getKey().key(), new Data<>(data)); + case SnapshotEntryDeleted(long whenToRemove) -> Pair.of(prev.getKey().key(), new Tombstone<>()); + default -> throw new IllegalStateException("Unexpected value: " + prev.getValue()); + }; + } + } + + if (_next != null) { + if (_next.getValue() instanceof Data( + JDataVersionedWrapper value + )) { + assert value.version() <= _version; } } } private void fillNext() { + _next = null; while (_backing.hasNext() && _next == null) { var next = _backing.next(); var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; while (nextNextKey != null && nextNextKey.key().equals(next.getKey().key()) && nextNextKey.version() <= _version) { + Log.tracev("Snapshot skipping next: {0} (too old)", next); next = _backing.next(); nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; } @@ -50,14 +106,13 @@ public class SnapshotKvIterator implements CloseableKvIterator _version) { _next = switch (next.getValue()) { case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> - Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); - case SnapshotEntryDeleted(long whenToRemove) -> - Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>()); + Pair.of(next.getKey().key(), new Data<>(data)); + case SnapshotEntryDeleted(long whenToRemove) -> Pair.of(next.getKey().key(), new Tombstone<>()); default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); }; } if (_next != null) { - if (_next.getValue() instanceof TombstoneMergingKvIterator.Data( + if (_next.getValue() instanceof Data( JDataVersionedWrapper value )) { assert value.version() <= _version; @@ -66,19 +121,39 @@ public class SnapshotKvIterator implements CloseableKvIterator> next() { + public Pair> nextImpl() { if (_next == null) throw new NoSuchElementException("No more elements"); var ret = _next; - if (ret.getValue() instanceof TombstoneMergingKvIterator.Data( + if (ret.getValue() instanceof Data( JDataVersionedWrapper value )) { assert value.version() <= _version; } - _next = null; - fillNext(); + fill(); Log.tracev("Read: {0}, next: {1}", ret, _next); return ret; } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java index bb7be190..77b36f46 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java @@ -10,15 +10,13 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.pcollections.TreePMap; import javax.annotation.Nonnull; import java.lang.ref.Cleaner; import java.util.*; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; -import java.util.function.Function; @ApplicationScoped public class SnapshotManager { @@ -32,10 +30,9 @@ public class SnapshotManager { private long _lastSnapshotId = 0; private long _lastAliveSnapshotId = -1; - private final AtomicLong _snapshotVersion = new AtomicLong(0); private final Queue _snapshotIds = new ArrayDeque<>(); - private final ConcurrentSkipListMap _objects = new ConcurrentSkipListMap<>(); + private TreePMap _objects = TreePMap.empty(); private final TreeMap> _snapshotBounds = new TreeMap<>(); private final HashMap _snapshotRefCounts = new HashMap<>(); @@ -44,67 +41,78 @@ public class SnapshotManager { assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId; } - public Consumer commitTx(Collection> writes, long id) { + // This should not be called for the same objects concurrently + public Consumer commitTx(Collection> writes) { +// _lock.writeLock().lock(); +// try { +// if (!_snapshotIds.isEmpty()) { +// verify(); + HashMap newEntries = new HashMap<>(); + for (var action : writes) { + var current = writebackStore.readObjectVerbose(action.key()); + // Add to snapshot the previous visible version of the replaced object + // I.e. should be visible to all transactions with id <= id + // and at least as its corresponding version + Pair newSnapshotEntry = switch (current) { + case WritebackObjectPersistentStore.VerboseReadResultPersisted( + Optional data + ) -> Pair.of(new SnapshotKey(action.key(), data.map(JDataVersionedWrapper::version).orElse(-1L)), + data.map(o -> new SnapshotEntryObject(o, -1)).orElse(new SnapshotEntryDeleted(-1))); + case WritebackObjectPersistentStore.VerboseReadResultPending( + PendingWriteEntry pending + ) -> { + yield switch (pending) { + case PendingWrite write -> + Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), -1)); + case PendingDelete delete -> + Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(-1)); + default -> throw new IllegalStateException("Unexpected value: " + pending); + }; + } + default -> throw new IllegalStateException("Unexpected value: " + current); + }; + + + Log.tracev("Adding snapshot entry {0}", newSnapshotEntry); + + newEntries.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight()); + } + _lock.writeLock().lock(); try { - assert id > _lastSnapshotId; - if (!_snapshotIds.isEmpty()) { - verify(); - for (var action : writes) { - var current = writebackStore.readObjectVerbose(action.key()); - // Add to snapshot the previous visible version of the replaced object - // I.e. should be visible to all transactions with id <= id - // and at least as its corresponding version - Pair newSnapshotEntry = switch (current) { - case WritebackObjectPersistentStore.VerboseReadResultPersisted( - Optional data - ) -> - Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))), - data.map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id))); - case WritebackObjectPersistentStore.VerboseReadResultPending( - PendingWriteEntry pending - ) -> { - assert pending.bundleId() < id; - yield switch (pending) { - case PendingWrite write -> - Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), id)); - case PendingDelete delete -> - Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(id)); - default -> throw new IllegalStateException("Unexpected value: " + pending); - }; + return writebackStore.commitTx(writes, (id, commit) -> { + if (!_snapshotIds.isEmpty()) { + assert id > _lastSnapshotId; + for (var newSnapshotEntry : newEntries.entrySet()) { + assert newSnapshotEntry.getKey().version() < id; + var realNewSnapshotEntry = newSnapshotEntry.getValue().withWhenToRemove(id); + if (realNewSnapshotEntry instanceof SnapshotEntryObject re) { + assert re.data().version() <= newSnapshotEntry.getKey().version(); } - default -> throw new IllegalStateException("Unexpected value: " + current); - }; - - if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) { - assert re.data().version() <= newSnapshotEntry.getKey().version(); - } - if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) { - assert re.data().version() <= newSnapshotEntry.getKey().version(); - } - - Log.tracev("Adding snapshot entry {0}", newSnapshotEntry); - - var val = _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight()); + _objects = _objects.plus(newSnapshotEntry.getKey(), realNewSnapshotEntry); // assert val == null; - _snapshotBounds.merge(newSnapshotEntry.getLeft().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getLeft())), - (a, b) -> { - a.addAll(b); - return a; - }); + _snapshotBounds.merge(newSnapshotEntry.getKey().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getKey())), + (a, b) -> { + a.addAll(b); + return a; + }); + } } - - _snapshotVersion.incrementAndGet(); - } - - verify(); - // Commit under lock, iterators will see new version after the lock is released and writeback - // cache is updated - // TODO: Maybe writeback iterator being invalidated wouldn't be a problem? - return writebackStore.commitTx(writes, id); + commit.run(); + }); } finally { _lock.writeLock().unlock(); } + +// } + +// verify(); + // Commit under lock, iterators will see new version after the lock is released and writeback + // cache is updated + // TODO: Maybe writeback iterator being invalidated wouldn't be a problem? +// } finally { +// _lock.writeLock().unlock(); +// } } private void unrefSnapshot(long id) { @@ -144,11 +152,11 @@ public class SnapshotManager { Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}", entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds); } else if (finalNextId < entry.whenToRemove()) { - _objects.put(new SnapshotKey(key.key(), finalNextId), entry); + _objects = _objects.plus(new SnapshotKey(key.key(), finalNextId), entry); assert finalNextId > finalCurId; toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId))); } - _objects.remove(key); + _objects = _objects.minus(key); }); toReAdd.forEach(p -> { @@ -232,92 +240,72 @@ public class SnapshotManager { @Override public JObjectKey peekNextKey() { - try { - return _backing.peekNextKey(); - } catch (StaleIteratorException e) { - assert false; - throw e; - } + return _backing.peekNextKey(); } @Override public void skip() { - try { - _backing.skip(); - } catch (StaleIteratorException e) { - assert false; - throw e; - } + _backing.skip(); + } + + @Override + public JObjectKey peekPrevKey() { + return _backing.peekPrevKey(); + } + + @Override + public Pair prev() { + var ret = _backing.prev(); + assert ret.getValue().version() <= _id; + return ret; + } + + @Override + public boolean hasPrev() { + return _backing.hasPrev(); + } + + @Override + public void skipPrev() { + _backing.skipPrev(); } @Override public void close() { - try { - _backing.close(); - } catch (StaleIteratorException e) { - assert false; - throw e; - } + _backing.close(); } @Override public boolean hasNext() { - try { - return _backing.hasNext(); - } catch (StaleIteratorException e) { - assert false; - throw e; - } + return _backing.hasNext(); } @Override public Pair next() { - try { - var ret = _backing.next(); - assert ret.getValue().version() <= _id; - return ret; - } catch (StaleIteratorException e) { - assert false; - throw e; - } + var ret = _backing.next(); + assert ret.getValue().version() <= _id; + return ret; } } public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - // In case something was added to the snapshot, it is not guaranteed that the iterators will see it, - // so refresh them manually. Otherwise, it could be possible that something from the writeback cache will - // be served instead. Note that refreshing the iterator will also refresh the writeback iterator, - // so it also should be consistent. - Log.tracev("Getting snapshot {0} iterator for {1} {2}", _id, start, key); _lock.readLock().lock(); try { - Function, CloseableKvIterator> iteratorFactory = - p -> new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(), - (tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK), - (tS, tK) -> new MappingKvIterator<>( - writebackStore.getIterator(tS, tK), - d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>()) - ); - - var backing = extraChecks ? new SelfRefreshingKvIterator<>( - iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key - ) : new InconsistentSelfRefreshingKvIterator<>( - iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key - ); - - return new CheckingSnapshotKvIterator(backing); + Log.tracev("Getting snapshot {0} iterator for {1} {2}\n" + + "objects in snapshots: {3}", _id, start, key, _objects); + return new CheckingSnapshotKvIterator(new TombstoneMergingKvIterator<>("snapshot", start, key, + (tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK), + (tS, tK) -> new MappingKvIterator<>( + writebackStore.getIterator(tS, tK), d -> d.version() <= _id ? new Data<>(d) : new Tombstone<>()) + )); } finally { _lock.readLock().unlock(); } } - public CloseableKvIterator getIterator(JObjectKey key) { - return getIterator(IteratorStart.GE, key); - } - @Nonnull public Optional readObject(JObjectKey name) { - try (var it = getIterator(name)) { + try (var it = getIterator(IteratorStart.GE, name)) { if (it.hasNext()) { if (!it.peekNextKey().equals(name)) { return Optional.empty(); @@ -338,8 +326,13 @@ public class SnapshotManager { } } - public Snapshot createSnapshot(long id) { - return new Snapshot(id); + public Snapshot createSnapshot() { + _lock.writeLock().lock(); + try { + return new Snapshot(writebackStore.getLastTxId()); + } finally { + _lock.writeLock().unlock(); + } } @Nonnull diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java index eae1d216..e609081b 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java @@ -90,6 +90,28 @@ public class ReadTrackingObjectSourceFactory { _backing.skip(); } + @Override + public JObjectKey peekPrevKey() { + return _backing.peekPrevKey(); + } + + @Override + public Pair prev() { + var got = _backing.prev(); + _readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue()))); + return Pair.of(got.getKey(), got.getValue().data()); + } + + @Override + public boolean hasPrev() { + return _backing.hasPrev(); + } + + @Override + public void skipPrev() { + _backing.skipPrev(); + } + @Override public void close() { _backing.close(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactory.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactory.java index c4007d69..634daa22 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactory.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactory.java @@ -1,5 +1,5 @@ package com.usatiuk.dhfs.objects.transaction; public interface TransactionFactory { - TransactionPrivate createTransaction(long snapshotId); + TransactionPrivate createTransaction(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 29c03c12..331fb033 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -18,24 +18,22 @@ public class TransactionFactoryImpl implements TransactionFactory { ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory; @Override - public TransactionPrivate createTransaction(long snapshotId) { - Log.tracev("Trying to create transaction with snapshotId={0}", snapshotId); - return new TransactionImpl(snapshotId); + public TransactionPrivate createTransaction() { + return new TransactionImpl(); } private class TransactionImpl implements TransactionPrivate { private final ReadTrackingTransactionObjectSource _source; private final NavigableMap> _writes = new TreeMap<>(); - private long _writeVersion = 0; private Map> _newWrites = new HashMap<>(); private final List _onCommit = new ArrayList<>(); private final List _onFlush = new ArrayList<>(); private final SnapshotManager.Snapshot _snapshot; - private TransactionImpl(long snapshotId) { - _snapshot = snapshotManager.createSnapshot(snapshotId); + private TransactionImpl() { + _snapshot = snapshotManager.createSnapshot(); _source = readTrackingObjectSourceFactory.create(_snapshot); } @@ -108,12 +106,11 @@ public class TransactionFactoryImpl implements TransactionFactory { Log.tracev("Getting tx iterator with start={0}, key={1}", start, key); return new TombstoneMergingKvIterator<>("tx", start, key, (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) { - case TxRecord.TxObjectRecordWrite write -> - new TombstoneMergingKvIterator.Data<>(write.data()); - case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneMergingKvIterator.Tombstone<>(); + case TxRecord.TxObjectRecordWrite write -> new Data<>(write.data()); + case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>(); case null, default -> null; }), - (tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new)); + (tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), Data::new)); } @Override diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/KeyPredicateKvIteratorTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/KeyPredicateKvIteratorTest.java new file mode 100644 index 00000000..055f4f29 --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/KeyPredicateKvIteratorTest.java @@ -0,0 +1,154 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.pcollections.TreePMap; + +import java.util.List; + +public class KeyPredicateKvIteratorTest { + + @Test + public void simpleTest() { + var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6); + var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.GT, 3), + IteratorStart.GE, 3, v -> (v % 2 == 0)); + var expected = List.of(Pair.of(6, 6)); + for (var pair : expected) { + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(pair, pit.next()); + } + } + + @Test + public void ltTest() { + var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6); + var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + var expected = List.of(Pair.of(6, 6)); + for (var pair : expected) { + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(pair, pit.next()); + } + Assertions.assertFalse(pit.hasNext()); + } + + @Test + public void ltTest2() { + var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6); + var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 3), + IteratorStart.LT, 2, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4), + IteratorStart.LT, 4, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 5), + IteratorStart.LE, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6)); + Assertions.assertFalse(pit.hasNext()); + } + + @Test + public void ltTest3() { + var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6).plus(7, 7).plus(8, 8); + var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), + IteratorStart.LT, 6, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7), + IteratorStart.LT, 7, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 8), + IteratorStart.LT, 8, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 6), + IteratorStart.LE, 6, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), + IteratorStart.LT, 6, v -> (v % 2 == 0)); + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(6, pit.peekNextKey()); + Assertions.assertFalse(pit.hasPrev()); + Assertions.assertEquals(6, pit.peekNextKey()); + Assertions.assertFalse(pit.hasPrev()); + Assertions.assertEquals(Pair.of(6, 6), pit.next()); + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(8, pit.peekNextKey()); + Assertions.assertEquals(6, pit.peekPrevKey()); + Assertions.assertEquals(8, pit.peekNextKey()); + Assertions.assertEquals(6, pit.peekPrevKey()); + } + + @Test + public void itTest4() { + var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6).plus(8, 8).plus(10, 10); + var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5), + IteratorStart.LT, 5, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), + IteratorStart.LT, 6, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7), + IteratorStart.LT, 7, v -> (v % 2 == 0)); + Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10)); + Assertions.assertFalse(pit.hasNext()); + + pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), + IteratorStart.LT, 6, v -> (v % 2 == 0)); + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(6, pit.peekNextKey()); + Assertions.assertFalse(pit.hasPrev()); + Assertions.assertEquals(6, pit.peekNextKey()); + Assertions.assertEquals(Pair.of(6, 6), pit.next()); + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(8, pit.peekNextKey()); + Assertions.assertEquals(6, pit.peekPrevKey()); + Assertions.assertEquals(8, pit.peekNextKey()); + Assertions.assertEquals(6, pit.peekPrevKey()); + } + +// @Test +// public void reverseTest() { +// var source1 = TreePMap.empty().plus(3, 3).plus(5, 5).plus(6, 6); +// var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4), +// IteratorStart.LT, 4, v -> (v % 2 == 0) ); +// +// } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/MergingKvIteratorTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/MergingKvIteratorTest.java index 63f25100..430dc635 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/MergingKvIteratorTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/MergingKvIteratorTest.java @@ -44,6 +44,27 @@ public class MergingKvIteratorTest { fillNext(); } + @Override + public K peekPrevKey() { + throw new UnsupportedOperationException(); + } + + @Override + public Pair prev() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasPrev() { + throw new UnsupportedOperationException(); + } + + @Override + public void skipPrev() { + throw new UnsupportedOperationException(); + + } + @Override public void close() { } @@ -148,7 +169,7 @@ public class MergingKvIteratorTest { Assertions.assertFalse(mergingIterator.hasNext()); Just.checkIterator(mergingIterator.reversed(), Pair.of(5, 6), Pair.of(2, 4), Pair.of(1, 3)); Assertions.assertFalse(mergingIterator.reversed().hasNext()); - Just.checkIterator(mergingIterator, Pair.of(1,3), Pair.of(2, 4), Pair.of(5, 6)); + Just.checkIterator(mergingIterator, Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6)); Assertions.assertFalse(mergingIterator.hasNext()); @@ -161,7 +182,7 @@ public class MergingKvIteratorTest { Assertions.assertFalse(mergingIterator2.hasNext()); Just.checkIterator(mergingIterator2.reversed(), Pair.of(5, 6), Pair.of(2, 5), Pair.of(1, 3)); Assertions.assertFalse(mergingIterator2.reversed().hasNext()); - Just.checkIterator(mergingIterator2, Pair.of(1,3), Pair.of(2, 5), Pair.of(5, 6)); + Just.checkIterator(mergingIterator2, Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6)); Assertions.assertFalse(mergingIterator2.hasNext()); var mergingIterator3 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK)); @@ -213,6 +234,9 @@ public class MergingKvIteratorTest { Assertions.assertEquals(2, mergingIterator3.peekPrevKey()); Assertions.assertEquals(5, mergingIterator3.peekNextKey()); Assertions.assertEquals(2, mergingIterator3.peekPrevKey()); + Assertions.assertTrue(mergingIterator3.hasPrev()); + Assertions.assertTrue(mergingIterator3.hasNext()); + Assertions.assertEquals(5, mergingIterator3.peekNextKey()); } @Test diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestExtraChecks.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestExtraChecks.java index 9c933417..a4bdfb57 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestExtraChecks.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestExtraChecks.java @@ -5,5 +5,5 @@ import io.quarkus.test.junit.TestProfile; @QuarkusTest @TestProfile(Profiles.ObjectsTestProfileExtraChecks.class) -public class ObjectsTestExtraChecks extends ObjectsTestImpl{ +public class ObjectsTestExtraChecks extends ObjectsTestImpl { } diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestImpl.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestImpl.java index a3c346ff..32a9ea31 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestImpl.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTestImpl.java @@ -5,7 +5,6 @@ import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.transaction.LockingStrategy; import com.usatiuk.dhfs.objects.transaction.Transaction; import io.quarkus.logging.Log; -import io.quarkus.test.junit.QuarkusTestProfile; import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; @@ -13,7 +12,6 @@ import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.pcollections.TreePMap; import java.util.List; import java.util.Map; @@ -24,17 +22,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; class Profiles { - public static class ObjectsTestProfileExtraChecks implements QuarkusTestProfile { + public static class ObjectsTestProfileExtraChecks extends TempDataProfile { @Override - final public Map getConfigOverrides() { - return TreePMap.empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "true"); + protected void getConfigOverrides(Map toPut) { + toPut.put("dhfs.objects.persistence.snapshot-extra-checks", "true"); } } - public static class ObjectsTestProfileNoExtraChecks implements QuarkusTestProfile { + public static class ObjectsTestProfileNoExtraChecks extends TempDataProfile { @Override - final public Map getConfigOverrides() { - return TreePMap.empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "false"); + protected void getConfigOverrides(Map toPut) { + toPut.put("dhfs.objects.persistence.snapshot-extra-checks", "false"); } } } @@ -582,6 +580,7 @@ public abstract class ObjectsTestImpl { Assertions.assertEquals(key3, got.getKey().name()); got = iter.next(); Assertions.assertEquals(key4, got.getKey().name()); + iter.close(); }); } @@ -611,6 +610,18 @@ public abstract class ObjectsTestImpl { Assertions.assertEquals(key4, got.getKey().name()); } }); + txm.run(() -> { + try (var iter = curTx.getIterator(IteratorStart.LT, new JObjectKey(key + "_5"))) { + var got = iter.next(); + Assertions.assertEquals(key4, got.getKey().name()); + Assertions.assertTrue(iter.hasPrev()); + got = iter.prev(); + Assertions.assertEquals(key4, got.getKey().name()); + Assertions.assertTrue(iter.hasNext()); + got = iter.next(); + Assertions.assertEquals(key4, got.getKey().name()); + } + }); txm.run(() -> { curTx.delete(new JObjectKey(key)); curTx.delete(new JObjectKey(key1)); @@ -816,6 +827,32 @@ public abstract class ObjectsTestImpl { try { barrier.await(); barrier2.await(); + try (var iter = curTx.getIterator(IteratorStart.LE, new JObjectKey(key3))) { + var got = iter.next(); + Assertions.assertEquals(key2, got.getKey().name()); + Assertions.assertEquals("John2", ((Parent) got.getValue()).name()); + Assertions.assertTrue(iter.hasNext()); + Assertions.assertTrue(iter.hasPrev()); + got = iter.next(); + Assertions.assertEquals(key4, got.getKey().name()); + Assertions.assertTrue(iter.hasPrev()); + got = iter.prev(); + Assertions.assertEquals(key4, got.getKey().name()); + Assertions.assertTrue(iter.hasPrev()); + got = iter.prev(); + Assertions.assertEquals("John2", ((Parent) got.getValue()).name()); + Assertions.assertTrue(iter.hasPrev()); + got = iter.prev(); + Assertions.assertEquals(key1, got.getKey().name()); + Assertions.assertTrue(iter.hasNext()); + got = iter.next(); + Assertions.assertEquals(key1, got.getKey().name()); + got = iter.next(); + Assertions.assertEquals(key2, got.getKey().name()); + Assertions.assertEquals("John2", ((Parent) got.getValue()).name()); + got = iter.next(); + Assertions.assertEquals(key4, got.getKey().name()); + } try (var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key))) { var got = iter.next(); Assertions.assertEquals(key1, got.getKey().name()); diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java index 0598e61e..1bae7b0a 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.data.Parent; import com.usatiuk.dhfs.objects.transaction.Transaction; import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.mockito.InjectSpy; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -12,6 +13,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @QuarkusTest +@TestProfile(TempDataProfile.class) public class PreCommitTxHookTest { @Inject TransactionManager txm; diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PredicateKvIteratorTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PredicateKvIteratorTest.java index 3cf41813..05ad6d4b 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PredicateKvIteratorTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PredicateKvIteratorTest.java @@ -27,7 +27,7 @@ public class PredicateKvIteratorTest { var source1 = TreePMap.empty().plus(1, 3).plus(3, 5).plus(4, 6); var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4), IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null); - var expected = List.of(); + var expected = List.of(Pair.of(4, 6)); for (var pair : expected) { Assertions.assertTrue(pit.hasNext()); Assertions.assertEquals(pair, pit.next()); @@ -129,6 +129,11 @@ public class PredicateKvIteratorTest { IteratorStart.LT, 7, v -> (v % 2 == 0) ? v : null); Just.checkIterator(pit, Pair.of(6, 10)); Assertions.assertFalse(pit.hasNext()); + Assertions.assertTrue(pit.hasPrev()); + Assertions.assertEquals(6, pit.peekPrevKey()); + Assertions.assertEquals(Pair.of(6, 10), pit.prev()); + Assertions.assertTrue(pit.hasNext()); + Assertions.assertEquals(6, pit.peekNextKey()); pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null); diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/LmdbKvIteratorTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/LmdbKvIteratorTest.java index e6baa8fa..483297ef 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/LmdbKvIteratorTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/LmdbKvIteratorTest.java @@ -10,18 +10,23 @@ import io.quarkus.test.junit.TestProfile; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; import java.util.List; +class Profiles { + public static class LmdbKvIteratorTestProfile extends TempDataProfile { + } +} + @QuarkusTest -@TestProfile(TempDataProfile.class) +@TestProfile(Profiles.LmdbKvIteratorTestProfile.class) public class LmdbKvIteratorTest { @Inject LmdbObjectPersistentStore store; - @Test + @RepeatedTest(100) public void iteratorTest1() { store.commitTx( new TxManifestRaw( @@ -29,7 +34,7 @@ public class LmdbKvIteratorTest { Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))), List.of() - ) + ), -1, Runnable::run ); var iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3))); @@ -99,8 +104,10 @@ public class LmdbKvIteratorTest { iterator.close(); store.commitTx(new TxManifestRaw( - List.of(), - List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3))) - )); + List.of(), + List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3))) + ), + -1, Runnable::run + ); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jmap/JMapIterator.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jmap/JMapIterator.java index d997f3b8..f13f1af7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jmap/JMapIterator.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jmap/JMapIterator.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.jmap; import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JObjectKey; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.tuple.Pair; public class JMapIterator> implements CloseableKvIterator> { @@ -52,6 +53,26 @@ public class JMapIterator> implements Closeabl advance(); } + @Override + public K peekPrevKey() { + throw new NotImplementedException(); + } + + @Override + public Pair> prev() { + throw new NotImplementedException(); + } + + @Override + public boolean hasPrev() { + throw new NotImplementedException(); + } + + @Override + public void skipPrev() { + throw new NotImplementedException(); + } + @Override public void close() { _backing.close();