somewhat even more working transactions 3.0?

This commit is contained in:
2025-02-22 18:35:12 +01:00
parent c60a55b915
commit f5c815f02a
4 changed files with 304 additions and 16 deletions

View File

@@ -173,11 +173,6 @@ public class JObjectManager {
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
}
Log.tracef("Flushing transaction %d to storage", newId);
var realNewId = _txCounter.getAndIncrement() + 1;
assert realNewId == newId;
Log.tracef("Committing transaction %d to storage", newId);
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
@@ -193,6 +188,9 @@ public class JObjectManager {
}).toList(),
newId);
var realNewId = _txCounter.getAndIncrement() + 1;
assert realNewId == newId;
for (var callback : tx.getOnCommit()) {
callback.run();
}

View File

@@ -25,6 +25,9 @@ public class SnapshotManager {
private interface SnapshotEntry {
}
private record SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
}
private record SnapshotEntryObject(JDataVersionedWrapper data) implements SnapshotEntry {
}
@@ -47,6 +50,7 @@ public class SnapshotManager {
private final ConcurrentSkipListMap<SnapshotKey, SnapshotEntry> _objects = new ConcurrentSkipListMap<>();
private final MultiValuedMap<Long, SnapshotKey> _snapshotBounds = new HashSetValuedHashMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
private final ConcurrentSkipListMap<Long, Long> _snapshotVersions = new ConcurrentSkipListMap<>();
private void verify() {
assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1);
@@ -55,15 +59,20 @@ public class SnapshotManager {
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
synchronized (this) {
assert id > _lastSnapshotId;
if (!_snapshotIds.isEmpty()) {
verify();
boolean hadBackward = false;
for (var action : writes) {
var current = delegateStore.readObjectVerbose(action.key());
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()),
data.<SnapshotEntry>map(SnapshotEntryObject::new).orElse(new SnapshotEntryDeleted()));
) -> {
hadBackward = true;
yield Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()),
data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryDeleted()));
}
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
) -> switch (pending) {
@@ -79,6 +88,11 @@ public class SnapshotManager {
_objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
_snapshotBounds.put(newSnapshotEntry.getLeft().version(), newSnapshotEntry.getLeft());
}
if (hadBackward)
for (var sid : _snapshotIds) {
_snapshotVersions.merge(sid, 1L, Long::sum);
}
}
verify();
@@ -96,10 +110,22 @@ public class SnapshotManager {
long curCount;
long curId = id;
long nextId;
do {
_snapshotIds.poll();
_snapshotVersions.remove(curId);
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
for (var key : _snapshotBounds.remove(curId)) {
var entry = _objects.get(key);
if (entry instanceof SnapshotEntryRead read) {
if (curId != read.whenToRemove() - 1) {
assert nextId != -1;
if (nextId < read.whenToRemove()) {
_objects.put(new SnapshotKey(key.key(), nextId), entry);
}
}
}
_objects.remove(key);
}
@@ -137,6 +163,7 @@ public class SnapshotManager {
_lastAliveSnapshotId = id;
_snapshotIds.add(id);
_snapshotRefCounts.merge(id, 1L, Long::sum);
_snapshotVersions.put(id, 0L);
verify();
}
var closedRef = _closed;
@@ -169,6 +196,8 @@ public class SnapshotManager {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryDeleted() ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
@@ -206,8 +235,75 @@ public class SnapshotManager {
}
public class AutoRefreshingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
private long _lastRefreshed = -1L;
private Pair<JObjectKey, JDataVersionedWrapper> _next;
public AutoRefreshingSnapshotKvIterator(IteratorStart start, JObjectKey key) {
synchronized (SnapshotManager.this) {
long curVersion = _snapshotVersions.get(_id);
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key));
_next = _backing.hasNext() ? _backing.next() : null;
_lastRefreshed = curVersion;
}
}
private void doRefresh() {
long curVersion = _snapshotVersions.get(_id);
if (curVersion == _lastRefreshed) {
return;
}
if (_next == null) return;
synchronized (SnapshotManager.this) {
curVersion = _snapshotVersions.get(_id);
_backing.close();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()), delegateStore.getIterator(IteratorStart.GE, _next.getKey()));
var next = _backing.hasNext() ? _backing.next() : null;
assert next != null;
assert next.equals(_next);
_next = next;
_lastRefreshed = curVersion;
}
}
private void prepareNext() {
doRefresh();
if (_backing.hasNext()) {
_next = _backing.next();
} else {
_next = null;
}
}
@Override
public JObjectKey peekNextKey() {
return _next.getKey();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
prepareNext();
return ret;
}
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key));
return new AutoRefreshingSnapshotKvIterator(start, key);
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {

View File

@@ -46,10 +46,15 @@ public interface TransactionManager {
}
}
default void runTries(VoidFn fn, int tries) {
default TransactionHandle runTries(VoidFn fn, int tries) {
if (current() != null) {
fn.apply();
return;
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
current().onCommit(runnable);
}
};
}
begin();
@@ -61,25 +66,24 @@ public interface TransactionManager {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
runTries(fn, tries - 1);
return;
return runTries(fn, tries - 1);
} catch (Throwable e) {
rollback();
throw e;
}
try {
commit();
return commit();
} catch (TxCommitException txCommitException) {
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
runTries(fn, tries - 1);
return runTries(fn, tries - 1);
}
}
default void run(VoidFn fn) {
runTries(fn, 10);
default TransactionHandle run(VoidFn fn) {
return runTries(fn, 10);
}
default <T> T run(Supplier<T> supplier) {

View File

@@ -13,8 +13,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@QuarkusTest
@@ -25,6 +28,17 @@ public class ObjectsTest {
@Inject
Transaction curTx;
private void deleteAndCheck(JObjectKey key) {
txm.run(() -> {
curTx.delete(key);
});
txm.run(() -> {
var parent = curTx.get(JData.class, key).orElse(null);
Assertions.assertNull(parent);
});
}
@Test
void createObject() {
txm.run(() -> {
@@ -252,6 +266,182 @@ public class ObjectsTest {
}
}
@RepeatedTest(100)
void snapshotTest1() {
var key = "SnapshotTest1";
var barrier1 = new CyclicBarrier(2);
var barrier2 = new CyclicBarrier(2);
try (ExecutorService ex = Executors.newFixedThreadPool(3)) {
ex.invokeAll(List.of(
() -> {
barrier1.await();
Log.info("Thread 2 starting tx");
txm.run(() -> {
Log.info("Thread 2 started tx");
curTx.put(new Parent(JObjectKey.of(key), "John"));
Log.info("Thread 2 committing");
});
Log.info("Thread 2 commited");
try {
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
return null;
},
() -> {
Log.info("Thread 1 starting tx");
txm.run(() -> {
try {
Log.info("Thread 1 started tx");
barrier1.await();
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
Log.info("Thread 1 reading");
Assertions.assertTrue(curTx.get(Parent.class, new JObjectKey(key)).isEmpty());
Log.info("Thread 1 done reading");
});
Log.info("Thread 1 finished");
return null;
}
)).forEach(f -> {
try {
f.get();
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
txm.run(() -> {
Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name());
});
deleteAndCheck(new JObjectKey(key));
}
@RepeatedTest(100)
void snapshotTest2() {
var key = "SnapshotTest2";
var barrier1 = new CyclicBarrier(2);
var barrier2 = new CyclicBarrier(2);
txm.run(() -> {
curTx.put(new Parent(JObjectKey.of(key), "John"));
});
try (ExecutorService ex = Executors.newFixedThreadPool(3)) {
ex.invokeAll(List.of(
() -> {
barrier1.await();
Log.info("Thread 2 starting tx");
txm.run(() -> {
Log.info("Thread 2 started tx");
curTx.put(new Parent(JObjectKey.of(key), "John2"));
Log.info("Thread 2 committing");
});
Log.info("Thread 2 commited");
try {
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
return null;
},
() -> {
Log.info("Thread 1 starting tx");
txm.run(() -> {
try {
Log.info("Thread 1 started tx");
barrier1.await();
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
Log.info("Thread 1 reading");
Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name());
Log.info("Thread 1 done reading");
});
Log.info("Thread 1 finished");
return null;
}
)).forEach(f -> {
try {
f.get();
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
txm.run(() -> {
Assertions.assertEquals("John2", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name());
});
deleteAndCheck(new JObjectKey(key));
}
@RepeatedTest(100)
void snapshotTest3() throws InterruptedException {
var key = "SnapshotTest3";
var barrier0 = new CountDownLatch(1);
var barrier1 = new CyclicBarrier(2);
var barrier2 = new CyclicBarrier(2);
txm.run(() -> {
curTx.put(new Parent(JObjectKey.of(key), "John"));
}).onFlush(barrier0::countDown);
barrier0.await();
try (ExecutorService ex = Executors.newFixedThreadPool(3)) {
ex.invokeAll(List.of(
() -> {
barrier1.await();
Log.info("Thread 2 starting tx");
txm.run(() -> {
Log.info("Thread 2 started tx");
curTx.put(new Parent(JObjectKey.of(key), "John2"));
Log.info("Thread 2 committing");
});
Log.info("Thread 2 commited");
try {
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
return null;
},
() -> {
Log.info("Thread 1 starting tx");
txm.run(() -> {
try {
Log.info("Thread 1 started tx");
barrier1.await();
barrier2.await();
} catch (Throwable e) {
throw new RuntimeException(e);
}
Log.info("Thread 1 reading");
Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name());
Log.info("Thread 1 done reading");
});
Log.info("Thread 1 finished");
return null;
}
)).forEach(f -> {
try {
f.get();
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
txm.run(() -> {
Assertions.assertEquals("John2", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name());
});
deleteAndCheck(new JObjectKey(key));
}
// }
//
// @Test