From f5c815f02ab5ed45ad1d89addfbd840867e8b9b8 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 22 Feb 2025 18:35:12 +0100 Subject: [PATCH] somewhat even more working transactions 3.0? --- .../usatiuk/dhfs/objects/JObjectManager.java | 8 +- .../usatiuk/dhfs/objects/SnapshotManager.java | 102 +++++++++- .../dhfs/objects/TransactionManager.java | 20 +- .../com/usatiuk/dhfs/objects/ObjectsTest.java | 190 ++++++++++++++++++ 4 files changed, 304 insertions(+), 16 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 5c0aa18b..0a563412 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -173,11 +173,6 @@ public class JObjectManager { Log.trace("Checking dependency " + read.getKey() + " - ok with read"); } - Log.tracef("Flushing transaction %d to storage", newId); - - var realNewId = _txCounter.getAndIncrement() + 1; - assert realNewId == newId; - Log.tracef("Committing transaction %d to storage", newId); var addFlushCallback = snapshotManager.commitTx( writes.values().stream() @@ -193,6 +188,9 @@ public class JObjectManager { }).toList(), newId); + var realNewId = _txCounter.getAndIncrement() + 1; + assert realNewId == newId; + for (var callback : tx.getOnCommit()) { callback.run(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java index 75653f01..5f5f2c1d 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java @@ -25,6 +25,9 @@ public class SnapshotManager { private interface SnapshotEntry { } + private record SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry { + } + private record SnapshotEntryObject(JDataVersionedWrapper data) implements SnapshotEntry { } @@ -47,6 +50,7 @@ public class SnapshotManager { private final ConcurrentSkipListMap _objects = new ConcurrentSkipListMap<>(); private final MultiValuedMap _snapshotBounds = new HashSetValuedHashMap<>(); private final HashMap _snapshotRefCounts = new HashMap<>(); + private final ConcurrentSkipListMap _snapshotVersions = new ConcurrentSkipListMap<>(); private void verify() { assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1); @@ -55,15 +59,20 @@ public class SnapshotManager { Consumer commitTx(Collection> writes, long id) { synchronized (this) { + assert id > _lastSnapshotId; if (!_snapshotIds.isEmpty()) { verify(); + boolean hadBackward = false; for (var action : writes) { var current = delegateStore.readObjectVerbose(action.key()); Pair newSnapshotEntry = switch (current) { case WritebackObjectPersistentStore.VerboseReadResultPersisted( Optional data - ) -> Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()), - data.map(SnapshotEntryObject::new).orElse(new SnapshotEntryDeleted())); + ) -> { + hadBackward = true; + yield Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()), + data.map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryDeleted())); + } case WritebackObjectPersistentStore.VerboseReadResultPending( TxWriteback.PendingWriteEntry pending ) -> switch (pending) { @@ -79,6 +88,11 @@ public class SnapshotManager { _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight()); _snapshotBounds.put(newSnapshotEntry.getLeft().version(), newSnapshotEntry.getLeft()); } + + if (hadBackward) + for (var sid : _snapshotIds) { + _snapshotVersions.merge(sid, 1L, Long::sum); + } } verify(); @@ -96,10 +110,22 @@ public class SnapshotManager { long curCount; long curId = id; + long nextId; do { _snapshotIds.poll(); + _snapshotVersions.remove(curId); + nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek(); for (var key : _snapshotBounds.remove(curId)) { + var entry = _objects.get(key); + if (entry instanceof SnapshotEntryRead read) { + if (curId != read.whenToRemove() - 1) { + assert nextId != -1; + if (nextId < read.whenToRemove()) { + _objects.put(new SnapshotKey(key.key(), nextId), entry); + } + } + } _objects.remove(key); } @@ -137,6 +163,7 @@ public class SnapshotManager { _lastAliveSnapshotId = id; _snapshotIds.add(id); _snapshotRefCounts.merge(id, 1L, Long::sum); + _snapshotVersions.put(id, 0L); verify(); } var closedRef = _closed; @@ -169,6 +196,8 @@ public class SnapshotManager { _next = switch (next.getValue()) { case SnapshotEntryObject(JDataVersionedWrapper data) -> Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); + case SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) -> + Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); case SnapshotEntryDeleted() -> Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>()); default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); @@ -206,8 +235,75 @@ public class SnapshotManager { } + public class AutoRefreshingSnapshotKvIterator implements CloseableKvIterator { + private CloseableKvIterator _backing; + private long _lastRefreshed = -1L; + private Pair _next; + + public AutoRefreshingSnapshotKvIterator(IteratorStart start, JObjectKey key) { + synchronized (SnapshotManager.this) { + long curVersion = _snapshotVersions.get(_id); + _backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key)); + _next = _backing.hasNext() ? _backing.next() : null; + _lastRefreshed = curVersion; + } + } + + private void doRefresh() { + long curVersion = _snapshotVersions.get(_id); + if (curVersion == _lastRefreshed) { + return; + } + if (_next == null) return; + synchronized (SnapshotManager.this) { + curVersion = _snapshotVersions.get(_id); + _backing.close(); + _backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()), delegateStore.getIterator(IteratorStart.GE, _next.getKey())); + var next = _backing.hasNext() ? _backing.next() : null; + assert next != null; + assert next.equals(_next); + _next = next; + _lastRefreshed = curVersion; + } + } + + private void prepareNext() { + doRefresh(); + if (_backing.hasNext()) { + _next = _backing.next(); + } else { + _next = null; + } + } + + @Override + public JObjectKey peekNextKey() { + return _next.getKey(); + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public boolean hasNext() { + return _next != null; + } + + @Override + public Pair next() { + if (_next == null) { + throw new NoSuchElementException("No more elements"); + } + var ret = _next; + prepareNext(); + return ret; + } + } + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key)); + return new AutoRefreshingSnapshotKvIterator(start, key); } public CloseableKvIterator getIterator(JObjectKey key) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java index 754858f0..2fe54390 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java @@ -46,10 +46,15 @@ public interface TransactionManager { } } - default void runTries(VoidFn fn, int tries) { + default TransactionHandle runTries(VoidFn fn, int tries) { if (current() != null) { fn.apply(); - return; + return new TransactionHandle() { + @Override + public void onFlush(Runnable runnable) { + current().onCommit(runnable); + } + }; } begin(); @@ -61,25 +66,24 @@ public interface TransactionManager { Log.error("Transaction commit failed", txCommitException); throw txCommitException; } - runTries(fn, tries - 1); - return; + return runTries(fn, tries - 1); } catch (Throwable e) { rollback(); throw e; } try { - commit(); + return commit(); } catch (TxCommitException txCommitException) { if (tries == 0) { Log.error("Transaction commit failed", txCommitException); throw txCommitException; } - runTries(fn, tries - 1); + return runTries(fn, tries - 1); } } - default void run(VoidFn fn) { - runTries(fn, 10); + default TransactionHandle run(VoidFn fn) { + return runTries(fn, 10); } default T run(Supplier supplier) { diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java index 6036cad9..7d9ecab1 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java @@ -13,8 +13,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @QuarkusTest @@ -25,6 +28,17 @@ public class ObjectsTest { @Inject Transaction curTx; + private void deleteAndCheck(JObjectKey key) { + txm.run(() -> { + curTx.delete(key); + }); + + txm.run(() -> { + var parent = curTx.get(JData.class, key).orElse(null); + Assertions.assertNull(parent); + }); + } + @Test void createObject() { txm.run(() -> { @@ -252,6 +266,182 @@ public class ObjectsTest { } } + @RepeatedTest(100) + void snapshotTest1() { + var key = "SnapshotTest1"; + var barrier1 = new CyclicBarrier(2); + var barrier2 = new CyclicBarrier(2); + try (ExecutorService ex = Executors.newFixedThreadPool(3)) { + ex.invokeAll(List.of( + () -> { + barrier1.await(); + Log.info("Thread 2 starting tx"); + txm.run(() -> { + Log.info("Thread 2 started tx"); + curTx.put(new Parent(JObjectKey.of(key), "John")); + Log.info("Thread 2 committing"); + }); + Log.info("Thread 2 commited"); + try { + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + return null; + }, + () -> { + Log.info("Thread 1 starting tx"); + txm.run(() -> { + try { + Log.info("Thread 1 started tx"); + barrier1.await(); + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + Log.info("Thread 1 reading"); + Assertions.assertTrue(curTx.get(Parent.class, new JObjectKey(key)).isEmpty()); + Log.info("Thread 1 done reading"); + }); + Log.info("Thread 1 finished"); + return null; + } + )).forEach(f -> { + try { + f.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + txm.run(() -> { + Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name()); + }); + deleteAndCheck(new JObjectKey(key)); + } + + @RepeatedTest(100) + void snapshotTest2() { + var key = "SnapshotTest2"; + var barrier1 = new CyclicBarrier(2); + var barrier2 = new CyclicBarrier(2); + txm.run(() -> { + curTx.put(new Parent(JObjectKey.of(key), "John")); + }); + try (ExecutorService ex = Executors.newFixedThreadPool(3)) { + ex.invokeAll(List.of( + () -> { + barrier1.await(); + Log.info("Thread 2 starting tx"); + txm.run(() -> { + Log.info("Thread 2 started tx"); + curTx.put(new Parent(JObjectKey.of(key), "John2")); + Log.info("Thread 2 committing"); + }); + Log.info("Thread 2 commited"); + try { + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + return null; + }, + () -> { + Log.info("Thread 1 starting tx"); + txm.run(() -> { + try { + Log.info("Thread 1 started tx"); + barrier1.await(); + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + Log.info("Thread 1 reading"); + Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name()); + Log.info("Thread 1 done reading"); + }); + Log.info("Thread 1 finished"); + return null; + } + )).forEach(f -> { + try { + f.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + txm.run(() -> { + Assertions.assertEquals("John2", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name()); + }); + deleteAndCheck(new JObjectKey(key)); + } + + @RepeatedTest(100) + void snapshotTest3() throws InterruptedException { + var key = "SnapshotTest3"; + var barrier0 = new CountDownLatch(1); + var barrier1 = new CyclicBarrier(2); + var barrier2 = new CyclicBarrier(2); + txm.run(() -> { + curTx.put(new Parent(JObjectKey.of(key), "John")); + }).onFlush(barrier0::countDown); + barrier0.await(); + try (ExecutorService ex = Executors.newFixedThreadPool(3)) { + ex.invokeAll(List.of( + () -> { + barrier1.await(); + Log.info("Thread 2 starting tx"); + txm.run(() -> { + Log.info("Thread 2 started tx"); + curTx.put(new Parent(JObjectKey.of(key), "John2")); + Log.info("Thread 2 committing"); + }); + Log.info("Thread 2 commited"); + try { + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + return null; + }, + () -> { + Log.info("Thread 1 starting tx"); + txm.run(() -> { + try { + Log.info("Thread 1 started tx"); + barrier1.await(); + barrier2.await(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + Log.info("Thread 1 reading"); + Assertions.assertEquals("John", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name()); + Log.info("Thread 1 done reading"); + }); + Log.info("Thread 1 finished"); + return null; + } + )).forEach(f -> { + try { + f.get(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + txm.run(() -> { + Assertions.assertEquals("John2", curTx.get(Parent.class, new JObjectKey(key)).orElseThrow().name()); + }); + deleteAndCheck(new JObjectKey(key)); + } + // } // // @Test