mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Objects: seemingly more reasonably working tx hooks
This commit is contained in:
@@ -10,6 +10,7 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
@@ -40,7 +41,8 @@ public class JObjectManager {
|
||||
}
|
||||
|
||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
|
||||
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
||||
}
|
||||
|
||||
public TransactionPrivate createTransaction() {
|
||||
@@ -66,34 +68,51 @@ public class JObjectManager {
|
||||
});
|
||||
};
|
||||
|
||||
// For existing objects:
|
||||
// Check that their version is not higher than the version of transaction being committed
|
||||
// TODO: check deletions, inserts
|
||||
try {
|
||||
try {
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (writes.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null -> tx.getFromSource(JData.class, key).orElse(null);
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + writes.get(key));
|
||||
}
|
||||
};
|
||||
long pendingCount = 0;
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> pendingWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> lastWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
|
||||
boolean somethingChanged;
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
pendingWrites.get(hookPut).put(n.key(), n);
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
|
||||
// Run hooks for all objects
|
||||
// Every hook should see every change made to every object, yet the object's evolution
|
||||
// should be consistent from the view point of each individual hook
|
||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||
// on the next iteration, the first hook should receive the version of the object it had created
|
||||
// as the "old" version, and the new version with all the changes after it.
|
||||
do {
|
||||
somethingChanged = false;
|
||||
Map<JObjectKey, TxRecord.TxObjectRecord<?>> currentIteration = new HashMap();
|
||||
for (var hook : _preCommitTxHooks) {
|
||||
for (var n : tx.drainNewWrites())
|
||||
currentIteration.put(n.key(), n);
|
||||
Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass());
|
||||
var lastCurHookSeen = lastWrites.get(hook);
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (lastCurHookSeen.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null -> tx.getFromSource(JData.class, key).orElse(null);
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + writes.get(key));
|
||||
}
|
||||
};
|
||||
|
||||
for (var entry : currentIteration.entrySet()) {
|
||||
somethingChanged = true;
|
||||
var curIteration = pendingWrites.get(hook);
|
||||
|
||||
Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
|
||||
|
||||
for (var entry : curIteration.entrySet()) {
|
||||
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
|
||||
var oldObj = getPrev.apply(entry.getKey());
|
||||
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
||||
switch (entry.getValue()) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
if (oldObj == null) {
|
||||
@@ -108,9 +127,24 @@ public class JObjectManager {
|
||||
default -> throw new TxCommitException("Unexpected value: " + entry);
|
||||
}
|
||||
}
|
||||
|
||||
pendingCount -= curIteration.size();
|
||||
curIteration.clear();
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
if (hookPut == hook) {
|
||||
lastCurHookSeen.put(n.key(), n);
|
||||
continue;
|
||||
}
|
||||
var before = pendingWrites.get(hookPut).put(n.key(), n);
|
||||
if (before == null)
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
}
|
||||
writes.putAll(currentIteration);
|
||||
} while (somethingChanged);
|
||||
} while (pendingCount > 0);
|
||||
} catch (Throwable e) {
|
||||
for (var read : tx.reads().entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package com.usatiuk.dhfs.objects.persistence;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.CloseableKvIterator;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.KeyPredicateKvIterator;
|
||||
import com.usatiuk.dhfs.objects.ReversibleKvIterator;
|
||||
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
@@ -23,7 +26,9 @@ import java.lang.ref.Cleaner;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.Arrays;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
Reference in New Issue
Block a user