somewhat working transactions 3.0

This commit is contained in:
2025-02-22 17:22:58 +01:00
parent a461dd6b80
commit c60a55b915
30 changed files with 826 additions and 354 deletions

View File

@@ -69,6 +69,10 @@
<artifactId>lmdbjava</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
<build>

View File

@@ -1,13 +1,15 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Iterator;
import java.util.Optional;
@ApplicationScoped
@@ -15,11 +17,6 @@ public class CurrentTransaction implements Transaction {
@Inject
TransactionManager transactionManager;
@Override
public long getId() {
return transactionManager.current().getId();
}
@Override
public void onCommit(Runnable runnable) {
transactionManager.current().onCommit(runnable);
@@ -46,6 +43,11 @@ public class CurrentTransaction implements Transaction {
return transactionManager.current().findAllObjects();
}
@Override
public Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key) {
return transactionManager.current().getIterator(start, key);
}
@Override
public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj);

View File

@@ -5,7 +5,4 @@ import jakarta.annotation.Nonnull;
import java.io.Serializable;
public record JDataVersionedWrapper(@Nonnull JData data, long version) implements Serializable {
public JDataVersionedWrapper withVersion(long version) {
return new JDataVersionedWrapper(data, version);
}
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -11,10 +10,7 @@ import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -27,21 +23,21 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private final DataLocker _objLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong();
private boolean _ready = false;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
SnapshotManager snapshotManager;
@Inject
TransactionFactory transactionFactory;
@Inject
LockManager lockManager;
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
void init(@Observes @Priority(200) StartupEvent event) {
var read = writebackObjectPersistentStore.readObject(JDataDummy.TX_ID_OBJ_NAME).orElse(null);
var read = snapshotManager.readObjectDirect(JDataDummy.TX_ID_OBJ_NAME).orElse(null);
if (read != null) {
_txCounter.set(read.version());
}
@@ -52,80 +48,24 @@ public class JObjectManager {
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
}
private <T extends JData> JDataVersionedWrapper get(Class<T> type, JObjectKey key) {
verifyReady();
while (true) {
{
var got = _objects.get(key);
if (got != null) {
var ref = got.get();
if (ref == null) {
_objects.remove(key, got);
} else if (type.isInstance(ref.data())) {
return (JDataVersionedWrapper) ref;
} else {
throw new IllegalArgumentException("Object type mismatch: " + ref.data().getClass() + " vs " + type);
}
}
}
//noinspection unused
try (var readLock = _objLocker.lock(key)) {
if (_objects.containsKey(key)) continue;
var read = writebackObjectPersistentStore.readObject(key).orElse(null);
if (read == null) return null;
if (type.isInstance(read.data())) {
var wrapper = new JDataWrapper<>((JDataVersionedWrapper) read);
var old = _objects.put(key, wrapper);
assert old == null;
return (JDataVersionedWrapper) read;
} else {
throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type);
}
}
}
}
private <T extends JData> TransactionObjectNoLock<T> getObj(Class<T> type, JObjectKey key) {
verifyReady();
var got = get(type, key);
return new TransactionObjectNoLock<>(Optional.ofNullable(got));
}
private <T extends JData> TransactionObjectLocked<T> getObjLock(Class<T> type, JObjectKey key) {
verifyReady();
var lock = _objLocker.lock(key);
var got = get(type, key);
return new TransactionObjectLocked<>(Optional.ofNullable(got), lock);
}
public TransactionPrivate createTransaction() {
verifyReady();
var counter = _txCounter.getAndIncrement();
Log.trace("Creating transaction " + counter);
return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter));
return transactionFactory.createTransaction(_txCounter.get());
}
public TransactionHandle commit(TransactionPrivate tx) {
verifyReady();
Log.trace("Committing transaction " + tx.getId());
// FIXME: Better way?
tx.put(JDataDummy.getInstance());
var current = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, TransactionObjectLocked<?>>();
Map<JObjectKey, TransactionObject<?>> reads;
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
Map<JObjectKey, TransactionObject<?>> readSet;
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
Consumer<JObjectKey> addDependency =
key -> {
dependenciesLocked.computeIfAbsent(key, k -> {
var got = getObjLock(JData.class, k);
Log.trace("Adding dependency " + k.toString() + " -> " + got.data().map(JDataVersionedWrapper::data).map(JData::key).orElse(null));
toUnlock.add(got.lock);
return got;
var lock = lockManager.lockObject(k);
toUnlock.add(lock);
return snapshotManager.readObjectDirect(k);
});
};
@@ -135,13 +75,12 @@ public class JObjectManager {
try {
try {
Function<JObjectKey, JData> getCurrent =
key -> switch (current.get(key)) {
key -> switch (writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null ->
tx.readSource().get(JData.class, key).data().map(JDataVersionedWrapper::data).orElse(null);
case null -> tx.readSource().get(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + current.get(key));
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
@@ -177,71 +116,82 @@ public class JObjectManager {
}
}
}
current.putAll(currentIteration);
writes.putAll(currentIteration);
} while (somethingChanged);
} finally {
reads = tx.reads();
Stream.concat(reads.keySet().stream(), current.keySet().stream())
if (writes.isEmpty()) {
Log.trace("Committing transaction - no changes");
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
}
};
}
Log.trace("Committing transaction start");
// FIXME: Better way?
addDependency.accept(JDataDummy.TX_ID_OBJ_NAME);
tx.put(JDataDummy.getInstance());
} finally {
readSet = tx.reads();
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(addDependency);
for (var read : reads.entrySet()) {
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock);
toUnlock.add(locked.lock());
}
}
}
for (var read : reads.entrySet()) {
var snapshotId = tx.snapshot().id();
var newId = _txCounter.get() + 1;
for (var read : readSet.entrySet()) {
var dep = dependenciesLocked.get(read.getKey());
if (dep.data().isEmpty()) {
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
Log.trace("Checking read dependency " + read.getKey() + " - not found");
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.data().orElse(null) != read.getValue().data().orElse(null)) {
Log.trace("Checking dependency " + read.getKey() + " - changed already");
throw new TxCommitException("Serialization hazard: " + dep.data().get().version() + " vs " + tx.getId());
}
if (dep.data().get().version() >= tx.getId()) {
assert false;
if (dep.get().version() > snapshotId) {
Log.trace("Checking dependency " + read.getKey() + " - newer than");
throw new TxCommitException("Serialization hazard: " + dep.data().get().version() + " vs " + tx.getId());
throw new TxCommitException("Serialization hazard: " + dep.get().version() + " vs " + snapshotId);
}
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
}
Log.tracef("Flushing transaction %d to storage", tx.getId());
Log.tracef("Flushing transaction %d to storage", newId);
for (var action : current.entrySet()) {
var dep = dependenciesLocked.get(action.getKey());
if (dep.data().isPresent() && dep.data.get().version() >= tx.getId()) {
Log.trace("Skipping write " + action.getKey() + " - dependency " + dep.data().get().version() + " vs " + tx.getId());
continue;
}
var realNewId = _txCounter.getAndIncrement() + 1;
assert realNewId == newId;
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Writing " + action.getKey());
var wrapped = new JDataVersionedWrapper(write.data(), tx.getId());
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting " + action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.getValue());
}
}
}
Log.tracef("Committing transaction %d to storage", tx.getId());
var addFlushCallback = writebackObjectPersistentStore.commitTx(current.values(), tx.getId());
Log.tracef("Committing transaction %d to storage", newId);
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
.filter(r -> {
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
}
}
return true;
}).toList(),
newId);
for (var callback : tx.getOnCommit()) {
callback.run();
@@ -252,11 +202,6 @@ public class JObjectManager {
}
return new TransactionHandle() {
@Override
public long getId() {
return tx.getId();
}
@Override
public void onFlush(Runnable runnable) {
addFlushCallback.accept(runnable);
@@ -269,72 +214,44 @@ public class JObjectManager {
for (var unlock : toUnlock) {
unlock.close();
}
tx.close();
}
}
public void rollback(TransactionPrivate tx) {
verifyReady();
Log.trace("Rolling back transaction " + tx.getId());
tx.reads().forEach((key, value) -> {
if (value instanceof TransactionObjectLocked<?> locked) {
locked.lock.close();
locked.lock().close();
}
});
tx.close();
}
private record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}
private record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}
private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper> {
private static final Cleaner CLEANER = Cleaner.create();
public JDataWrapper(JDataVersionedWrapper referent) {
super(referent);
var key = referent.data().key();
CLEANER.register(referent, () -> {
_objects.remove(key, this);
});
}
@Override
public String toString() {
return "JDataWrapper{" +
"ref=" + get() +
'}';
}
}
private class TransactionObjectSourceImpl implements TransactionObjectSource {
private final long _txId;
private TransactionObjectSourceImpl(long txId) {
_txId = txId;
}
@Override
public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
var got = getObj(type, key);
if (got.data().isPresent() && got.data().get().version() > _txId) {
throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
}
return got;
}
@Override
public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = getObjLock(type, key);
if (got.data().isPresent() && got.data().get().version() > _txId) {
got.lock().close();
throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
}
return got;
}
}
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
// private final long _txId;
//
// private TransactionObjectSourceImpl(long txId) {
// _txId = txId;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
// var got = getObj(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
// var got = getObjLock(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// got.lock().close();
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
// }
}

