Objects: remove transactionobject

This commit is contained in:
2025-05-01 09:14:50 +02:00
parent 8ab034402d
commit 46bc9fa810
6 changed files with 95 additions and 161 deletions

View File

@@ -59,98 +59,89 @@ public class JObjectManager {
verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet = null;
Map<JObjectKey, Optional<JDataVersionedWrapper>> readSet = null;
Collection<AutoCloseableNoThrow> toUnlock = null;
try {
try {
long pendingCount = 0;
List<CommitHookIterationData> hookIterationData;
{
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
long pendingCount = 0;
List<CommitHookIterationData> hookIterationData;
{
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
}
writes.put(n.key(), n);
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
}
writes.put(n.key(), n);
}
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
var curIteration = hookId.pendingWrites();
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
}
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
}
pendingCount -= curIteration.size();
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
}
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
pendingCount -= curIteration.size();
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
}
}
throw e;
}
readSet = tx.reads();
@@ -159,11 +150,7 @@ public class JObjectManager {
toUnlock = new ArrayList<>(readSet.size() + writes.size());
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
} else {
toLock.add(read.getKey());
}
toLock.add(read.getKey());
}
for (var write : writes.keySet()) {
if (!readSet.containsKey(write))
@@ -171,6 +158,8 @@ public class JObjectManager {
}
toLock.sort(null);
for (var key : toLock) {
if (tx.knownNew().contains(key))
continue;
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
}
@@ -182,10 +171,7 @@ public class JObjectManager {
long version = 0L;
for (var read : readSet.values()) {
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
version = Math.max(version, read.map(JDataVersionedWrapper::version).orElse(0L));
}
long finalVersion = version;
@@ -218,9 +204,9 @@ public class JObjectManager {
for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey());
if (current.isEmpty() != read.getValue().data().isEmpty()) {
if (current.isEmpty() != read.getValue().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().isEmpty());
}
if (current.isEmpty()) {
@@ -272,11 +258,6 @@ public class JObjectManager {
public void rollback(TransactionPrivate tx) {
verifyReady();
tx.reads().forEach((key, value) -> {
if (value instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
});
tx.close();
}

View File

@@ -56,10 +56,10 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
@@ -99,30 +99,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read);
})
.data()
.map(w -> type.cast(w.data()));
}
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return read;
});
if (got.isEmpty())
return Optional.empty();
var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
}
@Override
@@ -138,13 +122,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
}
if (neverLock)
return getFromSource(type, key);
return switch (strategy) {
case OPTIMISTIC -> getFromSource(type, key);
case WRITE -> getWriteLockedFromSource(type, key);
};
return getFromSource(type, key);
}
@Override
@@ -186,7 +164,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
@@ -210,8 +188,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public Map<JObjectKey, TransactionObject<?>> reads() {
return Collections.unmodifiableMap(_readSet);
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
@@ -247,7 +230,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@@ -276,7 +259,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}

View File

@@ -1,10 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public interface TransactionObject<T extends JData> {
Optional<JDataVersionedWrapper> data();
}

View File

@@ -1,12 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Optional;
public record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}

View File

@@ -9,12 +9,15 @@ import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, TransactionObject<?>> reads();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);