slightly nicer tx dependency tracking

This commit is contained in:
2024-12-07 16:58:23 +01:00
parent aa69ae13a4
commit b92877025f
6 changed files with 141 additions and 24 deletions

View File

@@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
// Manages all access to com.usatiuk.objects.common.runtime.JData objects. // Manages all access to com.usatiuk.objects.common.runtime.JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage. // In particular, it serves as a source of truth for what is committed to the backing storage.
// All data goes through it, it is responsible for transaction atomicity // All data goes through it, it is responsible for transaction atomicity
// TODO: persistent tx id
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
@Inject @Inject
@@ -173,28 +174,26 @@ public class JObjectManager {
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>(); var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
var toPut = new LinkedList<TxRecord.TxObjectRecordNew<?>>(); var toPut = new LinkedList<TxRecord.TxObjectRecordNew<?>>();
var toLock = new ArrayList<TxRecord.TxObjectRecordOptimistic<?>>(); var toLock = new ArrayList<TransactionObject<?>>();
var dependencies = new LinkedList<TransactionObject<?>>(); var dependencies = new LinkedList<TransactionObject<?>>();
Log.trace("Committing transaction " + tx.getId()); Log.trace("Committing transaction " + tx.getId());
// For existing objects:
// Check that their version is not higher than the version of transaction being committed
// TODO: check deletions, inserts
try { try {
for (var entry : tx.drain()) { for (var entry : tx.writes()) {
Log.trace("Processing entry " + entry.toString()); Log.trace("Processing write " + entry.toString());
switch (entry) { switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> { case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().lock().writeLock()::unlock); toUnlock.add(copy.original().lock().writeLock()::unlock);
dependencies.add(copy.original()); toFlush.add(copy);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
} }
case TxRecord.TxObjectRecordOptimistic<?> copy -> { case TxRecord.TxObjectRecordOptimistic<?> copy -> {
toLock.add(copy); toLock.add(copy.original());
dependencies.add(copy.original()); toFlush.add(copy);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
} }
case TxRecord.TxObjectRecordNew<?> created -> { case TxRecord.TxObjectRecordNew<?> created -> {
toPut.add(created); toPut.add(created);
@@ -203,21 +202,35 @@ public class JObjectManager {
} }
} }
toLock.sort(Comparator.comparingInt(a -> System.identityHashCode(a.original()))); for (var entry : tx.reads().entrySet()) {
Log.trace("Processing read " + entry.toString());
switch (entry.getValue()) {
case ReadTrackingObjectSource.TxReadObjectNone<?> none -> {
// TODO: Check this
}
case ReadTrackingObjectSource.TxReadObjectSome<?>(var obj) -> {
toLock.add(obj);
dependencies.add(obj);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
toLock.sort(Comparator.comparingInt(System::identityHashCode));
for (var record : toLock) { for (var record : toLock) {
Log.trace("Locking " + record.toString()); Log.trace("Locking " + record.toString());
var got = getLocked(record.original().data().getClass(), record.original().data().getKey(), true); var got = getLocked(record.data().getClass(), record.data().getKey(), true);
if (got == null) { if (got == null) {
throw new IllegalStateException("Object " + record.original().data().getKey() + " not found"); throw new IllegalStateException("Object " + record.data().getKey() + " not found");
} }
toUnlock.add(got.wrapper().lock.writeLock()::unlock); toUnlock.add(got.wrapper().lock.writeLock()::unlock);
if (got.obj() != record.original().data()) { if (got.obj() != record.data()) {
throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.original().data()); throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.data());
} }
} }
@@ -244,9 +257,13 @@ public class JObjectManager {
} }
for (var record : toFlush) { for (var record : toFlush) {
if (!record.copy().isModified()) {
Log.trace("Not changed " + record.toString());
continue;
}
Log.trace("Flushing changed " + record.toString()); Log.trace("Flushing changed " + record.toString());
var current = _objects.get(record.original().data().getKey()); var current = _objects.get(record.original().data().getKey());
assert record.copy().isModified();
var newWrapper = new JDataWrapper<>(record.copy().wrapped()); var newWrapper = new JDataWrapper<>(record.copy().wrapped());
newWrapper.lock.writeLock().lock(); newWrapper.lock.writeLock().lock();

View File

@@ -45,7 +45,7 @@ public class TransactionManagerImpl implements TransactionManager {
@Override @Override
public void rollback() { public void rollback() {
var tx = _currentTransaction.get(); var tx = _currentTransaction.get();
for (var o : tx.drain()) { for (var o : tx.writes()) {
switch (o) { switch (o) {
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock(); case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock();
default -> { default -> {

View File

@@ -0,0 +1,83 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class ReadTrackingObjectSource implements TransactionObjectSource {
private final TransactionObjectSource _delegate;
public interface TxReadObject<T extends JData> {}
public record TxReadObjectNone<T extends JData>() implements TxReadObject<T> {}
public record TxReadObjectSome<T extends JData>(TransactionObject<T> obj) implements TxReadObject<T> {}
private final Map<JObjectKey, TxReadObject<?>> _readSet = new HashMap<>();
public ReadTrackingObjectSource(TransactionObjectSource delegate) {
_delegate = delegate;
}
public Map<JObjectKey, TxReadObject<?>> getRead() {
return Collections.unmodifiableMap(_readSet);
}
@Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _delegate.get(type, key);
if (read.isPresent()) {
_readSet.put(key, new TxReadObjectSome<>(read.get()));
} else {
_readSet.put(key, new TxReadObjectNone<>());
}
return read;
}
return switch (got) {
case TxReadObjectNone<?> none -> Optional.empty();
case TxReadObjectSome<?> some -> {
if (type.isInstance(some.obj().data())) {
yield Optional.of((TransactionObject<T>) some.obj());
} else {
yield Optional.empty();
}
}
default -> throw new IllegalStateException("Unexpected value: " + got);
};
}
@Override
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _delegate.getWriteLocked(type, key);
if (read.isPresent()) {
_readSet.put(key, new TxReadObjectSome<>(read.get()));
} else {
_readSet.put(key, new TxReadObjectNone<>());
}
return read;
}
return switch (got) {
case TxReadObjectNone<?> none -> Optional.empty();
case TxReadObjectSome<?> some -> {
if (type.isInstance(some.obj().data())) {
yield Optional.of((TransactionObject<T>) some.obj());
} else {
yield Optional.empty();
}
}
default -> throw new IllegalStateException("Unexpected value: " + got);
};
}
}

View File

@@ -18,13 +18,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
private class TransactionImpl implements TransactionPrivate { private class TransactionImpl implements TransactionPrivate {
@Getter(AccessLevel.PUBLIC) @Getter(AccessLevel.PUBLIC)
private final long _id; private final long _id;
private final TransactionObjectSource _source; private final ReadTrackingObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>(); private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) { private TransactionImpl(long id, TransactionObjectSource source) {
_id = id; _id = id;
_source = source; _source = new ReadTrackingObjectSource(source);
} }
@Override @Override
@@ -75,9 +75,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
} }
@Override @Override
public Collection<TxRecord.TxObjectRecord<?>> drain() { public Collection<TxRecord.TxObjectRecord<?>> writes() {
return Collections.unmodifiableCollection(_objects.values()); return Collections.unmodifiableCollection(_objects.values());
} }
@Override
public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads() {
return _source.getRead();
}
} }
@Override @Override

View File

@@ -1,8 +1,13 @@
package com.usatiuk.dhfs.objects.transaction; package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.objects.common.runtime.JObjectKey;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
// The transaction interface actually used by user code to retrieve objects // The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction{ public interface TransactionPrivate extends Transaction {
Collection<TxRecord.TxObjectRecord<?>> drain(); Collection<TxRecord.TxObjectRecord<?>> writes();
Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads();
} }

View File

@@ -9,6 +9,13 @@ public class TxRecord {
T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy); T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy);
} }
public record TxObjectRecordMissing<T extends JData>(JObjectKey key) implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
return null;
}
}
public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> { public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> {
TransactionObject<T> original(); TransactionObject<T> original();