View File

@@ -0,0 +1,14 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class LockManager {
private final DataLocker _objLocker = new DataLocker();
public AutoCloseableNoThrow lockObject(JObjectKey key) {
return _objLocker.lock(key);
}
}

View File

@@ -2,27 +2,22 @@ package com.usatiuk.dhfs.objects;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import java.util.*;
public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final List<CloseableKvIterator<K, V>> _iterators;
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final SortedMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
public MergingKvIterator(List<CloseableKvIterator<K, V>> iterators) {
_iterators = iterators;
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (CloseableKvIterator<K, V> iterator : iterators) {
iteratorsTmp.put(iterator, counter++);
}
_iterators = Collections.unmodifiableMap(iteratorsTmp);
for (CloseableKvIterator<K, V> iterator : iterators) {
if (!iterator.hasNext()) {
continue;
}
K key = iterator.peekNextKey();
if (key != null) {
_sortedIterators.put(key, iterator);
}
advanceIterator(iterator);
}
}
@@ -31,23 +26,36 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
this(List.of(iterators));
}
@SafeVarargs
public MergingKvIterator(MergingKvIterator<K, V> parent, CloseableKvIterator<K, V>... iterators) {
this(Stream.concat(parent._iterators.stream(), Stream.of(iterators)).toList());
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
if (!iterator.hasNext()) {
return;
}
K key = iterator.peekNextKey();
if (!_sortedIterators.containsKey(key)) {
_sortedIterators.put(key, iterator);
return;
}
var oursPrio = _iterators.get(iterator);
var them = _sortedIterators.get(key);
var theirsPrio = _iterators.get(them);
if (oursPrio < theirsPrio) {
_sortedIterators.put(key, iterator);
advanceIterator(them);
}
}
@Override
public K peekNextKey() {
var cur = _sortedIterators.pollFirstEntry();
if (cur == null) {
if (_sortedIterators.isEmpty())
throw new NoSuchElementException();
}
return cur.getKey();
return _sortedIterators.firstKey();
}
@Override
public void close() {
for (CloseableKvIterator<K, V> iterator : _iterators) {
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
}
}
@@ -64,10 +72,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
throw new NoSuchElementException();
}
var curVal = cur.getValue().next();
if (cur.getValue().hasNext()) {
var nextKey = cur.getValue().peekNextKey();
_sortedIterators.put(nextKey, cur.getValue());
}
advanceIterator(cur.getValue());
return curVal;
}
}

