diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 7a7035c7..b25c7617 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -10,6 +10,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; import java.util.*; import java.util.function.Consumer; @@ -40,7 +41,8 @@ public class JObjectManager { } JObjectManager(Instance preCommitTxHooks) { - _preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList(); + _preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); + Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); } public TransactionPrivate createTransaction() { @@ -66,34 +68,51 @@ public class JObjectManager { }); }; - // For existing objects: - // Check that their version is not higher than the version of transaction being committed - // TODO: check deletions, inserts try { try { - Function getPrev = - key -> switch (writes.get(key)) { - case TxRecord.TxObjectRecordWrite write -> write.data(); - case TxRecord.TxObjectRecordDeleted deleted -> null; - case null -> tx.getFromSource(JData.class, key).orElse(null); - default -> { - throw new TxCommitException("Unexpected value: " + writes.get(key)); - } - }; + long pendingCount = 0; + Map>> pendingWrites = Map.ofEntries( + _preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new) + ); + Map>> lastWrites = Map.ofEntries( + _preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new) + ); - boolean somethingChanged; + for (var n : tx.drainNewWrites()) { + for (var hookPut : _preCommitTxHooks) { + pendingWrites.get(hookPut).put(n.key(), n); + pendingCount++; + } + writes.put(n.key(), n); + } + + // Run hooks for all objects + // Every hook should see every change made to every object, yet the object's evolution + // should be consistent from the view point of each individual hook + // For example, when a hook makes changes to an object, and another hook changes the object before/after it + // on the next iteration, the first hook should receive the version of the object it had created + // as the "old" version, and the new version with all the changes after it. do { - somethingChanged = false; - Map> currentIteration = new HashMap(); for (var hook : _preCommitTxHooks) { - for (var n : tx.drainNewWrites()) - currentIteration.put(n.key(), n); - Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass()); + var lastCurHookSeen = lastWrites.get(hook); + Function getPrev = + key -> switch (lastCurHookSeen.get(key)) { + case TxRecord.TxObjectRecordWrite write -> write.data(); + case TxRecord.TxObjectRecordDeleted deleted -> null; + case null -> tx.getFromSource(JData.class, key).orElse(null); + default -> { + throw new TxCommitException("Unexpected value: " + writes.get(key)); + } + }; - for (var entry : currentIteration.entrySet()) { - somethingChanged = true; + var curIteration = pendingWrites.get(hook); + + Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass()); + + for (var entry : curIteration.entrySet()) { Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); var oldObj = getPrev.apply(entry.getKey()); + lastCurHookSeen.put(entry.getKey(), entry.getValue()); switch (entry.getValue()) { case TxRecord.TxObjectRecordWrite write -> { if (oldObj == null) { @@ -108,9 +127,24 @@ public class JObjectManager { default -> throw new TxCommitException("Unexpected value: " + entry); } } + + pendingCount -= curIteration.size(); + curIteration.clear(); + + for (var n : tx.drainNewWrites()) { + for (var hookPut : _preCommitTxHooks) { + if (hookPut == hook) { + lastCurHookSeen.put(n.key(), n); + continue; + } + var before = pendingWrites.get(hookPut).put(n.key(), n); + if (before == null) + pendingCount++; + } + writes.put(n.key(), n); + } } - writes.putAll(currentIteration); - } while (somethingChanged); + } while (pendingCount > 0); } catch (Throwable e) { for (var read : tx.reads().entrySet()) { if (read.getValue() instanceof TransactionObjectLocked locked) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java index c72731de..0c50979c 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -1,7 +1,10 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; -import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.CloseableKvIterator; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.KeyPredicateKvIterator; +import com.usatiuk.dhfs.objects.ReversibleKvIterator; import com.usatiuk.dhfs.objects.snapshot.Snapshot; import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; import com.usatiuk.dhfs.utils.RefcountedCloseable; @@ -23,7 +26,9 @@ import java.lang.ref.Cleaner; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.*; +import java.util.Arrays; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer;