Revert "Objects: simplify tx commit hooks"

This reverts commit c0735801b9.
This commit is contained in:
2025-05-01 13:42:06 +02:00
parent c0735801b9
commit b84ef95703

View File

@@ -19,6 +19,7 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.*; import java.util.*;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
@@ -36,7 +37,7 @@ public class JObjectManager {
} }
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) { JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.infov("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
} }
private void verifyReady() { private void verifyReady() {
@@ -62,21 +63,25 @@ public class JObjectManager {
Collection<AutoCloseableNoThrow> toUnlock = null; Collection<AutoCloseableNoThrow> toUnlock = null;
try { try {
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastIterationWrites = new HashMap<>(); long pendingCount = 0;
Map<JObjectKey, TxRecord.TxObjectRecord<?>> currentIterationPending = new HashMap<>(); List<CommitHookIterationData> hookIterationData;
Map<JObjectKey, TxRecord.TxObjectRecord<?>> currentIterationWrites = new HashMap<>(); {
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
for (var n : tx.drainNewWrites()) { for (var n : tx.drainNewWrites()) {
currentIterationPending.put(n.key(), n); for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
}
writes.put(n.key(), n); writes.put(n.key(), n);
} }
Function<JObjectKey, JData> getPrev =
key -> switch (lastIterationWrites.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
};
// Run hooks for all objects // Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution // Every hook should see every change made to every object, yet the object's evolution
@@ -84,18 +89,29 @@ public class JObjectManager {
// For example, when a hook makes changes to an object, and another hook changes the object before/after it // For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created // on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it. // as the "old" version, and the new version with all the changes after it.
while (!currentIterationPending.isEmpty()) { while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass()); // Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var hook : _preCommitTxHooks) {
for (var entry : currentIterationPending.entrySet()) { for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); // Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey()); var oldObj = getPrev.apply(entry.getKey());
TxRecord.TxObjectRecord<?> curObj lastCurHookSeen.put(entry.getKey(), entry.getValue());
= Optional.ofNullable(currentIterationWrites.get(entry.getKey())) switch (entry.getValue()) {
.orElse((TxRecord.TxObjectRecord) entry.getValue());
switch (curObj) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) { if (oldObj == null) {
hook.onCreate(write.key(), write.data()); hook.onCreate(write.key(), write.data());
@@ -106,19 +122,27 @@ public class JObjectManager {
case TxRecord.TxObjectRecordDeleted deleted -> { case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj); hook.onDelete(deleted.key(), oldObj);
} }
} default -> throw new TxCommitException("Unexpected value: " + entry);
for (var n : tx.drainNewWrites()) {
currentIterationWrites.put(n.key(), n);
} }
} }
}
lastIterationWrites.putAll(currentIterationWrites);
currentIterationPending = currentIterationWrites;
currentIterationWrites = new HashMap<>();
}
writes.putAll(lastIterationWrites); pendingCount -= curIteration.size();
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
}
}
}
readSet = tx.reads(); readSet = tx.reads();