View File

@@ -0,0 +1,248 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
@ApplicationScoped
public class SnapshotManager {
@Inject
WritebackObjectPersistentStore delegateStore;
private interface SnapshotEntry {
}
private record SnapshotEntryObject(JDataVersionedWrapper data) implements SnapshotEntry {
}
private record SnapshotEntryDeleted() implements SnapshotEntry {
}
private record SnapshotKey(JObjectKey key, long version) implements Comparable<SnapshotKey> {
@Override
public int compareTo(@Nonnull SnapshotKey o) {
return Comparator.comparing(SnapshotKey::key)
.thenComparing(SnapshotKey::version)
.compare(this, o);
}
}
private long _lastSnapshotId = 0;
private long _lastAliveSnapshotId = -1;
private final Queue<Long> _snapshotIds = new ArrayDeque<>();
private final ConcurrentSkipListMap<SnapshotKey, SnapshotEntry> _objects = new ConcurrentSkipListMap<>();
private final MultiValuedMap<Long, SnapshotKey> _snapshotBounds = new HashSetValuedHashMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
private void verify() {
assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1);
assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId;
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
synchronized (this) {
if (!_snapshotIds.isEmpty()) {
verify();
for (var action : writes) {
var current = delegateStore.readObjectVerbose(action.key());
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()),
data.<SnapshotEntry>map(SnapshotEntryObject::new).orElse(new SnapshotEntryDeleted()));
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
) -> switch (pending) {
case TxWriteback.PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data()));
case TxWriteback.PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted());
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
default -> throw new IllegalStateException("Unexpected value: " + current);
};
_objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
_snapshotBounds.put(newSnapshotEntry.getLeft().version(), newSnapshotEntry.getLeft());
}
}
verify();
return delegateStore.commitTx(writes, id);
}
}
private void unrefSnapshot(long id) {
synchronized (this) {
verify();
var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b);
if (!(refCount == null && id == _lastAliveSnapshotId)) {
return;
}
long curCount;
long curId = id;
do {
_snapshotIds.poll();
for (var key : _snapshotBounds.remove(curId)) {
_objects.remove(key);
}
if (_snapshotIds.isEmpty()) {
_lastAliveSnapshotId = -1;
break;
}
curId = _snapshotIds.peek();
_lastAliveSnapshotId = curId;
curCount = _snapshotRefCounts.getOrDefault(curId, 0L);
} while (curCount == 0);
verify();
}
}
public class Snapshot implements AutoCloseableNoThrow {
private final long _id;
private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
public long id() {
return _id;
}
private Snapshot(long id) {
_id = id;
synchronized (SnapshotManager.this) {
verify();
if (_lastSnapshotId > id)
throw new IllegalArgumentException("Snapshot id less than last? " + id + " vs " + _lastSnapshotId);
_lastSnapshotId = id;
if (_lastAliveSnapshotId == -1)
_lastAliveSnapshotId = id;
_snapshotIds.add(id);
_snapshotRefCounts.merge(id, 1L, Long::sum);
verify();
}
var closedRef = _closed;
var idRef = _id;
CLEANER.register(this, () -> {
if (!closedRef.getValue()) {
Log.error("Snapshot " + idRef + " was not closed before GC");
}
});
}
public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> {
private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next;
public SnapshotKvIterator(IteratorStart start, JObjectKey key) {
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(key, 0L));
fillNext();
}
private void fillNext() {
while (_backing.hasNext() && _next == null) {
var next = _backing.next();
var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
while (nextNextKey != null && nextNextKey.key.equals(next.getKey().key()) && nextNextKey.version() <= _id) {
next = _backing.next();
nextNextKey = _backing.peekNextKey();
}
if (next.getKey().version() <= _id) {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryDeleted() ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
};
}
}
}
@Override
public JObjectKey peekNextKey() {
if (_next == null)
throw new NoSuchElementException();
return _next.getKey();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> next() {
if (_next == null)
throw new NoSuchElementException("No more elements");
var ret = _next;
_next = null;
fillNext();
return ret;
}
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key));
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
try (var it = getIterator(name)) {
if (it.hasNext()) {
var read = it.next();
if (read.getKey().equals(name)) {
return Optional.of(read.getValue());
}
}
}
return Optional.empty();
}
@Override
public void close() {
if (_closed.getValue()) {
return;
}
_closed.setValue(true);
unrefSnapshot(_id);
}
}
public Snapshot createSnapshot(long id) {
return new Snapshot(id);
}
@Nonnull
Optional<JDataVersionedWrapper> readObjectDirect(JObjectKey name) {
return delegateStore.readObject(name);
}
}

