From c329c1f9828f552353f4f9603407fbb7f2bbcf61 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 3 May 2025 13:57:44 +0200 Subject: [PATCH] Objects: nested transactions --- .../transaction/TransactionManager.java | 31 +++++++++--- .../transaction/TransactionManagerImpl.java | 42 ++++++++++------ .../dhfs/autosync/AutosyncProcessor.java | 22 +++------ .../usatiuk/dhfs/remoteobj/SyncHandler.java | 49 ++++++++----------- 4 files changed, 77 insertions(+), 67 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManager.java index 3ae998a0..ace63adf 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManager.java @@ -12,8 +12,8 @@ public interface TransactionManager { void rollback(); - default T runTries(Supplier supplier, int tries) { - if (current() != null) { + default T runTries(Supplier supplier, int tries, boolean nest) { + if (!nest && current() != null) { return supplier.get(); } @@ -41,8 +41,8 @@ public interface TransactionManager { } } - default TransactionHandle runTries(VoidFn fn, int tries) { - if (current() != null) { + default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) { + if (!nest && current() != null) { fn.apply(); return new TransactionHandle() { @Override @@ -74,23 +74,38 @@ public interface TransactionManager { throw e; } } + } + default T runTries(Supplier supplier, int tries) { + return runTries(supplier, tries, false); + } + + default TransactionHandle runTries(VoidFn fn, int tries) { + return runTries(fn, tries, false); + } + + default TransactionHandle run(VoidFn fn, boolean nest) { + return runTries(fn, 10, nest); + } + + default T run(Supplier supplier, boolean nest) { + return runTries(supplier, 10, nest); } default TransactionHandle run(VoidFn fn) { - return runTries(fn, 10); + return run(fn, false); } default T run(Supplier supplier) { - return runTries(supplier, 10); + return run(supplier, false); } default void executeTx(VoidFn fn) { - run(fn); + run(fn, false); } default T executeTx(Supplier supplier) { - return run(supplier); + return run(supplier, false); } Transaction current(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManagerImpl.java index baca7eb6..92f40dc3 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionManagerImpl.java @@ -1,47 +1,48 @@ package com.usatiuk.objects.transaction; import io.quarkus.logging.Log; +import jakarta.annotation.Nullable; import jakarta.inject.Inject; import jakarta.inject.Singleton; import org.apache.commons.lang3.tuple.Pair; import java.util.Collection; -import java.util.concurrent.ExecutorService; +import java.util.Stack; @Singleton public class TransactionManagerImpl implements TransactionManager { - private static final ThreadLocal _currentTransaction = new ThreadLocal<>(); + private static final ThreadLocal> _currentTransaction = ThreadLocal.withInitial(Stack::new); @Inject JObjectManager jObjectManager; @Override public void begin() { - if (_currentTransaction.get() != null) { - throw new IllegalStateException("Transaction already started"); - } - Log.trace("Starting transaction"); var tx = jObjectManager.createTransaction(); - _currentTransaction.set(tx); + _currentTransaction.get().push(tx); } @Override public TransactionHandle commit() { - if (_currentTransaction.get() == null) { + var stack = _currentTransaction.get(); + if (stack.empty()) { throw new IllegalStateException("No transaction started"); } + var peeked = stack.peek(); Log.trace("Committing transaction"); Pair, TransactionHandle> ret; try { - ret = jObjectManager.commit(_currentTransaction.get()); + ret = jObjectManager.commit(peeked); } catch (Throwable e) { Log.trace("Transaction commit failed", e); throw e; } finally { - _currentTransaction.get().close(); - _currentTransaction.remove(); + peeked.close(); + stack.pop(); + if (stack.empty()) + _currentTransaction.remove(); } for (var r : ret.getLeft()) { @@ -56,24 +57,33 @@ public class TransactionManagerImpl implements TransactionManager { @Override public void rollback() { - if (_currentTransaction.get() == null) { + var stack = _currentTransaction.get(); + if (stack.empty()) { throw new IllegalStateException("No transaction started"); } + var peeked = stack.peek(); try { - jObjectManager.rollback(_currentTransaction.get()); + jObjectManager.rollback(peeked); } catch (Throwable e) { Log.error("Transaction rollback failed", e); throw e; } finally { - _currentTransaction.get().close(); - _currentTransaction.remove(); + peeked.close(); + stack.pop(); + if (stack.empty()) + _currentTransaction.remove(); } } @Override + @Nullable public Transaction current() { - return _currentTransaction.get(); + var stack = _currentTransaction.get(); + if (stack.empty()) { + return null; + } + return stack.peek(); } } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java index 483bb717..ce680536 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/autosync/AutosyncProcessor.java @@ -19,8 +19,6 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.eclipse.microprofile.config.inject.ConfigProperty; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,27 +54,21 @@ public class AutosyncProcessor { if (downloadAll) executorService.submit(() -> { Log.info("Adding all to autosync"); - List objs = new LinkedList<>(); txm.run(() -> { try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { while (it.hasNext()) { var key = it.peekNextKey(); - objs.add(key); - // TODO: Nested transactions + txm.run(() -> { + var gotObj = curTx.get(JData.class, key).orElse(null); + if (!(gotObj instanceof RemoteObjectMeta meta)) + return; + if (!meta.hasLocalData()) + add(meta.key()); + }, true); it.skip(); } } }); - - for (var obj : objs) { - txm.run(() -> { - var gotObj = curTx.get(JData.class, obj).orElse(null); - if (!(gotObj instanceof RemoteObjectMeta meta)) - return; - if (!meta.hasLocalData()) - add(meta.key()); - }); - } Log.info("Adding all to autosync: finished"); }); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java index aaf6ab8e..8fdfc2df 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/SyncHandler.java @@ -23,7 +23,10 @@ import org.pcollections.PMap; import javax.annotation.Nullable; import java.lang.reflect.ParameterizedType; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.stream.Stream; @ApplicationScoped @@ -124,52 +127,42 @@ public class SyncHandler { public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) { if (shutdownChecker.lastShutdownClean()) return; - List objs = new LinkedList<>(); txm.run(() -> { try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { while (it.hasNext()) { var key = it.peekNextKey(); - objs.add(key); - // TODO: Nested transactions + txm.run(() -> { + var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null); + if (proc != null) { + proc.handleCrash(key); + } + Log.infov("Handled crash of {0}", key); + + }, true); it.skip(); } } }); - - for (var obj : objs) { - txm.run(() -> { - var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null); - if (proc != null) { - proc.handleCrash(obj); - } - Log.infov("Handled crash of {0}", obj); - }); - } } public void doInitialSync(PeerId peer) { - List objs = new LinkedList<>(); txm.run(() -> { Log.tracev("Will do initial sync for {0}", peer); try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { while (it.hasNext()) { var key = it.peekNextKey(); - objs.add(key); - // TODO: Nested transactions + txm.run(() -> { + var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null); + if (proc != null) { + proc.prepareForInitialSync(peer, key); + } + Log.infov("Adding to initial sync for peer {0}: {1}", peer, key); + invalidationQueueService.pushInvalidationToOne(peer, key); + + }, true); it.skip(); } } }); - - for (var obj : objs) { - txm.run(() -> { - var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null); - if (proc != null) { - proc.prepareForInitialSync(peer, obj); - } - Log.infov("Adding to initial sync for peer {0}: {1}", peer, obj); - invalidationQueueService.pushInvalidationToOne(peer, obj); - }); - } } } \ No newline at end of file