diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 687a15fa..e7466e59 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; // Manages all access to com.usatiuk.objects.common.runtime.JData objects. // In particular, it serves as a source of truth for what is committed to the backing storage. // All data goes through it, it is responsible for transaction atomicity +// TODO: persistent tx id @ApplicationScoped public class JObjectManager { @Inject @@ -173,28 +174,26 @@ public class JObjectManager { var toFlush = new LinkedList>(); var toPut = new LinkedList>(); - var toLock = new ArrayList>(); + var toLock = new ArrayList>(); var dependencies = new LinkedList>(); Log.trace("Committing transaction " + tx.getId()); + // For existing objects: + // Check that their version is not higher than the version of transaction being committed + // TODO: check deletions, inserts + try { - for (var entry : tx.drain()) { - Log.trace("Processing entry " + entry.toString()); + for (var entry : tx.writes()) { + Log.trace("Processing write " + entry.toString()); switch (entry) { case TxRecord.TxObjectRecordCopyLock copy -> { toUnlock.add(copy.original().lock().writeLock()::unlock); - dependencies.add(copy.original()); - if (copy.copy().isModified()) { - toFlush.add(copy); - } + toFlush.add(copy); } case TxRecord.TxObjectRecordOptimistic copy -> { - toLock.add(copy); - dependencies.add(copy.original()); - if (copy.copy().isModified()) { - toFlush.add(copy); - } + toLock.add(copy.original()); + toFlush.add(copy); } case TxRecord.TxObjectRecordNew created -> { toPut.add(created); @@ -203,21 +202,35 @@ public class JObjectManager { } } - toLock.sort(Comparator.comparingInt(a -> System.identityHashCode(a.original()))); + for (var entry : tx.reads().entrySet()) { + Log.trace("Processing read " + entry.toString()); + switch (entry.getValue()) { + case ReadTrackingObjectSource.TxReadObjectNone none -> { + // TODO: Check this + } + case ReadTrackingObjectSource.TxReadObjectSome(var obj) -> { + toLock.add(obj); + dependencies.add(obj); + } + default -> throw new IllegalStateException("Unexpected value: " + entry); + } + } + + toLock.sort(Comparator.comparingInt(System::identityHashCode)); for (var record : toLock) { Log.trace("Locking " + record.toString()); - var got = getLocked(record.original().data().getClass(), record.original().data().getKey(), true); + var got = getLocked(record.data().getClass(), record.data().getKey(), true); if (got == null) { - throw new IllegalStateException("Object " + record.original().data().getKey() + " not found"); + throw new IllegalStateException("Object " + record.data().getKey() + " not found"); } toUnlock.add(got.wrapper().lock.writeLock()::unlock); - if (got.obj() != record.original().data()) { - throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.original().data()); + if (got.obj() != record.data()) { + throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.data()); } } @@ -244,9 +257,13 @@ public class JObjectManager { } for (var record : toFlush) { + if (!record.copy().isModified()) { + Log.trace("Not changed " + record.toString()); + continue; + } + Log.trace("Flushing changed " + record.toString()); var current = _objects.get(record.original().data().getKey()); - assert record.copy().isModified(); var newWrapper = new JDataWrapper<>(record.copy().wrapped()); newWrapper.lock.writeLock().lock(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java index f872e932..f08159a4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java @@ -45,7 +45,7 @@ public class TransactionManagerImpl implements TransactionManager { @Override public void rollback() { var tx = _currentTransaction.get(); - for (var o : tx.drain()) { + for (var o : tx.writes()) { switch (o) { case TxRecord.TxObjectRecordCopyLock r -> r.original().lock().writeLock().unlock(); default -> { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java new file mode 100644 index 00000000..a8337b95 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java @@ -0,0 +1,83 @@ +package com.usatiuk.dhfs.objects.transaction; + +import com.usatiuk.objects.common.runtime.JData; +import com.usatiuk.objects.common.runtime.JObjectKey; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class ReadTrackingObjectSource implements TransactionObjectSource { + private final TransactionObjectSource _delegate; + + public interface TxReadObject {} + + public record TxReadObjectNone() implements TxReadObject {} + + public record TxReadObjectSome(TransactionObject obj) implements TxReadObject {} + + private final Map> _readSet = new HashMap<>(); + + public ReadTrackingObjectSource(TransactionObjectSource delegate) { + _delegate = delegate; + } + + public Map> getRead() { + return Collections.unmodifiableMap(_readSet); + } + + @Override + public Optional> get(Class type, JObjectKey key) { + var got = _readSet.get(key); + + if (got == null) { + var read = _delegate.get(type, key); + if (read.isPresent()) { + _readSet.put(key, new TxReadObjectSome<>(read.get())); + } else { + _readSet.put(key, new TxReadObjectNone<>()); + } + return read; + } + + return switch (got) { + case TxReadObjectNone none -> Optional.empty(); + case TxReadObjectSome some -> { + if (type.isInstance(some.obj().data())) { + yield Optional.of((TransactionObject) some.obj()); + } else { + yield Optional.empty(); + } + } + default -> throw new IllegalStateException("Unexpected value: " + got); + }; + } + + @Override + public Optional> getWriteLocked(Class type, JObjectKey key) { + var got = _readSet.get(key); + + if (got == null) { + var read = _delegate.getWriteLocked(type, key); + if (read.isPresent()) { + _readSet.put(key, new TxReadObjectSome<>(read.get())); + } else { + _readSet.put(key, new TxReadObjectNone<>()); + } + return read; + } + + return switch (got) { + case TxReadObjectNone none -> Optional.empty(); + case TxReadObjectSome some -> { + if (type.isInstance(some.obj().data())) { + yield Optional.of((TransactionObject) some.obj()); + } else { + yield Optional.empty(); + } + } + default -> throw new IllegalStateException("Unexpected value: " + got); + }; + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index b10ff167..3b35db50 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -18,13 +18,13 @@ public class TransactionFactoryImpl implements TransactionFactory { private class TransactionImpl implements TransactionPrivate { @Getter(AccessLevel.PUBLIC) private final long _id; - private final TransactionObjectSource _source; + private final ReadTrackingObjectSource _source; private final Map> _objects = new HashMap<>(); private TransactionImpl(long id, TransactionObjectSource source) { _id = id; - _source = source; + _source = new ReadTrackingObjectSource(source); } @Override @@ -75,9 +75,14 @@ public class TransactionFactoryImpl implements TransactionFactory { } @Override - public Collection> drain() { + public Collection> writes() { return Collections.unmodifiableCollection(_objects.values()); } + + @Override + public Map> reads() { + return _source.getRead(); + } } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index d2f12014..18a5f488 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -1,8 +1,13 @@ package com.usatiuk.dhfs.objects.transaction; +import com.usatiuk.objects.common.runtime.JObjectKey; + import java.util.Collection; +import java.util.Map; // The transaction interface actually used by user code to retrieve objects -public interface TransactionPrivate extends Transaction{ - Collection> drain(); +public interface TransactionPrivate extends Transaction { + Collection> writes(); + + Map> reads(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java index dc0b590d..818ab340 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java @@ -9,6 +9,13 @@ public class TxRecord { T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy); } + public record TxObjectRecordMissing(JObjectKey key) implements TxObjectRecord { + @Override + public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { + return null; + } + } + public interface TxObjectRecordWrite extends TxObjectRecord { TransactionObject original();