View File

@@ -0,0 +1,54 @@
package com.usatiuk.dhfs.objects;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
public TombstoneMergingKvIterator(List<CloseableKvIterator<K, DataType<V>>> iterators) {
_backing = new PredicateKvIterator<>(
new MergingKvIterator<>(iterators),
pair -> {
if (pair instanceof Tombstone) {
return null;
}
return ((Data<V>) pair).value;
});
}
@SafeVarargs
public TombstoneMergingKvIterator(CloseableKvIterator<K, DataType<V>>... iterators) {
this(List.of(iterators));
}
public interface DataType<T> {
}
public record Tombstone<V>() implements DataType<V> {
}
public record Data<V>(V value) implements DataType<V> {
}
@Override
public K peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<K, V> next() {
return _backing.next();
}
}

View File

@@ -37,6 +37,7 @@ public class TransactionManagerImpl implements TransactionManager {
Log.trace("Transaction commit failed", e);
throw e;
} finally {
_currentTransaction.get().close();
_currentTransaction.remove();
}
}
@@ -53,6 +54,7 @@ public class TransactionManagerImpl implements TransactionManager {
Log.error("Transaction rollback failed", e);
throw e;
} finally {
_currentTransaction.get().close();
_currentTransaction.remove();
}
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.TransactionObject;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import java.util.Optional;
public record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.TransactionObject;
import java.util.Optional;
public record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}

