diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java index 52b97a5a..089c97ec 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java @@ -17,6 +17,16 @@ public class CurrentTransaction implements Transaction { return transactionManager.current().getId(); } + @Override + public void onCommit(Runnable runnable) { + transactionManager.current().onCommit(runnable); + } + + @Override + public void onFlush(Runnable runnable) { + transactionManager.current().onFlush(runnable); + } + @Override public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { return transactionManager.current().get(type, key, strategy); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index eb4c466d..3210a937 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -110,7 +110,7 @@ public class JObjectManager { return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter)); } - public void commit(TransactionPrivate tx) { + public TransactionHandle commit(TransactionPrivate tx) { verifyReady(); Log.trace("Committing transaction " + tx.getId()); // FIXME: Better way? @@ -243,7 +243,27 @@ public class JObjectManager { } Log.tracef("Committing transaction %d to storage", tx.getId()); - writebackObjectPersistentStore.commitTx(current.values(), tx.getId()); + var addFlushCallback = writebackObjectPersistentStore.commitTx(current.values(), tx.getId()); + + for (var callback : tx.getOnCommit()) { + callback.run(); + } + + for (var callback : tx.getOnFlush()) { + addFlushCallback.accept(callback); + } + + return new TransactionHandle() { + @Override + public long getId() { + return tx.getId(); + } + + @Override + public void onFlush(Runnable runnable) { + addFlushCallback.accept(runnable); + } + }; } catch (Throwable t) { Log.trace("Error when committing transaction", t); throw new TxCommitException(t.getMessage(), t); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java index ffff3751..754858f0 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.objects.transaction.TransactionHandle; import com.usatiuk.dhfs.utils.VoidFn; import io.quarkus.logging.Log; @@ -9,7 +10,7 @@ import java.util.function.Supplier; public interface TransactionManager { void begin(); - void commit(); + TransactionHandle commit(); void rollback(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java index d8bbf6a4..2f3c212d 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.objects.transaction.TransactionHandle; import com.usatiuk.dhfs.objects.transaction.TransactionPrivate; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; @@ -24,14 +25,14 @@ public class TransactionManagerImpl implements TransactionManager { } @Override - public void commit() { + public TransactionHandle commit() { if (_currentTransaction.get() == null) { throw new IllegalStateException("No transaction started"); } Log.trace("Committing transaction"); try { - jObjectManager.commit(_currentTransaction.get()); + return jObjectManager.commit(_currentTransaction.get()); } catch (Throwable e) { Log.trace("Transaction commit failed", e); throw e; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java index 66138d60..6d73de04 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java @@ -17,7 +17,7 @@ public interface TxWriteback { // Executes callback after bundle with bundleId id has been persisted // if it was already, runs callback on the caller thread - void asyncFence(long bundleId, VoidFn callback); + void asyncFence(long bundleId, Runnable callback); interface PendingWriteEntry { long bundleId(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java index 66ad87b4..2b7ba3c8 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java @@ -145,14 +145,14 @@ public class TxWritebackImpl implements TxWriteback { }); } - List> callbacks = new ArrayList<>(); + List> callbacks = new ArrayList<>(); synchronized (_notFlushedBundles) { _lastWrittenTx.set(bundle.getId()); while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) { callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted()); } } - callbacks.forEach(l -> l.forEach(VoidFn::apply)); + callbacks.forEach(l -> l.forEach(Runnable::run)); synchronized (_flushWaitSynchronizer) { currentSize -= bundle.calculateTotalSize(); @@ -278,16 +278,16 @@ public class TxWritebackImpl implements TxWriteback { } @Override - public void asyncFence(long bundleId, VoidFn fn) { + public void asyncFence(long bundleId, Runnable fn) { verifyReady(); if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); if (_lastWrittenTx.get() >= bundleId) { - fn.apply(); + fn.run(); return; } synchronized (_notFlushedBundles) { if (_lastWrittenTx.get() >= bundleId) { - fn.apply(); + fn.run(); return; } _notFlushedBundles.get(bundleId).addCallback(fn); @@ -296,7 +296,7 @@ public class TxWritebackImpl implements TxWriteback { private class TxBundleImpl implements TxBundle { private final LinkedHashMap _entries = new LinkedHashMap<>(); - private final ArrayList _callbacks = new ArrayList<>(); + private final ArrayList _callbacks = new ArrayList<>(); private long _txId; private volatile boolean _ready = false; private long _size = -1; @@ -315,14 +315,14 @@ public class TxWritebackImpl implements TxWriteback { _ready = true; } - public void addCallback(VoidFn callback) { + public void addCallback(Runnable callback) { synchronized (_callbacks) { if (_wasCommitted) throw new IllegalStateException(); _callbacks.add(callback); } } - public List setCommitted() { + public List setCommitted() { synchronized (_callbacks) { _wasCommitted = true; return Collections.unmodifiableList(_callbacks); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index c40164a9..e126aff6 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -9,6 +9,7 @@ import jakarta.inject.Inject; import javax.annotation.Nonnull; import java.util.Collection; import java.util.Optional; +import java.util.function.Consumer; @ApplicationScoped public class WritebackObjectPersistentStore { @@ -33,7 +34,7 @@ public class WritebackObjectPersistentStore { }; } - void commitTx(Collection> writes, long id) { + Consumer commitTx(Collection> writes, long id) { var bundle = txWriteback.createBundle(); try { for (var action : writes) { @@ -58,5 +59,9 @@ public class WritebackObjectPersistentStore { Log.tracef("Committing transaction %d to storage", id); txWriteback.commitBundle(bundle); + + long bundleId = bundle.getId(); + + return r -> txWriteback.asyncFence(bundleId, r); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java index 198c8f30..166aceb3 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java @@ -6,9 +6,11 @@ import com.usatiuk.dhfs.objects.JObjectKey; import java.util.Optional; // The transaction interface actually used by user code to retrieve objects -public interface Transaction { +public interface Transaction extends TransactionHandle { long getId(); + void onCommit(Runnable runnable); + Optional get(Class type, JObjectKey key, LockingStrategy strategy); void put(JData obj); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index f7357822..da9da306 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -5,10 +5,7 @@ import com.usatiuk.dhfs.objects.JDataVersionedWrapper; import com.usatiuk.dhfs.objects.JObjectKey; import jakarta.enterprise.context.ApplicationScoped; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; @ApplicationScoped public class TransactionFactoryImpl implements TransactionFactory { @@ -22,6 +19,9 @@ public class TransactionFactoryImpl implements TransactionFactory { private final ReadTrackingObjectSource _source; private final Map> _writes = new HashMap<>(); private Map> _newWrites = new HashMap<>(); + private final List _onCommit = new ArrayList<>(); + private final List _onFlush = new ArrayList<>(); + private TransactionImpl(long id, TransactionObjectSource source) { _id = id; _source = new ReadTrackingObjectSource(source); @@ -31,6 +31,26 @@ public class TransactionFactoryImpl implements TransactionFactory { return _id; } + @Override + public void onCommit(Runnable runnable) { + _onCommit.add(runnable); + } + + @Override + public void onFlush(Runnable runnable) { + _onFlush.add(runnable); + } + + @Override + public Collection getOnCommit() { + return Collections.unmodifiableCollection(_onCommit); + } + + @Override + public Collection getOnFlush() { + return Collections.unmodifiableCollection(_onFlush); + } + @Override public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { switch (_writes.get(key)) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandle.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandle.java new file mode 100644 index 00000000..d55ee1ea --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandle.java @@ -0,0 +1,7 @@ +package com.usatiuk.dhfs.objects.transaction; + +public interface TransactionHandle { + long getId(); + + void onFlush(Runnable runnable); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandlePrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandlePrivate.java new file mode 100644 index 00000000..cc9c8e7d --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionHandlePrivate.java @@ -0,0 +1,7 @@ +package com.usatiuk.dhfs.objects.transaction; + +import java.util.Collection; + +public interface TransactionHandlePrivate extends TransactionHandle { + Collection getOnFlush(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index e7ca7d05..1de3b1d8 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -6,10 +6,12 @@ import java.util.Collection; import java.util.Map; // The transaction interface actually used by user code to retrieve objects -public interface TransactionPrivate extends Transaction { +public interface TransactionPrivate extends Transaction, TransactionHandlePrivate { Collection> drainNewWrites(); Map> reads(); ReadTrackingObjectSource readSource(); + + Collection getOnCommit(); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java index 8917ef6c..6910cb1a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java @@ -23,7 +23,7 @@ public class DeleterTxHook implements PreCommitTxHook { } if (canDelete(refCur)) { if (refCur instanceof RemoteObjectMeta ro) { - remoteObjectDeleter.putDeletionCandidate(ro); + curTx.onCommit(() -> remoteObjectDeleter.putDeletionCandidate(ro)); return; } Log.trace("Deleting object on change: " + key); @@ -39,7 +39,7 @@ public class DeleterTxHook implements PreCommitTxHook { if (canDelete(refCur)) { if (refCur instanceof RemoteObjectMeta ro) { - remoteObjectDeleter.putDeletionCandidate(ro); + curTx.onCommit(() -> remoteObjectDeleter.putDeletionCandidate(ro)); return; } Log.warn("Deleting object on creation: " + key);