mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Objects: nested transactions
This commit is contained in:
@@ -12,8 +12,8 @@ public interface TransactionManager {
|
||||
|
||||
void rollback();
|
||||
|
||||
default <T> T runTries(Supplier<T> supplier, int tries) {
|
||||
if (current() != null) {
|
||||
default <T> T runTries(Supplier<T> supplier, int tries, boolean nest) {
|
||||
if (!nest && current() != null) {
|
||||
return supplier.get();
|
||||
}
|
||||
|
||||
@@ -41,8 +41,8 @@ public interface TransactionManager {
|
||||
}
|
||||
}
|
||||
|
||||
default TransactionHandle runTries(VoidFn fn, int tries) {
|
||||
if (current() != null) {
|
||||
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
|
||||
if (!nest && current() != null) {
|
||||
fn.apply();
|
||||
return new TransactionHandle() {
|
||||
@Override
|
||||
@@ -74,23 +74,38 @@ public interface TransactionManager {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
default <T> T runTries(Supplier<T> supplier, int tries) {
|
||||
return runTries(supplier, tries, false);
|
||||
}
|
||||
|
||||
default TransactionHandle runTries(VoidFn fn, int tries) {
|
||||
return runTries(fn, tries, false);
|
||||
}
|
||||
|
||||
default TransactionHandle run(VoidFn fn, boolean nest) {
|
||||
return runTries(fn, 10, nest);
|
||||
}
|
||||
|
||||
default <T> T run(Supplier<T> supplier, boolean nest) {
|
||||
return runTries(supplier, 10, nest);
|
||||
}
|
||||
|
||||
default TransactionHandle run(VoidFn fn) {
|
||||
return runTries(fn, 10);
|
||||
return run(fn, false);
|
||||
}
|
||||
|
||||
default <T> T run(Supplier<T> supplier) {
|
||||
return runTries(supplier, 10);
|
||||
return run(supplier, false);
|
||||
}
|
||||
|
||||
default void executeTx(VoidFn fn) {
|
||||
run(fn);
|
||||
run(fn, false);
|
||||
}
|
||||
|
||||
default <T> T executeTx(Supplier<T> supplier) {
|
||||
return run(supplier);
|
||||
return run(supplier, false);
|
||||
}
|
||||
|
||||
Transaction current();
|
||||
|
||||
@@ -1,47 +1,48 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.Stack;
|
||||
|
||||
@Singleton
|
||||
public class TransactionManagerImpl implements TransactionManager {
|
||||
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
|
||||
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
@Override
|
||||
public void begin() {
|
||||
if (_currentTransaction.get() != null) {
|
||||
throw new IllegalStateException("Transaction already started");
|
||||
}
|
||||
|
||||
Log.trace("Starting transaction");
|
||||
var tx = jObjectManager.createTransaction();
|
||||
_currentTransaction.set(tx);
|
||||
_currentTransaction.get().push(tx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransactionHandle commit() {
|
||||
if (_currentTransaction.get() == null) {
|
||||
var stack = _currentTransaction.get();
|
||||
if (stack.empty()) {
|
||||
throw new IllegalStateException("No transaction started");
|
||||
}
|
||||
var peeked = stack.peek();
|
||||
|
||||
Log.trace("Committing transaction");
|
||||
|
||||
Pair<Collection<Runnable>, TransactionHandle> ret;
|
||||
try {
|
||||
ret = jObjectManager.commit(_currentTransaction.get());
|
||||
ret = jObjectManager.commit(peeked);
|
||||
} catch (Throwable e) {
|
||||
Log.trace("Transaction commit failed", e);
|
||||
throw e;
|
||||
} finally {
|
||||
_currentTransaction.get().close();
|
||||
_currentTransaction.remove();
|
||||
peeked.close();
|
||||
stack.pop();
|
||||
if (stack.empty())
|
||||
_currentTransaction.remove();
|
||||
}
|
||||
|
||||
for (var r : ret.getLeft()) {
|
||||
@@ -56,24 +57,33 @@ public class TransactionManagerImpl implements TransactionManager {
|
||||
|
||||
@Override
|
||||
public void rollback() {
|
||||
if (_currentTransaction.get() == null) {
|
||||
var stack = _currentTransaction.get();
|
||||
if (stack.empty()) {
|
||||
throw new IllegalStateException("No transaction started");
|
||||
}
|
||||
var peeked = stack.peek();
|
||||
|
||||
try {
|
||||
jObjectManager.rollback(_currentTransaction.get());
|
||||
jObjectManager.rollback(peeked);
|
||||
} catch (Throwable e) {
|
||||
Log.error("Transaction rollback failed", e);
|
||||
throw e;
|
||||
} finally {
|
||||
_currentTransaction.get().close();
|
||||
_currentTransaction.remove();
|
||||
peeked.close();
|
||||
stack.pop();
|
||||
if (stack.empty())
|
||||
_currentTransaction.remove();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public Transaction current() {
|
||||
return _currentTransaction.get();
|
||||
var stack = _currentTransaction.get();
|
||||
if (stack.empty()) {
|
||||
return null;
|
||||
}
|
||||
return stack.peek();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,8 +19,6 @@ import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@@ -56,27 +54,21 @@ public class AutosyncProcessor {
|
||||
if (downloadAll)
|
||||
executorService.submit(() -> {
|
||||
Log.info("Adding all to autosync");
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
txm.run(() -> {
|
||||
var gotObj = curTx.get(JData.class, key).orElse(null);
|
||||
if (!(gotObj instanceof RemoteObjectMeta meta))
|
||||
return;
|
||||
if (!meta.hasLocalData())
|
||||
add(meta.key());
|
||||
}, true);
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var gotObj = curTx.get(JData.class, obj).orElse(null);
|
||||
if (!(gotObj instanceof RemoteObjectMeta meta))
|
||||
return;
|
||||
if (!meta.hasLocalData())
|
||||
add(meta.key());
|
||||
});
|
||||
}
|
||||
Log.info("Adding all to autosync: finished");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,7 +23,10 @@ import org.pcollections.PMap;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -124,52 +127,42 @@ public class SyncHandler {
|
||||
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
|
||||
if (shutdownChecker.lastShutdownClean())
|
||||
return;
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.handleCrash(key);
|
||||
}
|
||||
Log.infov("Handled crash of {0}", key);
|
||||
|
||||
}, true);
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.handleCrash(obj);
|
||||
}
|
||||
Log.infov("Handled crash of {0}", obj);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void doInitialSync(PeerId peer) {
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
Log.tracev("Will do initial sync for {0}", peer);
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.prepareForInitialSync(peer, key);
|
||||
}
|
||||
Log.infov("Adding to initial sync for peer {0}: {1}", peer, key);
|
||||
invalidationQueueService.pushInvalidationToOne(peer, key);
|
||||
|
||||
}, true);
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.prepareForInitialSync(peer, obj);
|
||||
}
|
||||
Log.infov("Adding to initial sync for peer {0}: {1}", peer, obj);
|
||||
invalidationQueueService.pushInvalidationToOne(peer, obj);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user