View File

@@ -32,9 +32,9 @@ public interface TxWriteback {
record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}
CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key);
CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
default CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
}

View File

@@ -379,14 +379,13 @@ public class TxWritebackImpl implements TxWriteback {
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
return new PredicateKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, start, key),
e -> {
if (e instanceof PendingWrite pw) {
return pw.data();
}
return null;
e -> switch (e) {
case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
});
}
}

View File

@@ -45,6 +45,24 @@ public class WritebackObjectPersistentStore {
};
}
public interface VerboseReadResult {
}
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(TxWriteback.PendingWriteEntry pending) implements VerboseReadResult {
}
@Nonnull
VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = txWriteback.getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
}
return new VerboseReadResultPersisted(delegate.readObject(key));
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
var bundle = txWriteback.createBundle();
try {
@@ -78,11 +96,12 @@ public class WritebackObjectPersistentStore {
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new MergingKvIterator<>(delegate.getIterator(start, key), txWriteback.getIterator(start, key));
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
return new MergingKvIterator<>(txWriteback.getIterator(start, key),
new PredicateKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new));
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
}

View File

@@ -116,7 +116,7 @@ public class CachingObjectPersistentStore {
new NavigableMapKvIterator<>(_sortedCache, start, key),
e -> e.object().orElse(null)
),
delegate.getIterator(start, key));
delegate.getIterator(start, key)); // TODO: Doesn't work
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {

View File

@@ -4,5 +4,5 @@ public enum IteratorStart {
LT,
LE,
GT,
GE
GE,
}

View File

@@ -10,12 +10,14 @@ import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.lmdbjava.*;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collection;
@@ -94,7 +96,17 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private final Cursor<byte[]> _cursor = _db.openCursor(_txn);
private boolean _hasNext = false;
private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
LmdbKvIterator(IteratorStart start, JObjectKey key) {
var closedRef = _closed;
CLEANER.register(this, () -> {
if (!closedRef.getValue()) {
Log.error("Iterator was not closed before GC");
}
});
verifyReady();
if (!_cursor.get(key.toString().getBytes(StandardCharsets.UTF_8), GetOp.MDB_SET_RANGE)) {
return;
@@ -131,6 +143,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
@Override
public void close() {
if (_closed.getValue()) {
return;
}
_closed.setValue(true);
_cursor.close();
_txn.close();
}

View File

@@ -1,58 +0,0 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ReadTrackingObjectSource implements TransactionObjectSource {
private final TransactionObjectSource _delegate;
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
public ReadTrackingObjectSource(TransactionObjectSource delegate) {
_delegate = delegate;
}
public Map<JObjectKey, TransactionObject<?>> getRead() {
return Collections.unmodifiableMap(_readSet);
}
@Override
public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _delegate.get(type, key);
_readSet.put(key, read);
return read;
}
got.data().ifPresent(data -> {
if (!type.isInstance(data.data()))
throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
});
return (TransactionObject<T>) got;
}
@Override
public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _delegate.getWriteLocked(type, key);
_readSet.put(key, read);
return read;
}
got.data().ifPresent(data -> {
if (!type.isInstance(data.data()))
throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
});
return (TransactionObject<T>) got;
}
}

View File

