Objects: simplify tx commit a little

This commit is contained in:
2025-04-29 15:25:39 +02:00
parent 2766ef1bae
commit df27e41caf

View File

@@ -70,7 +70,7 @@ public class JObjectManager {
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
@@ -83,6 +83,18 @@ public class JObjectManager {
writes.put(n.key(), n);
}
var lastIterationState = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var curIterationWrites = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Function<JObjectKey, JData> getPrev =
key -> switch (lastIterationState.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
@@ -92,25 +104,14 @@ public class JObjectManager {
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
// Log.trace("Running pre-commit hook " + hook.getClass() + " for " + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
@@ -131,18 +132,19 @@ public class JObjectManager {
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
if (hookPut != hookId) {
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
curIterationWrites.put(n.key(), n);
}
}
lastIterationState.putAll(curIterationWrites);
curIterationWrites.clear();
}
writes.putAll(lastIterationState);
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
@@ -164,11 +166,11 @@ public class JObjectManager {
toLock.add(read.getKey());
}
}
for (var write : writes.entrySet()) {
if (!readSet.containsKey(write.getKey()))
toLock.add(write.getKey());
for (var write : writes.keySet()) {
if (!readSet.containsKey(write))
toLock.add(write);
}
Collections.sort(toLock);
toLock.sort(null);
for (var key : toLock) {
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
@@ -192,14 +194,16 @@ public class JObjectManager {
writebackObjectPersistentStore.asyncFence(finalVersion, r);
};
var onCommit = tx.getOnCommit();
var onFlush = tx.getOnFlush();
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
Stream.<Runnable>of(() -> {
for (var f : tx.getOnFlush())
fenceFn.accept(f);
})
).toList(),
List.of(() -> {
for (var f : onCommit)
f.run();
for (var f : onFlush)
fenceFn.accept(f);
}),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -221,7 +225,7 @@ public class JObjectManager {
}
if (current.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
// Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
@@ -246,7 +250,7 @@ public class JObjectManager {
}
return Pair.of(
List.copyOf(tx.getOnCommit()),
tx.getOnCommit(),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -278,7 +282,6 @@ public class JObjectManager {
}
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
}