Reverse iterators and a number of fixes for LT/GT iterator start handling

2025-03-08 00:35:56 +01:00
parent e7bea01faf
commit 75fec73b39
39 changed files with 967 additions and 919 deletions

View File

@@ -10,21 +10,13 @@ public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterato
void skip(); void skip();
default K peekPrevKey() { K peekPrevKey();
throw new UnsupportedOperationException();
}
default Pair<K, V> prev() { Pair<K, V> prev();
throw new UnsupportedOperationException();
}
default boolean hasPrev() { boolean hasPrev();
throw new UnsupportedOperationException();
}
default void skipPrev() { void skipPrev();
throw new UnsupportedOperationException();
}
default CloseableKvIterator<K, V> reversed() { default CloseableKvIterator<K, V> reversed() {
return new ReversedKvIterator<>(this); return new ReversedKvIterator<>(this);
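With the prev-family methods now required rather than defaulting to UnsupportedOperationException, a reversing wrapper can implement the whole interface by swapping directions. A minimal sketch of such a wrapper, assuming the interface shape shown above (the project's actual ReversedKvIterator may differ):

import org.apache.commons.lang3.tuple.Pair;

// Hypothetical reversing wrapper: forward calls delegate to the backing
// iterator's prev-family methods and vice versa.
class ReversedKvIteratorSketch<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
    private final CloseableKvIterator<K, V> _backing;

    ReversedKvIteratorSketch(CloseableKvIterator<K, V> backing) {
        _backing = backing;
    }

    @Override public K peekNextKey() { return _backing.peekPrevKey(); }
    @Override public boolean hasNext() { return _backing.hasPrev(); }
    @Override public Pair<K, V> next() { return _backing.prev(); }
    @Override public void skip() { _backing.skipPrev(); }

    @Override public K peekPrevKey() { return _backing.peekNextKey(); }
    @Override public boolean hasPrev() { return _backing.hasNext(); }
    @Override public Pair<K, V> prev() { return _backing.next(); }
    @Override public void skipPrev() { _backing.skip(); }

    // Reversing twice hands back the original iterator.
    @Override public CloseableKvIterator<K, V> reversed() { return _backing; }

    @Override public void close() { _backing.close(); }
}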

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.objects;
import java.util.Optional;
public record Data<V>(V value) implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.of(value);
}
}

View File

@@ -1,134 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Function;
// Also checks that the next provided item is always consistent after a refresh
public class InconsistentKvIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private K _lastReturnedKey = null;
private K _peekedKey = null;
private boolean _peekedNext = false;
private final Pair<IteratorStart, K> _initialStart;
public InconsistentKvIteratorWrapper(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_initialStart = Pair.of(start, key);
while (true) {
try {
_backing = _iteratorSupplier.apply(Pair.of(start, key));
break;
} catch (StaleIteratorException ignored) {
continue;
}
}
}
private void refresh() {
Log.tracev("Refreshing iterator: {0}", _backing);
_backing.close();
if (_peekedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) {
assert false;
}
} else if (_lastReturnedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey));
} else {
_backing = _iteratorSupplier.apply(_initialStart);
}
if (_peekedNext && !_backing.hasNext()) {
assert false;
}
}
@Override
public K peekNextKey() {
while (true) {
if (_peekedKey != null) {
return _peekedKey;
}
try {
_peekedKey = _backing.peekNextKey();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
} catch (NoSuchElementException ignored) {
assert !_peekedNext;
throw ignored;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
_peekedNext = true;
Log.tracev("Peeked key: {0}", _peekedKey);
return _peekedKey;
}
}
@Override
public void skip() {
while (true) {
try {
_lastReturnedKey = _backing.peekNextKey();
_backing.skip();
_peekedNext = false;
_peekedKey = null;
return;
} catch (NoSuchElementException ignored) {
assert !_peekedNext;
throw ignored;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
while (true) {
if (_peekedNext) {
return true;
}
try {
_peekedNext = _backing.hasNext();
Log.tracev("Peeked next: {0}", _peekedNext);
return _peekedNext;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
}
}
@Override
public Pair<K, V> next() {
while (true) {
try {
var got = _backing.next();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
_peekedNext = false;
_peekedKey = null;
_lastReturnedKey = got.getKey();
return got;
} catch (NoSuchElementException ignored) {
assert !_peekedNext;
throw ignored;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
}
}
}

View File

@@ -1,148 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.Supplier;
// Also checks that the next provided item is always consistent after a refresh
public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private long _curVersion = -1L;
private final Lock _lock;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private final Supplier<Long> _versionSupplier;
private K _lastReturnedKey = null;
private K _peekedKey = null;
private boolean _peekedNext = false;
private final Pair<IteratorStart, K> _initialStart;
public InconsistentSelfRefreshingKvIterator(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Lock lock,
IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_versionSupplier = versionSupplier;
_lock = lock;
_initialStart = Pair.of(start, key);
_lock.lock();
try {
long curVersion = _versionSupplier.get();
_backing = _iteratorSupplier.apply(Pair.of(start, key));
_curVersion = curVersion;
} finally {
_lock.unlock();
}
}
private void maybeRefresh() {
_lock.lock();
CloseableKvIterator<K, V> oldBacking = null;
try {
if (_versionSupplier.get() == _curVersion) {
return;
}
long newVersion = _versionSupplier.get();
oldBacking = _backing;
if (_peekedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) {
throw new StaleIteratorException();
}
} else if (_lastReturnedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey));
if (_backing.hasNext() && !(_backing.peekNextKey().compareTo(_lastReturnedKey) > 0)) {
throw new StaleIteratorException();
}
} else {
_backing = _iteratorSupplier.apply(_initialStart);
}
if (_peekedNext && !_backing.hasNext()) {
throw new StaleIteratorException();
}
Log.tracev("Refreshed iterator last refreshed {0}, current version {1}",
_curVersion, newVersion);
_curVersion = newVersion;
} finally {
_lock.unlock();
if (oldBacking != null) {
oldBacking.close();
}
}
}
@Override
public K peekNextKey() {
if (_peekedKey != null) {
return _peekedKey;
}
_lock.lock();
try {
maybeRefresh();
_peekedKey = _backing.peekNextKey();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
_peekedNext = true;
Log.tracev("Peeked key: {0}", _peekedKey);
return _peekedKey;
} finally {
_lock.unlock();
}
}
@Override
public void skip() {
_lock.lock();
try {
maybeRefresh();
_lastReturnedKey = _backing.peekNextKey();
_backing.skip();
_peekedNext = false;
_peekedKey = null;
} finally {
_lock.unlock();
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
if (_peekedNext) {
return true;
}
_lock.lock();
try {
maybeRefresh();
_peekedNext = _backing.hasNext();
Log.tracev("Peeked next: {0}", _peekedNext);
return _peekedNext;
} finally {
_lock.unlock();
}
}
@Override
public Pair<K, V> next() {
_lock.lock();
try {
maybeRefresh();
var got = _backing.next();
assert _lastReturnedKey == null || got.getKey().compareTo(_lastReturnedKey) > 0;
_peekedNext = false;
_peekedKey = null;
_lastReturnedKey = got.getKey();
return got;
} finally {
_lock.unlock();
}
}
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.dhfs.objects;
public class InvalidIteratorException extends RuntimeException {
public InvalidIteratorException() {
super();
}
public InvalidIteratorException(String message) {
super(message);
}
}

View File

@@ -1,77 +0,0 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
public class InvalidatableKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final Supplier<Long> _versionSupplier;
private final long _version;
private final Lock _lock;
public InvalidatableKvIterator(CloseableKvIterator<K, V> backing, Supplier<Long> versionSupplier, Lock lock) {
_backing = backing;
_versionSupplier = versionSupplier;
_lock = lock;
_version = _versionSupplier.get();
}
private void checkVersion() {
if (_versionSupplier.get() != _version) {
Log.errorv("Version mismatch: {0} != {1}", _versionSupplier.get(), _version);
throw new InvalidIteratorException();
}
}
@Override
public K peekNextKey() {
_lock.lock();
try {
checkVersion();
return _backing.peekNextKey();
} finally {
_lock.unlock();
}
}
@Override
public void skip() {
_lock.lock();
try {
checkVersion();
_backing.skip();
} finally {
_lock.unlock();
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
_lock.lock();
try {
checkVersion();
return _backing.hasNext();
} finally {
_lock.unlock();
}
}
@Override
public Pair<K, V> next() {
_lock.lock();
try {
checkVersion();
return _backing.next();
} finally {
_lock.unlock();
}
}
}

View File

@@ -1,28 +0,0 @@
package com.usatiuk.dhfs.objects;
public class JDataDummy implements JData {
public static final JObjectKey TX_ID_OBJ_NAME = JObjectKey.of("tx_id");
private static final JDataDummy INSTANCE = new JDataDummy();
public static JDataDummy getInstance() {
return INSTANCE;
}
@Override
public JObjectKey key() {
return TX_ID_OBJ_NAME;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
return true;
}
// hashCode
@Override
public int hashCode() {
return 0;
}
}

View File

@@ -12,7 +12,6 @@ import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -24,7 +23,6 @@ import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks; private final List<PreCommitTxHook> _preCommitTxHooks;
private final AtomicLong _txCounter = new AtomicLong();
private boolean _ready = false; private boolean _ready = false;
@Inject @Inject
SnapshotManager snapshotManager; SnapshotManager snapshotManager;
@@ -38,10 +36,6 @@ public class JObjectManager {
} }
void init(@Observes @Priority(200) StartupEvent event) { void init(@Observes @Priority(200) StartupEvent event) {
var read = snapshotManager.readObjectDirect(JDataDummy.TX_ID_OBJ_NAME).orElse(null);
if (read != null) {
_txCounter.set(read.version());
}
_ready = true; _ready = true;
} }
@@ -51,14 +45,9 @@ public class JObjectManager {
public TransactionPrivate createTransaction() { public TransactionPrivate createTransaction() {
verifyReady(); verifyReady();
while (true) { var tx = transactionFactory.createTransaction();
try { Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
var tx = transactionFactory.createTransaction(_txCounter.get()); return tx;
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
} catch (SnapshotManager.IllegalSnapshotIdException ignored) {
}
}
} }
public TransactionHandle commit(TransactionPrivate tx) { public TransactionHandle commit(TransactionPrivate tx) {
@@ -102,10 +91,6 @@ public class JObjectManager {
Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass()); Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass());
for (var entry : currentIteration.entrySet()) { for (var entry : currentIteration.entrySet()) {
// FIXME: Kinda hack?
if (entry.getKey().equals(JDataDummy.TX_ID_OBJ_NAME)) {
continue;
}
somethingChanged = true; somethingChanged = true;
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getCurrent.apply(entry.getKey()); var oldObj = getCurrent.apply(entry.getKey());
@@ -150,14 +135,9 @@ public class JObjectManager {
} }
} }
} }
Log.trace("Committing transaction start");
// FIXME: Better way?
addDependency.accept(JDataDummy.TX_ID_OBJ_NAME);
writes.put(JDataDummy.TX_ID_OBJ_NAME, new TxRecord.TxObjectRecordWrite<>(JDataDummy.getInstance()));
Log.trace("Committing transaction start");
var snapshotId = tx.snapshot().id(); var snapshotId = tx.snapshot().id();
var newId = _txCounter.get() + 1;
for (var read : readSet.entrySet()) { for (var read : readSet.entrySet()) {
var dep = dependenciesLocked.get(read.getKey()); var dep = dependenciesLocked.get(read.getKey());
@@ -182,7 +162,6 @@ public class JObjectManager {
Log.trace("Checking dependency " + read.getKey() + " - ok with read"); Log.trace("Checking dependency " + read.getKey() + " - ok with read");
} }
Log.tracef("Committing transaction %d to storage", newId);
var addFlushCallback = snapshotManager.commitTx( var addFlushCallback = snapshotManager.commitTx(
writes.values().stream() writes.values().stream()
.filter(r -> { .filter(r -> {
@@ -194,11 +173,7 @@ public class JObjectManager {
} }
} }
return true; return true;
}).toList(), }).toList());
newId);
var realNewId = _txCounter.getAndIncrement() + 1;
assert realNewId == newId;
for (var callback : tx.getOnCommit()) { for (var callback : tx.getOnCommit()) {
callback.run(); callback.run();

View File

@@ -0,0 +1,129 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Function;
public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final Function<K, Boolean> _filter;
private K _next;
public KeyPredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<K, Boolean> filter) {
_goingForward = true;
_backing = backing;
_filter = filter;
fillNext();
boolean shouldGoBack = false;
if (start == IteratorStart.LE) {
if (_next == null || _next.compareTo(startKey) > 0) {
shouldGoBack = true;
}
} else if (start == IteratorStart.LT) {
if (_next == null || _next.compareTo(startKey) >= 0) {
shouldGoBack = true;
}
}
if (shouldGoBack && _backing.hasPrev()) {
_goingForward = false;
_next = null;
fillNext();
if (_next != null)
_backing.skipPrev();
_goingForward = true;
// _backing.skip();
fillNext();
}
switch (start) {
case LT -> {
// assert _next == null || _next.getKey().compareTo(startKey) < 0;
}
case LE -> {
// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next == null || _next.compareTo(startKey) > 0;
}
case GE -> {
assert _next == null || _next.compareTo(startKey) >= 0;
}
}
}
private void fillNext() {
while ((_goingForward ? _backing.hasNext() : _backing.hasPrev()) && _next == null) {
var next = _goingForward ? _backing.peekNextKey() : _backing.peekPrevKey();
if (!_filter.apply(next)) {
if (_goingForward)
_backing.skip();
else
_backing.skipPrev();
continue;
}
_next = next;
}
}
@Override
protected void reverse() {
_goingForward = !_goingForward;
_next = null;
fillNext();
}
@Override
protected K peekImpl() {
if (_next == null)
throw new NoSuchElementException();
return _next;
}
@Override
protected void skipImpl() {
if (_next == null)
throw new NoSuchElementException();
_next = null;
if (_goingForward)
_backing.skip();
else
_backing.skipPrev();
fillNext();
}
@Override
protected boolean hasImpl() {
return _next != null;
}
@Override
protected Pair<K, V> nextImpl() {
if (_next == null)
throw new NoSuchElementException("No more elements");
var retKey = _next;
_next = null;
var got = _goingForward ? _backing.next() : _backing.prev();
assert got.getKey().equals(retKey);
fillNext();
return got;
}
@Override
public void close() {
_backing.close();
}
@Override
public String toString() {
return "KeyPredicateKvIterator{" +
"_backing=" + _backing +
", _next=" + _next +
'}';
}
}
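A hedged usage sketch of the new KeyPredicateKvIterator, assuming NavigableMapKvIterator (used elsewhere in this commit) accepts any java.util.NavigableMap; the real wiring in LmdbObjectPersistentStore further down uses the same pattern to hide the internal "__DB_VER_OBJ" bookkeeping key:

import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import java.util.TreeMap;

class KeyPredicateExample {
    public static void main(String[] args) {
        var map = new TreeMap<String, Integer>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("b_internal", 0); // should never be visible to callers
        map.put("c", 3);

        // Filter out "internal" keys while keeping the requested GE start semantics.
        var it = new KeyPredicateKvIterator<>(
                new NavigableMapKvIterator<>(map, IteratorStart.GE, "a"),
                IteratorStart.GE, "a",
                k -> !k.endsWith("_internal"));
        while (it.hasNext())
            System.out.println(it.next()); // (a,1), (b,2), (c,3)
        it.close();
    }
}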

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects;
import java.util.Optional;
public interface MaybeTombstone<T> {
Optional<T> opt();
}
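A small usage sketch combining this interface with the Data and Tombstone records added in this commit; opt() flattens an entry back into an Optional:

import java.util.Optional;

class MaybeTombstoneExample {
    static Optional<String> resolve(MaybeTombstone<String> entry) {
        return entry.opt(); // Optional.of(value) for Data, Optional.empty() for Tombstone
    }

    public static void main(String[] args) {
        System.out.println(resolve(new Data<>("hello")));  // Optional[hello]
        System.out.println(resolve(new Tombstone<>()));    // Optional.empty
    }
}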

View File

@@ -5,6 +5,7 @@ import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> { public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final Map<CloseableKvIterator<K, V>, Integer> _iterators; private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
@@ -22,16 +23,22 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// Starting at a greatest key less than/less or equal than: // Starting at a greatest key less than/less or equal than:
// We have a bunch of iterators that have given us theirs "greatest LT/LE key" // We have a bunch of iterators that have given us theirs "greatest LT/LE key"
// now we need to pick the greatest of those to start with // now we need to pick the greatest of those to start with
// But if some of them don't have a lesser key, we need to pick the smallest of those
var initialIterators = iterators.stream().map(p -> p.get(initialStartType, initialStartKey)).toList(); var initialIterators = iterators.stream().map(p -> p.get(initialStartType, initialStartKey)).toList();
try { try {
K initialMaxValue = initialIterators.stream() IteratorStart finalStartType = startType;
var found = initialIterators.stream()
.filter(CloseableKvIterator::hasNext) .filter(CloseableKvIterator::hasNext)
.map((i) -> { .map((i) -> {
var peeked = i.peekNextKey(); var peeked = i.peekNextKey();
// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass()); // Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
return peeked; return peeked;
}) }).distinct().collect(Collectors.partitioningBy(e -> finalStartType == IteratorStart.LE ? e.compareTo(initialStartKey) <= 0 : e.compareTo(initialStartKey) < 0));
.max(Comparator.naturalOrder()).orElse(null); K initialMaxValue;
if (!found.get(true).isEmpty())
initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
else
initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
if (initialMaxValue == null) { if (initialMaxValue == null) {
fail = true; fail = true;
} }
@@ -61,12 +68,12 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
Log.tracev("{0} Created: {1}", _name, _sortedIterators); Log.tracev("{0} Created: {1}", _name, _sortedIterators);
switch (initialStartType) { switch (initialStartType) {
case LT -> { // case LT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0; // assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
} // }
case LE -> { // case LE -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0; // assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
} // }
case GT -> { case GT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) > 0; assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) > 0;
} }
@@ -88,6 +95,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
} }
K key = iterator.peekNextKey(); K key = iterator.peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
if (!_sortedIterators.containsKey(key)) { if (!_sortedIterators.containsKey(key)) {
_sortedIterators.put(key, iterator); _sortedIterators.put(key, iterator);
return; return;
@@ -110,6 +118,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override @Override
protected void reverse() { protected void reverse() {
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry(); var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward; _goingForward = !_goingForward;
_sortedIterators.clear(); _sortedIterators.clear();
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) { for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
@@ -126,6 +135,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|| (!_goingForward && peekImpl().compareTo(cur.getKey()) >= 0))) { || (!_goingForward && peekImpl().compareTo(cur.getKey()) >= 0))) {
skipImpl(); skipImpl();
} }
Log.tracev("{0} Reversed to {1}", _name, _sortedIterators);
} }
@Override @Override
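The hunk above changes how the merged iterator picks its starting key for LT/LE starts: among the keys peeked from the sub-iterators, the greatest key that still satisfies the bound is preferred, and only if no sub-iterator has such a key does it fall back to the smallest remaining key. A standalone sketch of just that selection step (names are illustrative, not from the project):

import java.util.*;
import java.util.stream.Collectors;

class StartKeySelectionSketch {
    enum Start { LT, LE }

    // Prefer the greatest peeked key within the LE/LT bound; otherwise the smallest overall.
    static <K extends Comparable<K>> K selectStart(Collection<K> peekedKeys, Start start, K startKey) {
        var found = peekedKeys.stream().distinct().collect(Collectors.partitioningBy(
                e -> start == Start.LE ? e.compareTo(startKey) <= 0 : e.compareTo(startKey) < 0));
        if (!found.get(true).isEmpty())
            return found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
        return found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
    }

    public static void main(String[] args) {
        System.out.println(selectStart(List.of(2, 7, 9), Start.LE, 5)); // 2: greatest key <= 5
        System.out.println(selectStart(List.of(7, 9), Start.LE, 5));    // 7: nothing <= 5, take the smallest
    }
}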

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects; package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@@ -68,13 +69,18 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
@Override @Override
protected void reverse() { protected void reverse() {
_goingForward = !_goingForward; _goingForward = !_goingForward;
_next = null; boolean wasAtEnd = _next == null;
if (_goingForward && _backing.hasNext()) if (_goingForward && !wasAtEnd)
_backing.skip(); _backing.skip();
else if (!_goingForward && _backing.hasPrev()) else if (!_goingForward && !wasAtEnd)
_backing.skipPrev(); _backing.skipPrev();
if (!wasAtEnd)
Log.tracev("Skipped in reverse: {0}", _next);
_next = null;
fillNext(); fillNext();
} }

View File

@@ -1,125 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import java.util.function.Function;
import java.util.function.Supplier;
// Also checks that the next provided item is always consistent after a refresh
public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private long _curVersion = -1L;
private final Lock _lock;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private final Supplier<Long> _versionSupplier;
private Pair<K, V> _next;
public SelfRefreshingKvIterator(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, Supplier<Long> versionSupplier, Lock lock,
IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_versionSupplier = versionSupplier;
_lock = lock;
_lock.lock();
try {
long curVersion = _versionSupplier.get();
_backing = _iteratorSupplier.apply(Pair.of(start, key));
_next = _backing.hasNext() ? _backing.next() : null;
_curVersion = curVersion;
} finally {
_lock.unlock();
}
}
private void maybeRefresh() {
_lock.lock();
CloseableKvIterator<K, V> oldBacking = null;
try {
if (_versionSupplier.get() == _curVersion) {
return;
}
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}, current value {2}",
_curVersion, _versionSupplier.get(), _next);
long newVersion = _versionSupplier.get();
oldBacking = _backing;
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey()));
var next = _backing.hasNext() ? _backing.next() : null;
if (next == null) {
Log.errorv("Failed to refresh iterator, null last refreshed {0}," +
" current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next);
assert false;
} else if (!next.equals(_next)) {
Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," +
" current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next);
assert false;
}
Log.tracev("Refreshed iterator last refreshed {0}, current version {1}, old value {2}, new value {3}",
_curVersion, newVersion, _next, next);
_next = next;
_curVersion = newVersion;
} finally {
_lock.unlock();
if (oldBacking != null) {
oldBacking.close();
}
}
}
// _next should always be valid, so it's ok to do the refresh "lazily"
private void prepareNext() {
_lock.lock();
try {
maybeRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
} else {
_next = null;
}
} finally {
_lock.unlock();
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void skip() {
if (_next == null) {
throw new NoSuchElementException();
}
prepareNext();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
prepareNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.dhfs.objects;
public class StaleIteratorException extends RuntimeException {
public StaleIteratorException() {
super();
}
public StaleIteratorException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.objects;
import java.util.Optional;
public record Tombstone<V>() implements MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.empty();
}
}

View File

@@ -10,7 +10,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
private final CloseableKvIterator<K, V> _backing; private final CloseableKvIterator<K, V> _backing;
private final String _name; private final String _name;
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, DataType<V>>> iterators) { public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
_name = name; _name = name;
_backing = new PredicateKvIterator<>( _backing = new PredicateKvIterator<>(
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators), new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
@@ -20,24 +20,15 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
if (pair instanceof Tombstone) { if (pair instanceof Tombstone) {
return null; return null;
} }
return ((Data<V>) pair).value; return ((Data<V>) pair).value();
}); });
} }
@SafeVarargs @SafeVarargs
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, DataType<V>>... iterators) { public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
this(name, startType, startKey, List.of(iterators)); this(name, startType, startKey, List.of(iterators));
} }
public interface DataType<T> {
}
public record Tombstone<V>() implements DataType<V> {
}
public record Data<V>(V value) implements DataType<V> {
}
@Override @Override
public K peekNextKey() { public K peekNextKey() {
return _backing.peekNextKey(); return _backing.peekNextKey();
@@ -48,6 +39,26 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
_backing.skip(); _backing.skip();
} }
@Override
public K peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<K, V> prev() {
return _backing.prev();
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override @Override
public void close() { public void close() {
_backing.close(); _backing.close();
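A hedged usage sketch of the refactored TombstoneMergingKvIterator, mirroring how the writeback store wires it later in this commit: a pending-changes layer of Data/Tombstone entries merged over a persisted layer wrapped with Data::new (exact type parameters may need to be spelled out):

import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import java.util.TreeMap;

class TombstoneMergingExample {
    public static void main(String[] args) {
        var pending = new TreeMap<String, MaybeTombstone<Integer>>();
        pending.put("b", new Tombstone<>()); // "b" was deleted by a pending write
        pending.put("c", new Data<>(30));    // "c" was rewritten by a pending write

        var persisted = new TreeMap<String, Integer>();
        persisted.put("a", 1);
        persisted.put("b", 2);
        persisted.put("c", 3);

        var it = new TombstoneMergingKvIterator<String, Integer>(
                "example", IteratorStart.GE, "a",
                (s, k) -> new NavigableMapKvIterator<>(pending, s, k),
                (s, k) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(persisted, s, k), Data::new));
        while (it.hasNext())
            System.out.println(it.next()); // (a,1), (c,30): "b" is shadowed by its tombstone
        it.close();
    }
}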

View File

@@ -1,9 +0,0 @@
package com.usatiuk.dhfs.objects;
public interface TxBundle {
long getId();
void commit(JDataVersionedWrapper obj);
void delete(JObjectKey obj);
}

View File

@@ -25,20 +25,21 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
@ApplicationScoped @ApplicationScoped
public class WritebackObjectPersistentStore { public class WritebackObjectPersistentStore {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>(); private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final AtomicReference<PSortedMap<JObjectKey, PendingWriteEntry>> _pendingWrites = new AtomicReference<>(TreePMap.empty()); private final AtomicReference<PSortedMap<JObjectKey, PendingWriteEntry>> _pendingWrites = new AtomicReference<>(TreePMap.empty());
private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock();
private final AtomicLong _pendingWritesVersion = new AtomicLong(); private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object(); private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1); private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong(); private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _lastCommittedTx = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0); private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject @Inject
CachingObjectPersistentStore cachedStore; CachingObjectPersistentStore cachedStore;
@@ -70,6 +71,8 @@ public class WritebackObjectPersistentStore {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
}); });
_counter.set(cachedStore.getLastTxId());
_lastCommittedTx.set(cachedStore.getLastTxId());
_ready = true; _ready = true;
} }
@@ -94,7 +97,7 @@ public class WritebackObjectPersistentStore {
private void writeback() { private void writeback() {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
try { try {
TxBundleImpl bundle = new TxBundleImpl(0); TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) { synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready) while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait(); _pendingBundles.wait();
@@ -116,11 +119,11 @@ public class WritebackObjectPersistentStore {
for (var e : bundle._entries.values()) { for (var e : bundle._entries.values()) {
switch (e) { switch (e) {
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> { case TxBundle.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> {
Log.trace("Writing new " + key); Log.trace("Writing new " + key);
toWrite.add(Pair.of(key, data)); toWrite.add(Pair.of(key, data));
} }
case TxBundleImpl.DeletedEntry(JObjectKey key) -> { case TxBundle.DeletedEntry(JObjectKey key) -> {
Log.trace("Deleting from persistent storage " + key); Log.trace("Deleting from persistent storage " + key);
toDelete.add(key); toDelete.add(key);
} }
@@ -132,11 +135,13 @@ public class WritebackObjectPersistentStore {
new TxManifestObj<>( new TxManifestObj<>(
Collections.unmodifiableList(toWrite), Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete) Collections.unmodifiableList(toDelete)
)); ), bundle.getId());
Log.trace("Bundle " + bundle.getId() + " committed"); Log.trace("Bundle " + bundle.getId() + " committed");
// Remove from pending writes, after real commit // Remove from pending writes, after real commit
// As we are the only writers to _pendingWrites, no need to synchronize with iterator creation
// even if they get the older version, it will still contain all the new changes
synchronized (_pendingBundles) { synchronized (_pendingBundles) {
var curPw = _pendingWrites.get(); var curPw = _pendingWrites.get();
for (var e : bundle._entries.values()) { for (var e : bundle._entries.values()) {
@@ -219,7 +224,7 @@ public class WritebackObjectPersistentStore {
} }
} }
synchronized (_notFlushedBundles) { synchronized (_notFlushedBundles) {
var bundle = new TxBundleImpl(_counter.incrementAndGet()); var bundle = new TxBundle(_counter.incrementAndGet());
_pendingBundles.addLast(bundle); _pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle); _notFlushedBundles.put(bundle.getId(), bundle);
return bundle; return bundle;
@@ -234,26 +239,28 @@ public class WritebackObjectPersistentStore {
try { try {
synchronized (_pendingBundles) { synchronized (_pendingBundles) {
var curPw = _pendingWrites.get(); var curPw = _pendingWrites.get();
for (var e : ((TxBundleImpl) bundle)._entries.values()) { for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) { switch (e) {
case TxBundleImpl.CommittedEntry c -> { case TxBundle.CommittedEntry c -> {
curPw = curPw.plus(c.key(), new PendingWrite(c.data, bundle.getId())); curPw = curPw.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
} }
case TxBundleImpl.DeletedEntry d -> { case TxBundle.DeletedEntry d -> {
curPw = curPw.plus(d.key(), new PendingDelete(d.key, bundle.getId())); curPw = curPw.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
} }
default -> throw new IllegalStateException("Unexpected value: " + e); default -> throw new IllegalStateException("Unexpected value: " + e);
} }
} }
// Now, make the changes visible to new iterators
_pendingWrites.set(curPw); _pendingWrites.set(curPw);
((TxBundleImpl) bundle).setReady(); ((TxBundle) bundle).setReady();
_pendingWritesVersion.incrementAndGet();
if (_pendingBundles.peek() == bundle) if (_pendingBundles.peek() == bundle)
_pendingBundles.notify(); _pendingBundles.notify();
synchronized (_flushWaitSynchronizer) { synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize(); currentSize += ((TxBundle) bundle).calculateTotalSize();
} }
} }
assert bundle.getId() > _lastCommittedTx.get();
_lastCommittedTx.set(bundle.getId());
} finally { } finally {
_pendingWritesVersionLock.writeLock().unlock(); _pendingWritesVersionLock.writeLock().unlock();
} }
@@ -263,9 +270,9 @@ public class WritebackObjectPersistentStore {
verifyReady(); verifyReady();
synchronized (_pendingBundles) { synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle); Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundleImpl) bundle); _pendingBundles.remove((TxBundle) bundle);
synchronized (_flushWaitSynchronizer) { synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize(); currentSize -= ((TxBundle) bundle).calculateTotalSize();
} }
} }
} }
@@ -296,7 +303,7 @@ public class WritebackObjectPersistentStore {
} }
} }
private static class TxBundleImpl implements TxBundle { private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>(); private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>(); private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId; private long _txId;
@@ -304,7 +311,7 @@ public class WritebackObjectPersistentStore {
private long _size = -1; private long _size = -1;
private boolean _wasCommitted = false; private boolean _wasCommitted = false;
private TxBundleImpl(long txId) { private TxBundle(long txId) {
_txId = txId; _txId = txId;
} }
@@ -348,7 +355,7 @@ public class WritebackObjectPersistentStore {
return _size; return _size;
} }
public void compress(TxBundleImpl other) { public void compress(TxBundle other) {
if (_txId >= other._txId) if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer"); throw new IllegalArgumentException("Compressing an older bundle into newer");
@@ -412,14 +419,20 @@ public class WritebackObjectPersistentStore {
return new VerboseReadResultPersisted(cachedStore.readObject(key)); return new VerboseReadResultPersisted(cachedStore.readObject(key));
} }
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) { /**
* @param commitLocked - a function that will be called with the new transaction id and a Runnable
* that will commit the transaction; the changes in the store will be visible to new transactions
* only after the runnable is called
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, BiConsumer<Long, Runnable> commitLocked) {
var bundle = createBundle(); var bundle = createBundle();
long bundleId = bundle.getId();
try { try {
for (var action : writes) { for (var action : writes) {
switch (action) { switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key()); Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapper(write.data(), id)); bundle.commit(new JDataVersionedWrapper(write.data(), bundleId));
} }
case TxRecord.TxObjectRecordDeleted deleted -> { case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key()); Log.trace("Deleting object " + deleted.key());
@@ -435,10 +448,11 @@ public class WritebackObjectPersistentStore {
throw new TxCommitException(t.getMessage(), t); throw new TxCommitException(t.getMessage(), t);
} }
Log.tracef("Committing transaction %d to storage", id);
commitBundle(bundle);
long bundleId = bundle.getId(); Log.tracef("Committing transaction %d to storage", bundleId);
commitLocked.accept(bundleId, () -> {
commitBundle(bundle);
});
return r -> asyncFence(bundleId, r); return r -> asyncFence(bundleId, r);
} }
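A hedged caller sketch for the new commitTx signature (writebackStore, writes, commitLock and visibleTxId are illustrative names, not real fields): the callback receives the allocated bundle id together with a Runnable, and the bundle becomes visible to new transactions only once that Runnable is run, typically under the caller's own lock:

// Hypothetical caller-side usage of commitTx(Collection, BiConsumer<Long, Runnable>).
Consumer<Runnable> flushFence = writebackStore.commitTx(writes, (txId, commit) -> {
    commitLock.writeLock().lock();
    try {
        commit.run();            // publishes the bundle atomically
        visibleTxId.set(txId);   // record the new id under the same lock
    } finally {
        commitLock.writeLock().unlock();
    }
});
// The returned consumer registers a callback to run once the bundle is flushed to disk.
flushFence.accept(() -> Log.trace("bundle flushed"));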
@@ -451,29 +465,26 @@ public class WritebackObjectPersistentStore {
_pendingWritesVersionLock.readLock().lock(); _pendingWritesVersionLock.readLock().lock();
try { try {
var curPending = _pendingWrites.get(); var curPending = _pendingWrites.get();
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
return new InvalidatableKvIterator<>( (tS, tK) -> new MappingKvIterator<>(
new InconsistentKvIteratorWrapper<>( new NavigableMapKvIterator<>(curPending, tS, tK),
p -> e -> switch (e) {
new TombstoneMergingKvIterator<>("writeback-ps", p.getLeft(), p.getRight(), case PendingWrite pw -> new Data<>(pw.data());
(tS, tK) -> new MappingKvIterator<>( case PendingDelete d -> new Tombstone<>();
new NavigableMapKvIterator<>(curPending, tS, tK), default -> throw new IllegalStateException("Unexpected value: " + e);
e -> switch (e) { }),
case PendingWrite pw -> (tS, tK) -> cachedStore.getIterator(tS, tK));
new TombstoneMergingKvIterator.Data<>(pw.data());
case PendingDelete d ->
new TombstoneMergingKvIterator.Tombstone<>();
default ->
throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(cachedStore.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new)), start, key),
_pendingWritesVersion::get, _pendingWritesVersionLock.readLock());
} finally { } finally {
_pendingWritesVersionLock.readLock().unlock(); _pendingWritesVersionLock.readLock().unlock();
} }
} }
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) { public long getLastTxId() {
return getIterator(IteratorStart.GE, key); _pendingWritesVersionLock.readLock().lock();
try {
return _lastCommittedTx.get();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
} }
} }

View File

@@ -8,29 +8,24 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class CachingObjectPersistentStore { public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true); private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
private final ConcurrentSkipListMap<JObjectKey, CacheEntry> _sortedCache = new ConcurrentSkipListMap<>(); private TreePMap<JObjectKey, CacheEntry> _sortedCache = TreePMap.empty();
private long _cacheVersion = 0;
private final AtomicLong _cacheVersion = new AtomicLong(0); private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock _cacheVersionLock = new ReentrantReadWriteLock();
private final HashSet<JObjectKey> _pendingWrites = new HashSet<>();
private final DataLocker _locker = new DataLocker(); private final DataLocker _locker = new DataLocker();
@Inject @Inject
SerializingObjectPersistentStore delegate; SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.limit") @ConfigProperty(name = "dhfs.objects.lru.limit")
@@ -61,90 +56,78 @@ public class CachingObjectPersistentStore {
} }
} }
@Nonnull
public Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
}
private void put(JObjectKey key, Optional<JDataVersionedWrapper> obj) { private void put(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
// Log.tracev("Adding {0} to cache: {1}", key, obj); // Log.tracev("Adding {0} to cache: {1}", key, obj);
synchronized (_cache) { _lock.writeLock().lock();
assert !_pendingWrites.contains(key); try {
int size = obj.map(o -> o.data().estimateSize()).orElse(0); int size = obj.map(o -> o.data().estimateSize()).orElse(16);
_curSize += size; _curSize += size;
var entry = new CacheEntry(obj, size); var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), size);
var old = _cache.putLast(key, entry); var old = _cache.putLast(key, entry);
_sortedCache.put(key, entry);
_sortedCache = _sortedCache.plus(key, entry);
if (old != null) if (old != null)
_curSize -= old.size(); _curSize -= old.size();
while (_curSize >= sizeLimit) { while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry(); var del = _cache.pollFirstEntry();
_sortedCache.remove(del.getKey(), del.getValue()); _sortedCache = _sortedCache.minus(del.getKey());
_curSize -= del.getValue().size(); _curSize -= del.getValue().size();
_evict++; _evict++;
} }
} finally {
_lock.writeLock().unlock();
} }
} }
@Nonnull @Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) { public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
try (var lock = _locker.lock(name)) { try (var lock = _locker.lock(name)) {
synchronized (_cache) { _lock.readLock().lock();
try {
var got = _cache.get(name); var got = _cache.get(name);
if (got != null) { if (got != null) {
return got.object(); return got.object().opt();
} }
} finally {
_lock.readLock().unlock();
} }
var got = delegate.readObject(name); // TODO: This is possibly racy
put(name, got); // var got = delegate.readObject(name);
return got; // put(name, got);
return delegate.readObject(name);
} }
} }
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names) { public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, long txId) {
var serialized = delegate.prepareManifest(names); var serialized = delegate.prepareManifest(names);
_cacheVersionLock.writeLock().lock(); Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
try { delegate.commitTx(serialized, txId, (commit) -> {
// During commit, readObject shouldn't be called for these items, _lock.writeLock().lock();
// it should be handled by the upstream store try {
synchronized (_cache) { // Make the changes visible atomically both in cache and in the underlying store
for (var write : names.written()) { for (var write : names.written()) {
put(write.getLeft(), Optional.of(write.getRight())); put(write.getLeft(), Optional.of(write.getRight()));
var added = _pendingWrites.add(write.getLeft());
assert added;
} }
for (var del : names.deleted()) { for (var del : names.deleted()) {
// TODO: tombstone cache? put(del, Optional.empty());
_curSize -= Optional.ofNullable(_cache.get(del)).map(CacheEntry::size).orElse(0L);
_cache.remove(del);
_sortedCache.remove(del);
var added = _pendingWrites.add(del);
assert added;
} }
++_cacheVersion;
commit.run();
} finally {
_lock.writeLock().unlock();
} }
Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); });
delegate.commitTx(serialized); Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
// Now, reading from the backing store should return the new data
synchronized (_cache) {
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
names.deleted().stream()).toList()) {
var removed = _pendingWrites.remove(key);
assert removed;
}
}
_cacheVersion.incrementAndGet();
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
} finally {
_cacheVersionLock.writeLock().unlock();
}
} }
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> { private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate; private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
// This should be created under lock
private final long _curCacheVersion = _cacheVersion;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) { private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
_delegate = delegate; _delegate = delegate;
@@ -175,11 +158,24 @@ public class CachingObjectPersistentStore {
return _delegate.peekPrevKey(); return _delegate.peekPrevKey();
} }
private void maybeCache(Pair<JObjectKey, JDataVersionedWrapper> prev) {
_lock.writeLock().lock();
try {
if (_cacheVersion != _curCacheVersion) {
Log.tracev("Not caching: {0}", prev);
} else {
Log.tracev("Caching: {0}", prev);
put(prev.getKey(), Optional.of(prev.getValue()));
}
} finally {
_lock.writeLock().unlock();
}
}
@Override @Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() { public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev(); var prev = _delegate.prev();
Log.tracev("Caching: {0}", prev); maybeCache(prev);
put(prev.getKey(), Optional.of(prev.getValue()));
return prev; return prev;
} }
@@ -196,8 +192,7 @@ public class CachingObjectPersistentStore {
@Override @Override
public Pair<JObjectKey, JDataVersionedWrapper> next() { public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next(); var next = _delegate.next();
Log.tracev("Caching: {0}", next); maybeCache(next);
put(next.getKey(), Optional.of(next.getValue()));
return next; return next;
} }
} }
@@ -206,30 +201,31 @@ public class CachingObjectPersistentStore {
// Does not have to guarantee consistent view, snapshots are handled by upper layers // Does not have to guarantee consistent view, snapshots are handled by upper layers
// Warning: it has a nasty side effect of global caching, so in this case don't even call next on it, // Warning: it has a nasty side effect of global caching, so in this case don't even call next on it,
// if some objects are still in writeback // if some objects are still in writeback
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting cache iterator: {0}, {1}", start, key); _lock.readLock().lock();
_cacheVersionLock.readLock().lock();
try { try {
return new InconsistentSelfRefreshingKvIterator<>( Log.tracev("Getting cache iterator: {0}, {1}", start, key);
p -> new MergingKvIterator<>("cache", p.getLeft(), p.getRight(), var curSortedCache = _sortedCache;
(mS, mK) -> new PredicateKvIterator<>( return new MergingKvIterator<>("cache", start, key,
new NavigableMapKvIterator<>(_sortedCache, mS, mK), (mS, mK)
mS, mK, -> new MappingKvIterator<>(
e -> { new NavigableMapKvIterator<>(curSortedCache, mS, mK),
Log.tracev("Taken from cache: {0}", e); e -> {
return e.object().orElse(null); Log.tracev("Taken from cache: {0}", e);
} return e.object();
), (mS, mK) -> new CachingKvIterator(delegate.getIterator(mS, mK))), _cacheVersion::get, }
_cacheVersionLock.readLock(), start, key); ),
(mS, mK)
-> new MappingKvIterator<>(new CachingKvIterator(delegate.getIterator(mS, mK)), Data::new));
} finally { } finally {
_cacheVersionLock.readLock().unlock(); _lock.readLock().unlock();
} }
} }
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) { private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, long size) {
return getIterator(IteratorStart.GE, key);
} }
private record CacheEntry(Optional<JDataVersionedWrapper> object, long size) { public long getLastTxId() {
return delegate.getLastCommitId();
} }
} }
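The CachingKvIterator above only back-fills the cache when no commit has bumped the cache version since the iterator was created (maybeCache), presumably so that a value read from the delegate before a concurrent commit cannot overwrite a fresher cached entry. A distilled, hypothetical sketch of that version-guard idea:

import java.util.HashMap;
import java.util.Map;

class VersionGuardedCacheSketch<K, V> {
    private final Map<K, V> _cache = new HashMap<>();
    private long _version = 0;

    synchronized long currentVersion() {
        return _version;
    }

    // A commit updates the cache and bumps the version, revoking the right of
    // already-created iterators to back-fill.
    synchronized void commit(Map<K, V> writes) {
        _cache.putAll(writes);
        _version++;
    }

    // Called by an iterator created at iteratorVersion when it reads a value from
    // the backing store and would like to cache it opportunistically.
    synchronized void maybeCache(long iteratorVersion, K key, V value) {
        if (iteratorVersion != _version)
            return; // a commit happened in between; the value may be stale
        _cache.put(key, value);
    }
}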

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.KeyPredicateKvIterator;
import com.usatiuk.dhfs.objects.ReversibleKvIterator; import com.usatiuk.dhfs.objects.ReversibleKvIterator;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.arc.properties.IfBuildProperty;
@@ -21,11 +22,11 @@ import javax.annotation.Nonnull;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.Cleaner; import java.lang.ref.Cleaner;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Collection; import java.util.*;
import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.NoSuchElementException; import java.util.function.Consumer;
import java.util.Optional;
import static org.lmdbjava.DbiFlags.MDB_CREATE; import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create; import static org.lmdbjava.Env.create;
@@ -38,7 +39,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private Dbi<ByteBuffer> _db; private Dbi<ByteBuffer> _db;
private boolean _ready = false; private boolean _ready = false;
private long _lastTxId = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private static final String DB_NAME = "objects"; private static final String DB_NAME = "objects";
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) { public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
_root = Path.of(root).resolve("objects"); _root = Path.of(root).resolve("objects");
@@ -54,6 +60,20 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
.setMaxDbs(1) .setMaxDbs(1)
.open(_root.toFile(), EnvFlags.MDB_NOTLS); .open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE); _db = _env.openDbi(DB_NAME, MDB_CREATE);
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
try (Txn<ByteBuffer> txn = _env.txnRead()) {
var value = _db.get(txn, bb);
if (value != null) {
var ver = value.getLong();
Log.infov("Read version: {0}", ver);
_lastTxId = ver;
}
}
_ready = true; _ready = true;
} }
@@ -100,13 +120,16 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final Cleaner CLEANER = Cleaner.create(); private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false); private final MutableObject<Boolean> _closed = new MutableObject<>(false);
private final Exception _allocationStacktrace = new Exception();
LmdbKvIterator(IteratorStart start, JObjectKey key) { LmdbKvIterator(IteratorStart start, JObjectKey key) {
_goingForward = true; _goingForward = true;
var closedRef = _closed; var closedRef = _closed;
var bt = _allocationStacktrace;
CLEANER.register(this, () -> { CLEANER.register(this, () -> {
if (!closedRef.getValue()) { if (!closedRef.getValue()) {
Log.error("Iterator was not closed before GC"); Log.error("Iterator was not closed before GC, allocated at: {0}", bt);
System.exit(-1);
} }
}); });
@@ -238,11 +261,11 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
@Override @Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new LmdbKvIterator(start, key); return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
} }
@Override @Override
public void commitTx(TxManifestRaw names) { public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
verifyReady(); verifyReady();
try (Txn<ByteBuffer> txn = _env.txnWrite()) { try (Txn<ByteBuffer> txn = _env.txnWrite()) {
for (var written : names.written()) { for (var written : names.written()) {
@@ -255,7 +278,31 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
for (JObjectKey key : names.deleted()) { for (JObjectKey key : names.deleted()) {
_db.delete(txn, key.toByteBuffer()); _db.delete(txn, key.toByteBuffer());
} }
txn.commit();
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
var realTxId = txId;
if (realTxId == -1)
realTxId = _lastTxId + 1;
assert realTxId > _lastTxId;
_lastTxId = realTxId;
bbData.putLong(realTxId);
bbData.flip();
_db.put(txn, bb, bbData);
txn.commit();
} finally {
_lock.writeLock().unlock();
}
});
} }
} }
@@ -277,4 +324,14 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace(); return _root.toFile().getUsableSpace();
} }
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastTxId;
} finally {
_lock.readLock().unlock();
}
}
} }

View File

@@ -11,11 +11,15 @@ import javax.annotation.Nonnull;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@ApplicationScoped @ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore { public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final ConcurrentSkipListMap<JObjectKey, ByteString> _objects = new ConcurrentSkipListMap<>(); private final ConcurrentSkipListMap<JObjectKey, ByteString> _objects = new ConcurrentSkipListMap<>();
private long _lastCommitId = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@Nonnull @Nonnull
@Override @Override
@@ -39,7 +43,7 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
} }
@Override @Override
public void commitTx(TxManifestRaw names) { public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
synchronized (this) { synchronized (this) {
for (var written : names.written()) { for (var written : names.written()) {
_objects.put(written.getKey(), written.getValue()); _objects.put(written.getKey(), written.getValue());
@@ -47,6 +51,15 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
for (JObjectKey key : names.deleted()) { for (JObjectKey key : names.deleted()) {
_objects.remove(key); _objects.remove(key);
} }
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
assert txId > _lastCommitId;
_lastCommitId = txId;
} finally {
_lock.writeLock().unlock();
}
});
} }
} }
@@ -64,4 +77,14 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
public long getUsableSpace() { public long getUsableSpace() {
return 0; return 0;
} }
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastCommitId;
} finally {
_lock.readLock().unlock();
}
}
} }

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.JObjectKey;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer;
// Persistent storage of objects // Persistent storage of objects
// All changes are written as sequential transactions // All changes are written as sequential transactions
@@ -21,15 +22,17 @@ public interface ObjectPersistentStore {
// Does not have to guarantee consistent view, snapshots are handled by upper layers // Does not have to guarantee consistent view, snapshots are handled by upper layers
CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key); CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, ByteString> getIterator(JObjectKey key) { /**
return getIterator(IteratorStart.GE, key); * @param commitLocked - a function that will be called with a Runnable that commits the transaction;
} * the changes in the store become visible to new transactions only after that Runnable is run
*/
void commitTx(TxManifestRaw names); void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked);
long getTotalSpace(); long getTotalSpace();
long getFreeSpace(); long getFreeSpace();
long getUsableSpace(); long getUsableSpace();
long getLastCommitId();
} }
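Editor's note: a minimal caller-side sketch of the commitLocked contract described above; the class, field, and method names below are illustrative assumptions, not code from this commit. The store applies the manifest for the given txId, and the new commit id only becomes observable (via getLastCommitId()) once the caller runs the supplied Runnable, typically under a lock shared with the snapshot/writeback layer so both advance together.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: ObjectPersistentStore and TxManifestRaw are the interfaces from this module.
class CommitVisibilitySketch {
    private final ReentrantReadWriteLock visibilityLock = new ReentrantReadWriteLock();

    void flush(ObjectPersistentStore store, TxManifestRaw manifest, long txId) {
        store.commitTx(manifest, txId, commit -> {
            visibilityLock.writeLock().lock();
            try {
                commit.run(); // the store's last commit id becomes visible here, not earlier
            } finally {
                visibilityLock.writeLock().unlock();
            }
        });
    }
}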

View File

@@ -8,6 +8,7 @@ import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer;
@ApplicationScoped @ApplicationScoped
public class SerializingObjectPersistentStore { public class SerializingObjectPersistentStore {
@@ -41,11 +42,15 @@ public class SerializingObjectPersistentStore {
, names.deleted()); , names.deleted());
} }
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names) { // void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names, Consumer<Runnable> commitLocked) {
delegateStore.commitTx(prepareManifest(names)); // delegateStore.commitTx(prepareManifest(names), commitLocked);
// }
void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
delegateStore.commitTx(names, txId, commitLocked);
} }
void commitTx(TxManifestRaw names) { long getLastCommitId() {
delegateStore.commitTx(names); return delegateStore.getLastCommitId();
} }
} }

View File

@@ -2,4 +2,6 @@ package com.usatiuk.dhfs.objects.snapshot;
public interface SnapshotEntry { public interface SnapshotEntry {
long whenToRemove(); long whenToRemove();
SnapshotEntry withWhenToRemove(long whenToRemove);
} }

View File

@@ -1,4 +1,8 @@
package com.usatiuk.dhfs.objects.snapshot; package com.usatiuk.dhfs.objects.snapshot;
public record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry { public record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntryDeleted withWhenToRemove(long whenToRemove) {
return new SnapshotEntryDeleted(whenToRemove);
}
} }

View File

@@ -3,4 +3,8 @@ package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper; import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
public record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry { public record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntryObject withWhenToRemove(long whenToRemove) {
return new SnapshotEntryObject(data, whenToRemove);
}
} }
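Editor's note: these with-ers exist so that SnapshotManager.commitTx (further down in this commit) can build snapshot entries before the commit id is known and stamp them later; a tiny sketch, with commitId as a made-up value:

// Sketch only, mirroring the usage in SnapshotManager.commitTx below.
class WhenToRemoveSketch {
    static SnapshotEntry stamp(long commitId) {
        SnapshotEntry pending = new SnapshotEntryDeleted(-1);   // placeholder until commit
        return pending.withWhenToRemove(commitId);              // == new SnapshotEntryDeleted(commitId)
    }
}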

View File

@@ -7,42 +7,98 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Optional;
public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> { // TODO: test me
public class SnapshotKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
private final NavigableMap<SnapshotKey, SnapshotEntry> _objects; private final NavigableMap<SnapshotKey, SnapshotEntry> _objects;
private final long _version; private final long _version;
private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing; private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next = null; private Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> _next = null;
public SnapshotKvIterator(NavigableMap<SnapshotKey, SnapshotEntry> objects, long version, IteratorStart start, JObjectKey startKey) { public SnapshotKvIterator(NavigableMap<SnapshotKey, SnapshotEntry> objects, long version, IteratorStart start, JObjectKey startKey) {
_objects = objects; _objects = objects;
_version = version; _version = version;
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L)); _goingForward = true;
fillNext(); _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, Long.MIN_VALUE));
if (_next == null) { fill();
return;
boolean shouldGoBack = false;
if (start == IteratorStart.LE) {
if (_next == null || _next.getKey().compareTo(startKey) > 0) {
shouldGoBack = true;
}
} else if (start == IteratorStart.LT) {
if (_next == null || _next.getKey().compareTo(startKey) >= 0) {
shouldGoBack = true;
}
} }
if (shouldGoBack && _backing.hasPrev()) {
_goingForward = false;
_backing.skipPrev();
fill();
_goingForward = true;
_backing.skip();
fill();
}
switch (start) { switch (start) {
case LT -> { case LT -> {
assert _next.getKey().compareTo(startKey) < 0; // assert _next == null || _next.getKey().compareTo(startKey) < 0;
} }
case LE -> { case LE -> {
assert _next.getKey().compareTo(startKey) <= 0; // assert _next == null || _next.getKey().compareTo(startKey) <= 0;
} }
case GT -> { case GT -> {
assert _next.getKey().compareTo(startKey) > 0; assert _next == null || _next.getKey().compareTo(startKey) > 0;
} }
case GE -> { case GE -> {
assert _next.getKey().compareTo(startKey) >= 0; assert _next == null || _next.getKey().compareTo(startKey) >= 0;
}
}
}
private void fillPrev(JObjectKey ltKey) {
if (ltKey != null)
while (_backing.hasPrev() && _backing.peekPrevKey().key().equals(ltKey)) {
Log.tracev("Snapshot skipping prev: {0}", _backing.peekPrevKey());
_backing.skipPrev();
}
_next = null;
while (_backing.hasPrev() && _next == null) {
var prev = _backing.prev();
if (prev.getKey().version() <= _version && prev.getValue().whenToRemove() > _version) {
Log.tracev("Snapshot skipping prev: {0} (too new)", prev);
_next = switch (prev.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(prev.getKey().key(), new Data<>(data));
case SnapshotEntryDeleted(long whenToRemove) -> Pair.of(prev.getKey().key(), new Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + prev.getValue());
};
}
}
if (_next != null) {
if (_next.getValue() instanceof Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _version;
} }
} }
} }
private void fillNext() { private void fillNext() {
_next = null;
while (_backing.hasNext() && _next == null) { while (_backing.hasNext() && _next == null) {
var next = _backing.next(); var next = _backing.next();
var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
while (nextNextKey != null && nextNextKey.key().equals(next.getKey().key()) && nextNextKey.version() <= _version) { while (nextNextKey != null && nextNextKey.key().equals(next.getKey().key()) && nextNextKey.version() <= _version) {
Log.tracev("Snapshot skipping next: {0} (too old)", next);
next = _backing.next(); next = _backing.next();
nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
} }
@@ -50,14 +106,13 @@ public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, Tombs
if (next.getKey().version() <= _version && next.getValue().whenToRemove() > _version) { if (next.getKey().version() <= _version && next.getValue().whenToRemove() > _version) {
_next = switch (next.getValue()) { _next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); Pair.of(next.getKey().key(), new Data<>(data));
case SnapshotEntryDeleted(long whenToRemove) -> case SnapshotEntryDeleted(long whenToRemove) -> Pair.of(next.getKey().key(), new Tombstone<>());
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
}; };
} }
if (_next != null) { if (_next != null) {
if (_next.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>( if (_next.getValue() instanceof Data<JDataVersionedWrapper>(
JDataVersionedWrapper value JDataVersionedWrapper value
)) { )) {
assert value.version() <= _version; assert value.version() <= _version;
@@ -66,19 +121,39 @@ public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, Tombs
} }
} }
private void fill() {
if (_goingForward)
fillNext();
else
fillPrev(Optional.ofNullable(_next).map(Pair::getKey).orElse(null));
}
@Override @Override
public JObjectKey peekNextKey() { protected void reverse() {
_goingForward = !_goingForward;
boolean wasAtEnd = _next == null;
if (_goingForward && !wasAtEnd)
_backing.skip();
else if (!_goingForward && !wasAtEnd)
_backing.skipPrev();
fill();
}
@Override
public JObjectKey peekImpl() {
if (_next == null) if (_next == null)
throw new NoSuchElementException(); throw new NoSuchElementException();
return _next.getKey(); return _next.getKey();
} }
@Override @Override
public void skip() { public void skipImpl() {
if (_next == null) if (_next == null)
throw new NoSuchElementException(); throw new NoSuchElementException();
_next = null; fill();
fillNext();
} }
@Override @Override
@@ -87,23 +162,22 @@ public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, Tombs
} }
@Override @Override
public boolean hasNext() { public boolean hasImpl() {
return _next != null; return _next != null;
} }
@Override @Override
public Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> next() { public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> nextImpl() {
if (_next == null) if (_next == null)
throw new NoSuchElementException("No more elements"); throw new NoSuchElementException("No more elements");
var ret = _next; var ret = _next;
if (ret.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>( if (ret.getValue() instanceof Data<JDataVersionedWrapper>(
JDataVersionedWrapper value JDataVersionedWrapper value
)) { )) {
assert value.version() <= _version; assert value.version() <= _version;
} }
_next = null; fill();
fillNext();
Log.tracev("Read: {0}, next: {1}", ret, _next); Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret; return ret;
} }

View File

@@ -10,15 +10,13 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.lang.ref.Cleaner; import java.lang.ref.Cleaner;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped @ApplicationScoped
public class SnapshotManager { public class SnapshotManager {
@@ -32,10 +30,9 @@ public class SnapshotManager {
private long _lastSnapshotId = 0; private long _lastSnapshotId = 0;
private long _lastAliveSnapshotId = -1; private long _lastAliveSnapshotId = -1;
private final AtomicLong _snapshotVersion = new AtomicLong(0);
private final Queue<Long> _snapshotIds = new ArrayDeque<>(); private final Queue<Long> _snapshotIds = new ArrayDeque<>();
private final ConcurrentSkipListMap<SnapshotKey, SnapshotEntry> _objects = new ConcurrentSkipListMap<>(); private TreePMap<SnapshotKey, SnapshotEntry> _objects = TreePMap.empty();
private final TreeMap<Long, ArrayDeque<SnapshotKey>> _snapshotBounds = new TreeMap<>(); private final TreeMap<Long, ArrayDeque<SnapshotKey>> _snapshotBounds = new TreeMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>(); private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
@@ -44,67 +41,78 @@ public class SnapshotManager {
assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId; assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId;
} }
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) { // This should not be called for the same objects concurrently
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
// _lock.writeLock().lock();
// try {
// if (!_snapshotIds.isEmpty()) {
// verify();
HashMap<SnapshotKey, SnapshotEntry> newEntries = new HashMap<>();
for (var action : writes) {
var current = writebackStore.readObjectVerbose(action.key());
// Add to snapshot the previous visible version of the replaced object
// I.e. should be visible to all transactions with id <= id
// and at least as its corresponding version
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> Pair.of(new SnapshotKey(action.key(), data.map(JDataVersionedWrapper::version).orElse(-1L)),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, -1)).orElse(new SnapshotEntryDeleted(-1)));
case WritebackObjectPersistentStore.VerboseReadResultPending(
PendingWriteEntry pending
) -> {
yield switch (pending) {
case PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), -1));
case PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(-1));
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
default -> throw new IllegalStateException("Unexpected value: " + current);
};
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
newEntries.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
}
_lock.writeLock().lock(); _lock.writeLock().lock();
try { try {
assert id > _lastSnapshotId; return writebackStore.commitTx(writes, (id, commit) -> {
if (!_snapshotIds.isEmpty()) { if (!_snapshotIds.isEmpty()) {
verify(); assert id > _lastSnapshotId;
for (var action : writes) { for (var newSnapshotEntry : newEntries.entrySet()) {
var current = writebackStore.readObjectVerbose(action.key()); assert newSnapshotEntry.getKey().version() < id;
// Add to snapshot the previous visible version of the replaced object var realNewSnapshotEntry = newSnapshotEntry.getValue().withWhenToRemove(id);
// I.e. should be visible to all transactions with id <= id if (realNewSnapshotEntry instanceof SnapshotEntryObject re) {
// and at least as its corresponding version assert re.data().version() <= newSnapshotEntry.getKey().version();
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) ->
Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id)));
case WritebackObjectPersistentStore.VerboseReadResultPending(
PendingWriteEntry pending
) -> {
assert pending.bundleId() < id;
yield switch (pending) {
case PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), id));
case PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(id));
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
} }
default -> throw new IllegalStateException("Unexpected value: " + current); _objects = _objects.plus(newSnapshotEntry.getKey(), realNewSnapshotEntry);
};
if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
var val = _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
// assert val == null; // assert val == null;
_snapshotBounds.merge(newSnapshotEntry.getLeft().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getLeft())), _snapshotBounds.merge(newSnapshotEntry.getKey().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getKey())),
(a, b) -> { (a, b) -> {
a.addAll(b); a.addAll(b);
return a; return a;
}); });
}
} }
commit.run();
_snapshotVersion.incrementAndGet(); });
}
verify();
// Commit under lock, iterators will see new version after the lock is released and writeback
// cache is updated
// TODO: Maybe writeback iterator being invalidated wouldn't be a problem?
return writebackStore.commitTx(writes, id);
} finally { } finally {
_lock.writeLock().unlock(); _lock.writeLock().unlock();
} }
// }
// verify();
// Commit under lock, iterators will see new version after the lock is released and writeback
// cache is updated
// TODO: Maybe writeback iterator being invalidated wouldn't be a problem?
// } finally {
// _lock.writeLock().unlock();
// }
} }
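Editor's note on the ConcurrentSkipListMap -> TreePMap switch above (a reasoning sketch, not code from the commit): a persistent map hands every reader an immutable version, so snapshot iterators opened before a commit cannot be invalidated by later plus()/minus() calls, which is what allows this commit to drop the self-refreshing iterator wrappers further down. The snippet only demonstrates the pcollections behaviour this relies on:

import org.pcollections.TreePMap;

class PersistentMapSketch {
    static void demo() {
        TreePMap<Integer, String> v1 = TreePMap.<Integer, String>empty().plus(1, "a");
        TreePMap<Integer, String> v2 = v1.plus(2, "b"); // v1 is left untouched
        System.out.println(v1.containsKey(2));          // false: earlier views stay stable
        System.out.println(v2.containsKey(2));          // true
    }
}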
private void unrefSnapshot(long id) { private void unrefSnapshot(long id) {
@@ -144,11 +152,11 @@ public class SnapshotManager {
Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}", Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}",
entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds); entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds);
} else if (finalNextId < entry.whenToRemove()) { } else if (finalNextId < entry.whenToRemove()) {
_objects.put(new SnapshotKey(key.key(), finalNextId), entry); _objects = _objects.plus(new SnapshotKey(key.key(), finalNextId), entry);
assert finalNextId > finalCurId; assert finalNextId > finalCurId;
toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId))); toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId)));
} }
_objects.remove(key); _objects = _objects.minus(key);
}); });
toReAdd.forEach(p -> { toReAdd.forEach(p -> {
@@ -232,92 +240,72 @@ public class SnapshotManager {
@Override @Override
public JObjectKey peekNextKey() { public JObjectKey peekNextKey() {
try { return _backing.peekNextKey();
return _backing.peekNextKey();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
} }
@Override @Override
public void skip() { public void skip() {
try { _backing.skip();
_backing.skip(); }
} catch (StaleIteratorException e) {
assert false; @Override
throw e; public JObjectKey peekPrevKey() {
} return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var ret = _backing.prev();
assert ret.getValue().version() <= _id;
return ret;
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
} }
@Override @Override
public void close() { public void close() {
try { _backing.close();
_backing.close();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
} }
@Override @Override
public boolean hasNext() { public boolean hasNext() {
try { return _backing.hasNext();
return _backing.hasNext();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
} }
@Override @Override
public Pair<JObjectKey, JDataVersionedWrapper> next() { public Pair<JObjectKey, JDataVersionedWrapper> next() {
try { var ret = _backing.next();
var ret = _backing.next(); assert ret.getValue().version() <= _id;
assert ret.getValue().version() <= _id; return ret;
return ret;
} catch (StaleIteratorException e) {
assert false;
throw e;
}
} }
} }
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
// In case something was added to the snapshot, it is not guaranteed that the iterators will see it,
// so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
// be served instead. Note that refreshing the iterator will also refresh the writeback iterator,
// so it also should be consistent.
Log.tracev("Getting snapshot {0} iterator for {1} {2}", _id, start, key);
_lock.readLock().lock(); _lock.readLock().lock();
try { try {
Function<Pair<IteratorStart, JObjectKey>, CloseableKvIterator<JObjectKey, JDataVersionedWrapper>> iteratorFactory = Log.tracev("Getting snapshot {0} iterator for {1} {2}\n" +
p -> new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(), "objects in snapshots: {3}", _id, start, key, _objects);
(tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK), return new CheckingSnapshotKvIterator(new TombstoneMergingKvIterator<>("snapshot", start, key,
(tS, tK) -> new MappingKvIterator<>( (tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK),
writebackStore.getIterator(tS, tK), (tS, tK) -> new MappingKvIterator<>(
d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>()) writebackStore.getIterator(tS, tK), d -> d.version() <= _id ? new Data<>(d) : new Tombstone<>())
); ));
var backing = extraChecks ? new SelfRefreshingKvIterator<>(
iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key
) : new InconsistentSelfRefreshingKvIterator<>(
iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key
);
return new CheckingSnapshotKvIterator(backing);
} finally { } finally {
_lock.readLock().unlock(); _lock.readLock().unlock();
} }
} }
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
@Nonnull @Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) { public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
try (var it = getIterator(name)) { try (var it = getIterator(IteratorStart.GE, name)) {
if (it.hasNext()) { if (it.hasNext()) {
if (!it.peekNextKey().equals(name)) { if (!it.peekNextKey().equals(name)) {
return Optional.empty(); return Optional.empty();
@@ -338,8 +326,13 @@ public class SnapshotManager {
} }
} }
public Snapshot createSnapshot(long id) { public Snapshot createSnapshot() {
return new Snapshot(id); _lock.writeLock().lock();
try {
return new Snapshot(writebackStore.getLastTxId());
} finally {
_lock.writeLock().unlock();
}
} }
@Nonnull @Nonnull

View File

@@ -90,6 +90,28 @@ public class ReadTrackingObjectSourceFactory {
_backing.skip(); _backing.skip();
} }
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue())));
return Pair.of(got.getKey(), got.getValue().data());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override @Override
public void close() { public void close() {
_backing.close(); _backing.close();

View File

@@ -1,5 +1,5 @@
package com.usatiuk.dhfs.objects.transaction; package com.usatiuk.dhfs.objects.transaction;
public interface TransactionFactory { public interface TransactionFactory {
TransactionPrivate createTransaction(long snapshotId); TransactionPrivate createTransaction();
} }

View File

@@ -18,24 +18,22 @@ public class TransactionFactoryImpl implements TransactionFactory {
ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory; ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory;
@Override @Override
public TransactionPrivate createTransaction(long snapshotId) { public TransactionPrivate createTransaction() {
Log.tracev("Trying to create transaction with snapshotId={0}", snapshotId); return new TransactionImpl();
return new TransactionImpl(snapshotId);
} }
private class TransactionImpl implements TransactionPrivate { private class TransactionImpl implements TransactionPrivate {
private final ReadTrackingTransactionObjectSource _source; private final ReadTrackingTransactionObjectSource _source;
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>(); private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private long _writeVersion = 0;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>(); private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private final List<Runnable> _onCommit = new ArrayList<>(); private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>(); private final List<Runnable> _onFlush = new ArrayList<>();
private final SnapshotManager.Snapshot _snapshot; private final SnapshotManager.Snapshot _snapshot;
private TransactionImpl(long snapshotId) { private TransactionImpl() {
_snapshot = snapshotManager.createSnapshot(snapshotId); _snapshot = snapshotManager.createSnapshot();
_source = readTrackingObjectSourceFactory.create(_snapshot); _source = readTrackingObjectSourceFactory.create(_snapshot);
} }
@@ -108,12 +106,11 @@ public class TransactionFactoryImpl implements TransactionFactory {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key); Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new TombstoneMergingKvIterator<>("tx", start, key, return new TombstoneMergingKvIterator<>("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) { (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write -> case TxRecord.TxObjectRecordWrite<?> write -> new Data<>(write.data());
new TombstoneMergingKvIterator.Data<>(write.data()); case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneMergingKvIterator.Tombstone<>();
case null, default -> null; case null, default -> null;
}), }),
(tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new)); (tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), Data::new));
} }
@Override @Override

View File

@@ -0,0 +1,154 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.pcollections.TreePMap;
import java.util.List;
public class KeyPredicateKvIteratorTest {
@Test
public void simpleTest() {
var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6);
var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.GT, 3),
IteratorStart.GE, 3, v -> (v % 2 == 0));
var expected = List.of(Pair.of(6, 6));
for (var pair : expected) {
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(pair, pit.next());
}
}
@Test
public void ltTest() {
var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6);
var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
var expected = List.of(Pair.of(6, 6));
for (var pair : expected) {
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(pair, pit.next());
}
Assertions.assertFalse(pit.hasNext());
}
@Test
public void ltTest2() {
var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6);
var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 3),
IteratorStart.LT, 2, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 5),
IteratorStart.LE, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6));
Assertions.assertFalse(pit.hasNext());
}
@Test
public void ltTest3() {
var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6).plus(7, 7).plus(8, 8);
var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7),
IteratorStart.LT, 7, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 8),
IteratorStart.LT, 8, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 6),
IteratorStart.LE, 6, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0));
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertFalse(pit.hasPrev());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertFalse(pit.hasPrev());
Assertions.assertEquals(Pair.of(6, 6), pit.next());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(8, pit.peekNextKey());
Assertions.assertEquals(6, pit.peekPrevKey());
Assertions.assertEquals(8, pit.peekNextKey());
Assertions.assertEquals(6, pit.peekPrevKey());
}
@Test
public void ltTest4() {
var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6).plus(8, 8).plus(10, 10);
var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7),
IteratorStart.LT, 7, v -> (v % 2 == 0));
Just.checkIterator(pit, Pair.of(6, 6), Pair.of(8, 8), Pair.of(10, 10));
Assertions.assertFalse(pit.hasNext());
pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0));
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertFalse(pit.hasPrev());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertEquals(Pair.of(6, 6), pit.next());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(8, pit.peekNextKey());
Assertions.assertEquals(6, pit.peekPrevKey());
Assertions.assertEquals(8, pit.peekNextKey());
Assertions.assertEquals(6, pit.peekPrevKey());
}
// @Test
// public void reverseTest() {
// var source1 = TreePMap.<Integer, Integer>empty().plus(3, 3).plus(5, 5).plus(6, 6);
// var pit = new KeyPredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
// IteratorStart.LT, 4, v -> (v % 2 == 0) );
//
// }
}

View File

@@ -44,6 +44,27 @@ public class MergingKvIteratorTest {
fillNext(); fillNext();
} }
@Override
public K peekPrevKey() {
throw new UnsupportedOperationException();
}
@Override
public Pair<K, V> prev() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasPrev() {
throw new UnsupportedOperationException();
}
@Override
public void skipPrev() {
throw new UnsupportedOperationException();
}
@Override @Override
public void close() { public void close() {
} }
@@ -148,7 +169,7 @@ public class MergingKvIteratorTest {
Assertions.assertFalse(mergingIterator.hasNext()); Assertions.assertFalse(mergingIterator.hasNext());
Just.checkIterator(mergingIterator.reversed(), Pair.of(5, 6), Pair.of(2, 4), Pair.of(1, 3)); Just.checkIterator(mergingIterator.reversed(), Pair.of(5, 6), Pair.of(2, 4), Pair.of(1, 3));
Assertions.assertFalse(mergingIterator.reversed().hasNext()); Assertions.assertFalse(mergingIterator.reversed().hasNext());
Just.checkIterator(mergingIterator, Pair.of(1,3), Pair.of(2, 4), Pair.of(5, 6)); Just.checkIterator(mergingIterator, Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6));
Assertions.assertFalse(mergingIterator.hasNext()); Assertions.assertFalse(mergingIterator.hasNext());
@@ -161,7 +182,7 @@ public class MergingKvIteratorTest {
Assertions.assertFalse(mergingIterator2.hasNext()); Assertions.assertFalse(mergingIterator2.hasNext());
Just.checkIterator(mergingIterator2.reversed(), Pair.of(5, 6), Pair.of(2, 5), Pair.of(1, 3)); Just.checkIterator(mergingIterator2.reversed(), Pair.of(5, 6), Pair.of(2, 5), Pair.of(1, 3));
Assertions.assertFalse(mergingIterator2.reversed().hasNext()); Assertions.assertFalse(mergingIterator2.reversed().hasNext());
Just.checkIterator(mergingIterator2, Pair.of(1,3), Pair.of(2, 5), Pair.of(5, 6)); Just.checkIterator(mergingIterator2, Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6));
Assertions.assertFalse(mergingIterator2.hasNext()); Assertions.assertFalse(mergingIterator2.hasNext());
var mergingIterator3 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK)); var mergingIterator3 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
@@ -213,6 +234,9 @@ public class MergingKvIteratorTest {
Assertions.assertEquals(2, mergingIterator3.peekPrevKey()); Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
Assertions.assertEquals(5, mergingIterator3.peekNextKey()); Assertions.assertEquals(5, mergingIterator3.peekNextKey());
Assertions.assertEquals(2, mergingIterator3.peekPrevKey()); Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
Assertions.assertTrue(mergingIterator3.hasPrev());
Assertions.assertTrue(mergingIterator3.hasNext());
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
} }
@Test @Test

View File

@@ -5,5 +5,5 @@ import io.quarkus.test.junit.TestProfile;
@QuarkusTest @QuarkusTest
@TestProfile(Profiles.ObjectsTestProfileExtraChecks.class) @TestProfile(Profiles.ObjectsTestProfileExtraChecks.class)
public class ObjectsTestExtraChecks extends ObjectsTestImpl{ public class ObjectsTestExtraChecks extends ObjectsTestImpl {
} }

View File

@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy; import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.test.junit.QuarkusTestProfile;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
@@ -13,7 +12,6 @@ import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.EnumSource;
import org.pcollections.TreePMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -24,17 +22,17 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
class Profiles { class Profiles {
public static class ObjectsTestProfileExtraChecks implements QuarkusTestProfile { public static class ObjectsTestProfileExtraChecks extends TempDataProfile {
@Override @Override
final public Map<String, String> getConfigOverrides() { protected void getConfigOverrides(Map<String, String> toPut) {
return TreePMap.<String, String>empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "true"); toPut.put("dhfs.objects.persistence.snapshot-extra-checks", "true");
} }
} }
public static class ObjectsTestProfileNoExtraChecks implements QuarkusTestProfile { public static class ObjectsTestProfileNoExtraChecks extends TempDataProfile {
@Override @Override
final public Map<String, String> getConfigOverrides() { protected void getConfigOverrides(Map<String, String> toPut) {
return TreePMap.<String, String>empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "false"); toPut.put("dhfs.objects.persistence.snapshot-extra-checks", "false");
} }
} }
} }
@@ -582,6 +580,7 @@ public abstract class ObjectsTestImpl {
Assertions.assertEquals(key3, got.getKey().name()); Assertions.assertEquals(key3, got.getKey().name());
got = iter.next(); got = iter.next();
Assertions.assertEquals(key4, got.getKey().name()); Assertions.assertEquals(key4, got.getKey().name());
iter.close();
}); });
} }
@@ -611,6 +610,18 @@ public abstract class ObjectsTestImpl {
Assertions.assertEquals(key4, got.getKey().name()); Assertions.assertEquals(key4, got.getKey().name());
} }
}); });
txm.run(() -> {
try (var iter = curTx.getIterator(IteratorStart.LT, new JObjectKey(key + "_5"))) {
var got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertTrue(iter.hasNext());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
}
});
txm.run(() -> { txm.run(() -> {
curTx.delete(new JObjectKey(key)); curTx.delete(new JObjectKey(key));
curTx.delete(new JObjectKey(key1)); curTx.delete(new JObjectKey(key1));
@@ -816,6 +827,32 @@ public abstract class ObjectsTestImpl {
try { try {
barrier.await(); barrier.await();
barrier2.await(); barrier2.await();
try (var iter = curTx.getIterator(IteratorStart.LE, new JObjectKey(key3))) {
var got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
Assertions.assertTrue(iter.hasNext());
Assertions.assertTrue(iter.hasPrev());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key4, got.getKey().name());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
Assertions.assertTrue(iter.hasPrev());
got = iter.prev();
Assertions.assertEquals(key1, got.getKey().name());
Assertions.assertTrue(iter.hasNext());
got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
Assertions.assertEquals("John2", ((Parent) got.getValue()).name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
}
try (var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key))) { try (var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key))) {
var got = iter.next(); var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name()); Assertions.assertEquals(key1, got.getKey().name());

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.data.Parent; import com.usatiuk.dhfs.objects.data.Parent;
import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import io.quarkus.test.junit.mockito.InjectSpy; import io.quarkus.test.junit.mockito.InjectSpy;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -12,6 +13,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
@QuarkusTest @QuarkusTest
@TestProfile(TempDataProfile.class)
public class PreCommitTxHookTest { public class PreCommitTxHookTest {
@Inject @Inject
TransactionManager txm; TransactionManager txm;

View File

@@ -27,7 +27,7 @@ public class PredicateKvIteratorTest {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6); var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4), var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null); IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
var expected = List.of(); var expected = List.of(Pair.of(4, 6));
for (var pair : expected) { for (var pair : expected) {
Assertions.assertTrue(pit.hasNext()); Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(pair, pit.next()); Assertions.assertEquals(pair, pit.next());
@@ -129,6 +129,11 @@ public class PredicateKvIteratorTest {
IteratorStart.LT, 7, v -> (v % 2 == 0) ? v : null); IteratorStart.LT, 7, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(6, 10)); Just.checkIterator(pit, Pair.of(6, 10));
Assertions.assertFalse(pit.hasNext()); Assertions.assertFalse(pit.hasNext());
Assertions.assertTrue(pit.hasPrev());
Assertions.assertEquals(6, pit.peekPrevKey());
Assertions.assertEquals(Pair.of(6, 10), pit.prev());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6), pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null); IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null);

View File

@@ -10,18 +10,23 @@ import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.RepeatedTest;
import java.util.List; import java.util.List;
class Profiles {
public static class LmdbKvIteratorTestProfile extends TempDataProfile {
}
}
@QuarkusTest @QuarkusTest
@TestProfile(TempDataProfile.class) @TestProfile(Profiles.LmdbKvIteratorTestProfile.class)
public class LmdbKvIteratorTest { public class LmdbKvIteratorTest {
@Inject @Inject
LmdbObjectPersistentStore store; LmdbObjectPersistentStore store;
@Test @RepeatedTest(100)
public void iteratorTest1() { public void iteratorTest1() {
store.commitTx( store.commitTx(
new TxManifestRaw( new TxManifestRaw(
@@ -29,7 +34,7 @@ public class LmdbKvIteratorTest {
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))),
List.of() List.of()
) ), -1, Runnable::run
); );
var iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3))); var iterator = store.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
@@ -99,8 +104,10 @@ public class LmdbKvIteratorTest {
iterator.close(); iterator.close();
store.commitTx(new TxManifestRaw( store.commitTx(new TxManifestRaw(
List.of(), List.of(),
List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3))) List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3)))
)); ),
-1, Runnable::run
);
} }
} }

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.jmap;
import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.JObjectKey;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
public class JMapIterator<K extends JMapKey & Comparable<K>> implements CloseableKvIterator<K, JMapEntry<K>> { public class JMapIterator<K extends JMapKey & Comparable<K>> implements CloseableKvIterator<K, JMapEntry<K>> {
@@ -52,6 +53,26 @@ public class JMapIterator<K extends JMapKey & Comparable<K>> implements Closeabl
advance(); advance();
} }
@Override
public K peekPrevKey() {
throw new NotImplementedException();
}
@Override
public Pair<K, JMapEntry<K>> prev() {
throw new NotImplementedException();
}
@Override
public boolean hasPrev() {
throw new NotImplementedException();
}
@Override
public void skipPrev() {
throw new NotImplementedException();
}
@Override @Override
public void close() { public void close() {
_backing.close(); _backing.close();