@@ -0,0 +1,111 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
@ApplicationScoped
public class ReadTrackingObjectSourceFactory {
@Inject
LockManager lockManager;
public ReadTrackingTransactionObjectSource create(SnapshotManager.Snapshot snapshot) {
return new ReadTrackingObjectSourceImpl(snapshot);
}
public class ReadTrackingObjectSourceImpl implements ReadTrackingTransactionObjectSource {
private final SnapshotManager.Snapshot _snapshot;
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final Queue<AutoCloseableNoThrow> _iterators = new ArrayDeque<>();
public ReadTrackingObjectSourceImpl(SnapshotManager.Snapshot snapshot) {
_snapshot = snapshot;
}
public Map<JObjectKey, TransactionObject<?>> getRead() {
return Collections.unmodifiableMap(_readSet);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
public <T extends JData> Optional<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
public void close() {
for (var it : _iterators) {
it.close();
}
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
public ReadTrackingIterator(IteratorStart start, JObjectKey key) {
_backing = _snapshot.getIterator(start, key);
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue())));
return Pair.of(got.getKey(), got.getValue().data());
}
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
var got = new ReadTrackingIterator(start, key);
_iterators.add(got);
return got;
}
}
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
public interface ReadTrackingTransactionObjectSource extends AutoCloseableNoThrow {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
<T extends JData> Optional<T> getWriteLocked(Class<T> type, JObjectKey key);
Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key);
default Iterator<Pair<JObjectKey, JData>> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
Map<JObjectKey, TransactionObject<?>> getRead();
}

View File

@@ -2,15 +2,16 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
// The transaction interface actually used by user code to retrieve objects
public interface Transaction extends TransactionHandle {
long getId();
void onCommit(Runnable runnable);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
@@ -25,4 +26,11 @@ public interface Transaction extends TransactionHandle {
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
}
Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key);
default Iterator<Pair<JObjectKey, JData>> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
}

View File

@@ -1,5 +1,5 @@
package com.usatiuk.dhfs.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction(long id, TransactionObjectSource source);
TransactionPrivate createTransaction(long snapshotId);
}

View File

@@ -1,11 +1,12 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.WritebackObjectPersistentStore;
import com.usatiuk.dhfs.objects.SnapshotManager;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.util.*;
@@ -13,28 +14,26 @@ import java.util.*;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore store; // FIXME:
SnapshotManager snapshotManager;
@Inject
ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory;
@Override
public TransactionPrivate createTransaction(long id, TransactionObjectSource source) {
return new TransactionImpl(id, source);
public TransactionPrivate createTransaction(long snapshotId) {
return new TransactionImpl(snapshotId);
}
private class TransactionImpl implements TransactionPrivate {
private final long _id;
private final ReadTrackingObjectSource _source;
private final ReadTrackingTransactionObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new HashMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final SnapshotManager.Snapshot _snapshot;
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
_source = new ReadTrackingObjectSource(source);
}
public long getId() {
return _id;
private TransactionImpl(long snapshotId) {
_snapshot = snapshotManager.createSnapshot(snapshotId);
_source = readTrackingObjectSourceFactory.create(_snapshot);
}
@Override
@@ -52,6 +51,11 @@ public class TransactionFactoryImpl implements TransactionFactory {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public SnapshotManager.Snapshot snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
@@ -61,11 +65,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (type.isInstance(write.data())) {
return Optional.of((T) write.data());
} else {
throw new IllegalStateException("Type mismatch for " + key + ": expected " + type + ", got " + write.data().getClass());
}
return Optional.of(type.cast(write.data()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
return Optional.empty();
@@ -75,45 +75,38 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
return switch (strategy) {
case OPTIMISTIC -> (Optional<T>) _source.get(type, key).data().map(JDataVersionedWrapper::data);
case WRITE -> (Optional<T>) _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data);
case OPTIMISTIC -> _source.get(type, key);
case WRITE -> _source.getWriteLocked(type, key);
};
}
@Override
public void delete(JObjectKey key) {
// get(JData.class, key, LockingStrategy.OPTIMISTIC);
// FIXME
var got = _writes.get(key);
if (got != null) {
switch (got) {
case TxRecord.TxObjectRecordDeleted deleted -> {
return;
}
default -> {
}
if (got instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
}
//
// var read = _source.get(JData.class, key).orElse(null);
// if (read == null) {
// return;
// }
_writes.put(key, new TxRecord.TxObjectRecordDeleted(key)); // FIXME:
_writes.put(key, new TxRecord.TxObjectRecordDeleted(key));
_newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
}
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
return store.findAllObjects();
// return store.findAllObjects();
return List.of();
}
@Override
public Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key) {
return _source.getIterator(start, key);
}
@Override
public void put(JData obj) {
// get(JData.class, obj.getKey(), LockingStrategy.OPTIMISTIC);
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}
@@ -131,9 +124,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public ReadTrackingObjectSource readSource() {
public ReadTrackingTransactionObjectSource readSource() {
return _source;
}
}
@Override
public void close() {
_source.close();
_snapshot.close();
}
}
}

View File

@@ -1,7 +1,5 @@
package com.usatiuk.dhfs.objects.transaction;
public interface TransactionHandle {
long getId();
void onFlush(Runnable runnable);
}

View File

@@ -1,10 +0,0 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
public interface TransactionObjectSource {
<T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key);
<T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key);
}

