diff --git a/dhfs-parent/objects-alloc/deployment/src/main/java/com/usatiuk/objects/alloc/deployment/ObjectsAllocProcessor.java b/dhfs-parent/objects-alloc/deployment/src/main/java/com/usatiuk/objects/alloc/deployment/ObjectsAllocProcessor.java index f839e363..91bedd0d 100644 --- a/dhfs-parent/objects-alloc/deployment/src/main/java/com/usatiuk/objects/alloc/deployment/ObjectsAllocProcessor.java +++ b/dhfs-parent/objects-alloc/deployment/src/main/java/com/usatiuk/objects/alloc/deployment/ObjectsAllocProcessor.java @@ -1,5 +1,6 @@ package com.usatiuk.objects.alloc.deployment; +import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData; import com.usatiuk.objects.alloc.runtime.ObjectAllocator; import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; @@ -120,7 +121,7 @@ class ObjectsAllocProcessor { for (var item : jDataItems) { try (ClassCreator classCreator = ClassCreator.builder() .className(getCTClassName(item.klass).toString()) - .interfaces(JData.class, ObjectAllocator.ChangeTrackingJData.class) + .interfaces(JData.class, ChangeTrackingJData.class) .interfaces(item.klass.name().toString()) .interfaces(Serializable.class) .classOutput(gizmoAdapter) @@ -192,7 +193,7 @@ class ObjectsAllocProcessor { for (var item : jDataItems) { try (ClassCreator classCreator = ClassCreator.builder() .className(getImmutableClassName(item.klass).toString()) - .interfaces(JData.class, ObjectAllocator.ChangeTrackingJData.class) + .interfaces(JData.class, ChangeTrackingJData.class) .interfaces(item.klass.name().toString()) .interfaces(Serializable.class) .classOutput(gizmoAdapter) @@ -357,7 +358,7 @@ class ObjectsAllocProcessor { methodCreator.throwException(IllegalArgumentException.class, "Unknown type"); } - try (MethodCreator methodCreator = classCreator.getMethodCreator("copy", ObjectAllocator.ChangeTrackingJData.class, JData.class)) { + try (MethodCreator methodCreator = classCreator.getMethodCreator("copy", ChangeTrackingJData.class, JData.class)) { matchClass(methodCreator, methodCreator.getMethodParam(0), classes, (type, branch, value) -> { branch.returnValue(branch.newInstance(MethodDescriptor.ofConstructor(getCTClassName(type).toString(), type.name().toString()), value)); }); diff --git a/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ChangeTrackingJData.java b/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ChangeTrackingJData.java new file mode 100644 index 00000000..846ab9fa --- /dev/null +++ b/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ChangeTrackingJData.java @@ -0,0 +1,9 @@ +package com.usatiuk.objects.alloc.runtime; + +import com.usatiuk.objects.common.runtime.JData; + +public interface ChangeTrackingJData { + T wrapped(); + + boolean isModified(); +} diff --git a/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ObjectAllocator.java b/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ObjectAllocator.java index b4a3361e..763f1051 100644 --- a/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ObjectAllocator.java +++ b/dhfs-parent/objects-alloc/runtime/src/main/java/com/usatiuk/objects/alloc/runtime/ObjectAllocator.java @@ -6,13 +6,6 @@ import com.usatiuk.objects.common.runtime.JObjectKey; public interface ObjectAllocator { T create(Class type, JObjectKey key); - interface ChangeTrackingJData { - T wrapped(); - - boolean isModified(); - } - - // A copy of data that can be modified without affecting the original, and that can track changes ChangeTrackingJData copy(T obj); T unmodifiable(T obj); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 0fa3b3a7..43a172fe 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -1,5 +1,6 @@ package com.usatiuk.dhfs.objects; +import com.google.common.collect.Streams; import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; import com.usatiuk.dhfs.objects.persistence.TxManifest; import com.usatiuk.dhfs.objects.transaction.*; @@ -42,7 +43,7 @@ public class JObjectManager { private static final Cleaner CLEANER = Cleaner.create(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - long lastWriteTx = -1; + long lastWriteTx = -1; // FIXME: This should be persistent public JDataWrapper(T referent) { super(referent); @@ -77,7 +78,7 @@ public class JObjectManager { } else if (ref == null) { _objects.remove(key, got); } else { - throw new IllegalArgumentException("Object type mismatch"); + throw new IllegalArgumentException("Object type mismatch: " + ref.getClass() + " vs " + type); } } } @@ -85,7 +86,7 @@ public class JObjectManager { //noinspection unused try (var readLock = _storageReadLocker.lock(key)) { var read = objectStorage.readObject(key).orElse(null); - if (read == null) throw new IllegalArgumentException("Object not found"); + if (read == null) throw new IllegalArgumentException("Object not found: " + key); var got = objectSerializer.deserialize(read); @@ -97,7 +98,7 @@ public class JObjectManager { } else if (got == null) { return null; } else { - throw new IllegalArgumentException("Object type mismatch"); + throw new IllegalArgumentException("Object type mismatch: " + got.getClass() + " vs " + type); } } } @@ -133,7 +134,7 @@ public class JObjectManager { private record TransactionObjectImpl (T data, ReadWriteLock lock) - implements TransactionObjectSource.TransactionObject { + implements TransactionObject { } private class TransactionObjectSourceImpl implements TransactionObjectSource { @@ -150,33 +151,8 @@ public class JObjectManager { return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); } - @Override - public Optional> getReadLocked(Class type, JObjectKey key) { - var got = JObjectManager.this.getLocked(type, key, false); - if (got == null) return Optional.empty(); - return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); - } - @Override public Optional> getWriteLocked(Class type, JObjectKey key) { - var got = JObjectManager.this.getLocked(type, key, true); - if (got == null) return Optional.empty(); - return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); - } - - @Override - public Optional> getReadLockedSerializable(Class type, JObjectKey key) { - var got = JObjectManager.this.getLocked(type, key, false); - if (got == null) return Optional.empty(); - if (got.wrapper().lastWriteTx >= _txId) { - got.wrapper().lock.readLock().unlock(); - throw new IllegalStateException("Serialization race"); - } - return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); - } - - @Override - public Optional> getWriteLockedSerializable(Class type, JObjectKey key) { var got = JObjectManager.this.getLocked(type, key, true); if (got == null) return Optional.empty(); if (got.wrapper().lastWriteTx >= _txId) { @@ -187,8 +163,6 @@ public class JObjectManager { } } - ; - public TransactionPrivate createTransaction() { var counter = _txCounter.getAndIncrement(); Log.trace("Creating transaction " + counter); @@ -198,7 +172,9 @@ public class JObjectManager { public void commit(TransactionPrivate tx) { var toUnlock = new LinkedList(); var toFlush = new LinkedList>(); - var toLock = new ArrayList>(); + var toPut = new LinkedList>(); + var toLock = new ArrayList>(); + var dependencies = new LinkedList>(); Log.trace("Committing transaction " + tx.getId()); @@ -206,29 +182,22 @@ public class JObjectManager { for (var entry : tx.drain()) { Log.trace("Processing entry " + entry.toString()); switch (entry) { - case TxRecord.TxObjectRecordRead read -> toUnlock.add(read.original().lock().readLock()::unlock); - case TxRecord.TxObjectRecordReadSerializable read -> - toUnlock.add(read.original().lock().readLock()::unlock); case TxRecord.TxObjectRecordCopyLock copy -> { toUnlock.add(copy.original().lock().writeLock()::unlock); + dependencies.add(copy.original()); if (copy.copy().isModified()) { toFlush.add(copy); } } - case TxRecord.TxObjectRecordCopyLockSerializable copy -> { // FIXME - toUnlock.add(copy.original().lock().writeLock()::unlock); + case TxRecord.TxObjectRecordOptimistic copy -> { + toLock.add(copy); + dependencies.add(copy.original()); if (copy.copy().isModified()) { toFlush.add(copy); } } - case TxRecord.TxObjectRecordCopyNoLock copy -> { - if (copy.copy().isModified()) { - toLock.add(copy); - toFlush.add(copy); - } - } case TxRecord.TxObjectRecordNew created -> { - toFlush.add(created); + toPut.add(created); } default -> throw new IllegalStateException("Unexpected value: " + entry); } @@ -239,86 +208,77 @@ public class JObjectManager { for (var record : toLock) { Log.trace("Locking " + record.toString()); - var got = getLocked(record.original().getClass(), record.original().getKey(), true); + var got = getLocked(record.original().data().getClass(), record.original().data().getKey(), true); if (got == null) { - throw new IllegalStateException("Object not found"); + throw new IllegalStateException("Object " + record.original().data().getKey() + " not found"); } toUnlock.add(got.wrapper().lock.writeLock()::unlock); - if (got.obj() != record.original()) { - throw new IllegalStateException("Object changed during transaction"); + if (got.obj() != record.original().data()) { + throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.original().data()); } } - for (var record : toFlush) { - Log.trace("Processing flush entry " + record.toString()); + for (var dep : dependencies) { + Log.trace("Checking dependency " + dep.toString()); + var current = _objects.get(dep.data().getKey()); - var current = _objects.get(record.copy().wrapped().getKey()); + if (current.get() != dep.data()) { + throw new IllegalStateException("Object changed during transaction: " + current.get() + " vs " + dep.data()); + } - if (current == null && !(record instanceof TxRecord.TxObjectRecordNew)) { - throw new IllegalStateException("Object not found during transaction"); - } else if (current != null) { - var old = switch (record) { - case TxRecord.TxObjectRecordCopyLock copy -> copy.original().data(); - case TxRecord.TxObjectRecordCopyLockSerializable copy -> copy.original().data(); - case TxRecord.TxObjectRecordCopyNoLock copy -> copy.original(); - default -> throw new IllegalStateException("Unexpected value: " + record); - }; - - if (current.get() != old) { - assert record instanceof TxRecord.TxObjectRecordCopyNoLock; - throw new IllegalStateException("Object changed during transaction"); - } - - // In case of NoLock write, the instance is replaced and the following shouldn't happen - // It can happen without serializable writes, as then the read of object to transaction - // can happen after another transaction had read, changed and committed it. - if (record instanceof TxRecord.TxObjectRecordCopyLockSerializable - && current.lastWriteTx > tx.getId()) { - assert false; - // Shouldn't happen as we should check for serialization in the tx object source - throw new IllegalStateException("Transaction race"); - } - - var newWrapper = new JDataWrapper<>(record.copy().wrapped()); - newWrapper.lock.writeLock().lock(); - if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) { - throw new IllegalStateException("Object changed during transaction"); - } - toUnlock.add(newWrapper.lock.writeLock()::unlock); - } else if (record instanceof TxRecord.TxObjectRecordNew created) { - var wrapper = new JDataWrapper<>(created.created()); - wrapper.lock.writeLock().lock(); - var old = _objects.putIfAbsent(created.created().getKey(), wrapper); - if (old != null) - throw new IllegalStateException("Object already exists"); - toUnlock.add(wrapper.lock.writeLock()::unlock); - } else { - throw new IllegalStateException("Object not found during transaction"); + if (current.lastWriteTx >= tx.getId()) { + throw new IllegalStateException("Serialization hazard: " + current.lastWriteTx + " vs " + tx.getId()); } } - // Have all locks now - for (var record : toFlush) { - Log.trace("Flushing " + record.toString() + " " + _objects.get(record.copy().wrapped().getKey()).toString()); + for (var put : toPut) { + Log.trace("Putting new object " + put.toString()); + var wrapper = new JDataWrapper<>(put.created()); + wrapper.lock.writeLock().lock(); + var old = _objects.putIfAbsent(put.created().getKey(), wrapper); + if (old != null) + throw new IllegalStateException("Object already exists: " + old.get()); + toUnlock.add(wrapper.lock.writeLock()::unlock); + } + for (var record : toFlush) { + Log.trace("Flushing changed " + record.toString()); + var current = _objects.get(record.original().data().getKey()); assert record.copy().isModified(); - var obj = record.copy().wrapped(); + var newWrapper = new JDataWrapper<>(record.copy().wrapped()); + newWrapper.lock.writeLock().lock(); + if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) { + assert false; + throw new IllegalStateException("Object changed during transaction after locking: " + current.get() + " vs " + record.copy().wrapped()); + } + toUnlock.add(newWrapper.lock.writeLock()::unlock); + } + + Log.tracef("Flushing transaction %d to storage", tx.getId()); + + var written = Streams.concat(toFlush.stream().map(f -> f.copy().wrapped()), + toPut.stream().map(TxRecord.TxObjectRecordNew::created)).toList(); + + // Really flushing to storage + written.forEach(obj -> { + Log.trace("Flushing object " + obj.getKey()); var key = obj.getKey(); var data = objectSerializer.serialize(obj); objectStorage.writeObject(key, data); _objects.get(key).lastWriteTx = tx.getId(); // FIXME: - } + }); - Log.trace("Flushing transaction " + tx.getId()); + Log.tracef("Committing transaction %d to storage", tx.getId()); objectStorage.commitTx(new TxManifest() { @Override public List getWritten() { - return toFlush.stream().map(r -> r.copy().wrapped().getKey()).toList(); + // FIXME: + return written.stream().map(JData::getKey).toList(); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java index 83e8f2ec..f872e932 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java @@ -47,10 +47,7 @@ public class TransactionManagerImpl implements TransactionManager { var tx = _currentTransaction.get(); for (var o : tx.drain()) { switch (o) { - case TxRecord.TxObjectRecordRead r -> r.original().lock().readLock().unlock(); - case TxRecord.TxObjectRecordReadSerializable r -> r.original().lock().readLock().unlock(); case TxRecord.TxObjectRecordCopyLock r -> r.original().lock().writeLock().unlock(); - case TxRecord.TxObjectRecordCopyLockSerializable r -> r.original().lock().writeLock().unlock(); default -> { } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/LockingStrategy.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/LockingStrategy.java index 52cc750b..1cf28822 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/LockingStrategy.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/LockingStrategy.java @@ -1,9 +1,6 @@ package com.usatiuk.dhfs.objects.transaction; public enum LockingStrategy { - READ, // Read only, no writes allowed, blocks writers - READ_SERIALIZABLE, // Exception if object was written to after transaction start - OPTIMISTIC, // Optimistic write, no blocking other possible writers + OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers WRITE, // Write lock, blocks all other writers - WRITE_SERIALIZABLE // Exception if object was written to after transaction start } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java index f92368fe..544c3a5c 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/Transaction.java @@ -12,6 +12,6 @@ public interface Transaction { void putObject(JData obj); default Optional getObject(Class type, JObjectKey key) { - return getObject(type, key, LockingStrategy.READ); + return getObject(type, key, LockingStrategy.OPTIMISTIC); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index f4e364b9..b10ff167 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -42,46 +42,22 @@ public class TransactionFactoryImpl implements TransactionFactory { } switch (strategy) { - case READ: - case READ_SERIALIZABLE: { - var locked = strategy == LockingStrategy.READ_SERIALIZABLE - ? _source.getReadLockedSerializable(type, key).orElse(null) - : _source.getReadLocked(type, key).orElse(null); - if (locked == null) { - return Optional.empty(); - } - var view = objectAllocator.unmodifiable(locked.data()); - _objects.put(key, - strategy == LockingStrategy.READ_SERIALIZABLE - ? new TxRecord.TxObjectRecordReadSerializable<>(locked, view) - : new TxRecord.TxObjectRecordRead<>(locked, view) - ); - return Optional.of(view); - } case OPTIMISTIC: { var read = _source.get(type, key).orElse(null); - if (read == null) { return Optional.empty(); } var copy = objectAllocator.copy(read.data()); - _objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.data(), copy)); + _objects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy)); return Optional.of(copy.wrapped()); } - case WRITE: - case WRITE_SERIALIZABLE: { - var locked = strategy == LockingStrategy.WRITE_SERIALIZABLE - ? _source.getWriteLockedSerializable(type, key).orElse(null) - : _source.getWriteLocked(type, key).orElse(null); + case WRITE: { + var locked = _source.getWriteLocked(type, key).orElse(null); if (locked == null) { return Optional.empty(); } var copy = objectAllocator.copy(locked.data()); - _objects.put(key, - strategy == LockingStrategy.WRITE_SERIALIZABLE - ? new TxRecord.TxObjectRecordCopyLockSerializable<>(locked, copy) - : new TxRecord.TxObjectRecordCopyLock<>(locked, copy) - ); + _objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy)); return Optional.of(copy.wrapped()); } default: diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java new file mode 100644 index 00000000..cd5dc7e6 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java @@ -0,0 +1,11 @@ +package com.usatiuk.dhfs.objects.transaction; + +import com.usatiuk.objects.common.runtime.JData; + +import java.util.concurrent.locks.ReadWriteLock; + +public interface TransactionObject { + T data(); + + ReadWriteLock lock(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java index 815f7aff..14835797 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java @@ -4,22 +4,9 @@ import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; import java.util.Optional; -import java.util.concurrent.locks.ReadWriteLock; public interface TransactionObjectSource { - interface TransactionObject { - T data(); - - ReadWriteLock lock(); - } - Optional> get(Class type, JObjectKey key); - Optional> getReadLocked(Class type, JObjectKey key); - Optional> getWriteLocked(Class type, JObjectKey key); - - Optional> getReadLockedSerializable(Class type, JObjectKey key); - - Optional> getWriteLockedSerializable(Class type, JObjectKey key); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java index 039bd88e..dc0b590d 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java @@ -1,8 +1,8 @@ package com.usatiuk.dhfs.objects.transaction; +import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData; import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; -import com.usatiuk.objects.alloc.runtime.ObjectAllocator; public class TxRecord { public interface TxObjectRecord { @@ -10,58 +10,22 @@ public class TxRecord { } public interface TxObjectRecordWrite extends TxObjectRecord { - ObjectAllocator.ChangeTrackingJData copy(); + TransactionObject original(); + + ChangeTrackingJData copy(); } - public record TxObjectRecordRead(TransactionObjectSource.TransactionObject original, - T copy) - implements TxObjectRecord { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.READ) - return copy; - return null; - } - } - - public record TxObjectRecordReadSerializable(TransactionObjectSource.TransactionObject original, - T copy) - implements TxObjectRecord { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.READ_SERIALIZABLE) - return copy; - return null; - } - } - - public record TxObjectRecordNew(T created) - implements TxObjectRecordWrite { + public record TxObjectRecordNew(T created) implements TxObjectRecord { @Override public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC) return created; return null; } - - @Override - public ObjectAllocator.ChangeTrackingJData copy() { - return new ObjectAllocator.ChangeTrackingJData() { - @Override - public T wrapped() { - return created; - } - - @Override - public boolean isModified() { - return true; - } - }; - } } - public record TxObjectRecordCopyLock(TransactionObjectSource.TransactionObject original, - ObjectAllocator.ChangeTrackingJData copy) + public record TxObjectRecordCopyLock(TransactionObject original, + ChangeTrackingJData copy) implements TxObjectRecordWrite { @Override public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { @@ -71,20 +35,8 @@ public class TxRecord { } } - public record TxObjectRecordCopyLockSerializable( - TransactionObjectSource.TransactionObject original, - ObjectAllocator.ChangeTrackingJData copy) - implements TxObjectRecordWrite { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.WRITE_SERIALIZABLE) - return copy.wrapped(); - return null; - } - } - - public record TxObjectRecordCopyNoLock(T original, - ObjectAllocator.ChangeTrackingJData copy) + public record TxObjectRecordOptimistic(TransactionObject original, + ChangeTrackingJData copy) implements TxObjectRecordWrite { @Override public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java index a175cd0e..89e2d8d7 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java @@ -9,8 +9,9 @@ import io.quarkus.logging.Log; import io.quarkus.test.junit.QuarkusTest; import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -39,7 +40,29 @@ public class ObjectsTest { { txm.begin(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ).orElse(null); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent")).orElse(null); + Assertions.assertEquals("John", parent.getLastName()); + txm.commit(); + } + } + + @Test + void createCreateObject() { + { + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent")); + newParent.setLastName("John"); + curTx.putObject(newParent); + txm.commit(); + } + Assertions.assertThrows(Exception.class, () -> txm.run(() -> { + var newParent = alloc.create(Parent.class, new JObjectKey("Parent")); + newParent.setLastName("John2"); + curTx.putObject(newParent); + })); + { + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent")).orElse(null); Assertions.assertEquals("John", parent.getLastName()); txm.commit(); } @@ -73,7 +96,7 @@ public class ObjectsTest { { txm.begin(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3"), LockingStrategy.READ).orElse(null); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3")).orElse(null); Assertions.assertEquals("John3", parent.getLastName()); txm.commit(); } @@ -123,7 +146,7 @@ public class ObjectsTest { latch.await(); txm.begin(); - var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ).orElse(null); + var got = curTx.getObject(Parent.class, new JObjectKey("Parent2")).orElse(null); txm.commit(); if (!thread1Failed.get()) { @@ -136,11 +159,13 @@ public class ObjectsTest { } } - @Test - void editConflict() throws InterruptedException { + @ParameterizedTest + @EnumSource(LockingStrategy.class) + void editConflict(LockingStrategy strategy) throws InterruptedException { + String key = "Parent4" + strategy.name(); { txm.begin(); - var newParent = alloc.create(Parent.class, new JObjectKey("Parent4")); + var newParent = alloc.create(Parent.class, new JObjectKey(key)); newParent.setLastName("John3"); curTx.putObject(newParent); txm.commit(); @@ -157,7 +182,7 @@ public class ObjectsTest { Log.warn("Thread 1"); txm.begin(); barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null); + var parent = curTx.getObject(Parent.class, new JObjectKey(key), strategy).orElse(null); parent.setLastName("John"); Log.warn("Thread 1 commit"); txm.commit(); @@ -172,7 +197,7 @@ public class ObjectsTest { Log.warn("Thread 2"); txm.begin(); barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null); + var parent = curTx.getObject(Parent.class, new JObjectKey(key), strategy).orElse(null); parent.setLastName("John2"); Log.warn("Thread 2 commit"); txm.commit(); @@ -186,7 +211,7 @@ public class ObjectsTest { latchEnd.await(); txm.begin(); - var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ).orElse(null); + var got = curTx.getObject(Parent.class, new JObjectKey(key)).orElse(null); txm.commit(); if (!thread1Failed.get()) { @@ -201,128 +226,6 @@ public class ObjectsTest { Assertions.assertTrue(thread1Failed.get() || thread2Failed.get()); } - @Test - void editLock() throws InterruptedException { - { - txm.begin(); - var newParent = alloc.create(Parent.class, new JObjectKey("Parent5")); - newParent.setLastName("John3"); - curTx.putObject(newParent); - txm.commit(); - } - - AtomicBoolean thread1Failed = new AtomicBoolean(true); - AtomicBoolean thread2Failed = new AtomicBoolean(true); - - var barrier = new CyclicBarrier(2); - var latch = new CountDownLatch(2); - - Just.run(() -> { - try { - Log.warn("Thread 1"); - txm.begin(); - barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null); - parent.setLastName("John"); - Log.warn("Thread 1 commit"); - txm.commit(); - thread1Failed.set(false); - return null; - } finally { - latch.countDown(); - } - }); - Just.run(() -> { - try { - Log.warn("Thread 2"); - txm.begin(); - barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null); - parent.setLastName("John2"); - Log.warn("Thread 2 commit"); - txm.commit(); - thread2Failed.set(false); - return null; - } finally { - latch.countDown(); - } - }); - - latch.await(); - - txm.begin(); - var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ).orElse(null); - txm.commit(); - - Assertions.assertTrue(!thread1Failed.get() && !thread2Failed.get()); - Assertions.assertTrue(got.getLastName().equals("John") || got.getLastName().equals("John2")); - } - - @Test - @Disabled // Doesn't work as "lastWrittenTx" is not persistent - void editLockSerializable() throws InterruptedException { - { - txm.begin(); - var newParent = alloc.create(Parent.class, new JObjectKey("Parent6")); - newParent.setLastName("John3"); - curTx.putObject(newParent); - txm.commit(); - } - - AtomicBoolean thread1Failed = new AtomicBoolean(true); - AtomicBoolean thread2Failed = new AtomicBoolean(true); - - var barrier = new CyclicBarrier(2); - var latchEnd = new CountDownLatch(2); - - Just.run(() -> { - try { - Log.warn("Thread 1"); - txm.begin(); - barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null); - parent.setLastName("John"); - Log.warn("Thread 1 commit"); - txm.commit(); - thread1Failed.set(false); - return null; - } finally { - latchEnd.countDown(); - } - }); - Just.run(() -> { - try { - Log.warn("Thread 2"); - txm.begin(); - barrier.await(); - var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null); - parent.setLastName("John2"); - Log.warn("Thread 2 commit"); - txm.commit(); - thread2Failed.set(false); - return null; - } finally { - latchEnd.countDown(); - } - }); - - latchEnd.await(); - - txm.begin(); - var got = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.READ).orElse(null); - txm.commit(); - - if (!thread1Failed.get()) { - Assertions.assertTrue(thread2Failed.get()); - Assertions.assertEquals("John", got.getLastName()); - } else if (!thread2Failed.get()) { - Assertions.assertEquals("John2", got.getLastName()); - } else { - Assertions.fail("No thread succeeded"); - } - - Assertions.assertTrue(thread1Failed.get() || thread2Failed.get()); - } // } // // @Test