diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java index 5b3a1ddc..29db8322 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java @@ -70,7 +70,7 @@ public class JObjectManager { CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()]; for (int i = 0; i < _preCommitTxHooks.size(); i++) { var hook = _preCommitTxHooks.get(i); - hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>()); + hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>()); } hookIterationData = List.of(hookIterationDataArray); } @@ -83,6 +83,18 @@ public class JObjectManager { writes.put(n.key(), n); } + var lastIterationState = new HashMap>(); + var curIterationWrites = new HashMap>(); + Function getPrev = + key -> switch (lastIterationState.get(key)) { + case TxRecord.TxObjectRecordWrite write -> write.data(); + case TxRecord.TxObjectRecordDeleted deleted -> null; + case null -> tx.getFromSource(JData.class, key).orElse(null); + default -> { + throw new TxCommitException("Unexpected value: " + writes.get(key)); + } + }; + // Run hooks for all objects // Every hook should see every change made to every object, yet the object's evolution // should be consistent from the view point of each individual hook @@ -92,25 +104,14 @@ public class JObjectManager { while (pendingCount > 0) { for (var hookId : hookIterationData) { var hook = hookId.hook(); - var lastCurHookSeen = hookId.lastWrites(); - Function getPrev = - key -> switch (lastCurHookSeen.get(key)) { - case TxRecord.TxObjectRecordWrite write -> write.data(); - case TxRecord.TxObjectRecordDeleted deleted -> null; - case null -> tx.getFromSource(JData.class, key).orElse(null); - default -> { - throw new TxCommitException("Unexpected value: " + writes.get(key)); - } - }; var curIteration = hookId.pendingWrites(); // Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass()); for (var entry : curIteration.entrySet()) { -// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); +// Log.trace("Running pre-commit hook " + hook.getClass() + " for " + entry.getKey()); var oldObj = getPrev.apply(entry.getKey()); - lastCurHookSeen.put(entry.getKey(), entry.getValue()); switch (entry.getValue()) { case TxRecord.TxObjectRecordWrite write -> { if (oldObj == null) { @@ -131,18 +132,19 @@ public class JObjectManager { for (var n : tx.drainNewWrites()) { for (var hookPut : hookIterationData) { - if (hookPut == hookId) { - lastCurHookSeen.put(n.key(), n); - continue; + if (hookPut != hookId) { + var before = hookPut.pendingWrites().put(n.key(), n); + if (before == null) + pendingCount++; } - var before = hookPut.pendingWrites().put(n.key(), n); - if (before == null) - pendingCount++; } - writes.put(n.key(), n); + curIterationWrites.put(n.key(), n); } } + lastIterationState.putAll(curIterationWrites); + curIterationWrites.clear(); } + writes.putAll(lastIterationState); } catch (Throwable e) { for (var read : tx.reads().entrySet()) { if (read.getValue() instanceof TransactionObjectLocked locked) { @@ -164,11 +166,11 @@ public class JObjectManager { toLock.add(read.getKey()); } } - for (var write : writes.entrySet()) { - if (!readSet.containsKey(write.getKey())) - toLock.add(write.getKey()); + for (var write : writes.keySet()) { + if (!readSet.containsKey(write)) + toLock.add(write); } - Collections.sort(toLock); + toLock.sort(null); for (var key : toLock) { var lock = lockManager.lockObject(key); toUnlock.add(lock); @@ -192,14 +194,16 @@ public class JObjectManager { writebackObjectPersistentStore.asyncFence(finalVersion, r); }; + var onCommit = tx.getOnCommit(); + var onFlush = tx.getOnFlush(); + return Pair.of( - Stream.concat( - tx.getOnCommit().stream(), - Stream.of(() -> { - for (var f : tx.getOnFlush()) - fenceFn.accept(f); - }) - ).toList(), + List.of(() -> { + for (var f : onCommit) + f.run(); + for (var f : onFlush) + fenceFn.accept(f); + }), new TransactionHandle() { @Override public void onFlush(Runnable runnable) { @@ -221,7 +225,7 @@ public class JObjectManager { } if (current.isEmpty()) { - // TODO: Every write gets a dependency due to hooks + // Every write gets a dependency due to hooks continue; // assert false; // throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty()); @@ -246,7 +250,7 @@ public class JObjectManager { } return Pair.of( - List.copyOf(tx.getOnCommit()), + tx.getOnCommit(), new TransactionHandle() { @Override public void onFlush(Runnable runnable) { @@ -278,7 +282,6 @@ public class JObjectManager { } private record CommitHookIterationData(PreCommitTxHook hook, - Map> lastWrites, Map> pendingWrites) { } } \ No newline at end of file