View File

@@ -1,17 +1,21 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate {
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, TransactionObject<?>> reads();
ReadTrackingObjectSource readSource();
ReadTrackingTransactionObjectSource readSource();
Collection<Runnable> getOnCommit();
SnapshotManager.Snapshot snapshot();
}

View File

@@ -0,0 +1,105 @@
package com.usatiuk.dhfs.objects;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class MergingKvIteratorTest {
private class SimpleIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final Iterator<Pair<K, V>> _iterator;
private Pair<K, V> _next;
public SimpleIteratorWrapper(Iterator<Pair<K, V>> iterator) {
_iterator = iterator;
fillNext();
}
private void fillNext() {
while (_iterator.hasNext() && _next == null) {
_next = _iterator.next();
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
_next = null;
fillNext();
return ret;
}
}
@Test
public void testTestIterator() {
var list = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6));
var iterator = new SimpleIteratorWrapper<>(list.iterator());
var realIterator = list.iterator();
while (realIterator.hasNext()) {
Assertions.assertTrue(iterator.hasNext());
Assertions.assertEquals(realIterator.next(), iterator.next());
}
Assertions.assertFalse(iterator.hasNext());
var emptyList = List.<Pair<Integer, Integer>>of();
var emptyIterator = new SimpleIteratorWrapper<>(emptyList.iterator());
Assertions.assertFalse(emptyIterator.hasNext());
}
@Test
public void testSimple() {
var source1 = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6)).iterator();
var source2 = List.of(Pair.of(2, 3), Pair.of(4, 5), Pair.of(6, 7)).iterator();
var mergingIterator = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source1), new SimpleIteratorWrapper<>(source2));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 4), Pair.of(4, 5), Pair.of(5, 6), Pair.of(6, 7));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
}
@Test
public void testPriority() {
var source1 = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
var mergingIterator = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
}

View File

@@ -8,6 +8,7 @@ import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -52,7 +53,7 @@ public class ObjectsTest {
});
}
@Test
@RepeatedTest(100)
void createDeleteObject() {
txm.run(() -> {
var newParent = new Parent(JObjectKey.of("ParentCreateDeleteObject"), "John");
@@ -237,13 +238,7 @@ public class ObjectsTest {
return curTx.get(Parent.class, new JObjectKey(key)).orElse(null);
});
// It is possible that thread 2 did get the object after thread 1 committed it, so there is no conflict
Assertions.assertTrue(!thread1Failed.get() || !thread2Failed.get());
if (strategy.equals(LockingStrategy.WRITE)) {
if (!thread1Failed.get())
Assertions.assertFalse(thread2Failed.get());
}
Assertions.assertFalse(!thread1Failed.get() && !thread2Failed.get());
if (!thread1Failed.get()) {
if (!thread2Failed.get()) {

View File

@@ -23,10 +23,6 @@ public class RemoteTransaction {
@Inject
PersistentPeerDataService persistentPeerDataService;
public long getId() {
return curTx.getId();
}
private <T extends JDataRemote> Optional<RemoteObjectDataWrapper<T>> tryDownloadRemote(RemoteObjectMeta obj) {
MutableObject<RemoteObjectDataWrapper<T>> success = new MutableObject<>(null);

View File

@@ -60,6 +60,8 @@ public class DataLocker {
@Override
public void close() {
synchronized (_tag) {
if (_tag.released)
return;
_tag.released = true;
// Notify all because when the object is locked again,
// it's a different lock tag