mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
simplify transaction isolation
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.objects.alloc.deployment;
|
||||
|
||||
import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
@@ -120,7 +121,7 @@ class ObjectsAllocProcessor {
|
||||
for (var item : jDataItems) {
|
||||
try (ClassCreator classCreator = ClassCreator.builder()
|
||||
.className(getCTClassName(item.klass).toString())
|
||||
.interfaces(JData.class, ObjectAllocator.ChangeTrackingJData.class)
|
||||
.interfaces(JData.class, ChangeTrackingJData.class)
|
||||
.interfaces(item.klass.name().toString())
|
||||
.interfaces(Serializable.class)
|
||||
.classOutput(gizmoAdapter)
|
||||
@@ -192,7 +193,7 @@ class ObjectsAllocProcessor {
|
||||
for (var item : jDataItems) {
|
||||
try (ClassCreator classCreator = ClassCreator.builder()
|
||||
.className(getImmutableClassName(item.klass).toString())
|
||||
.interfaces(JData.class, ObjectAllocator.ChangeTrackingJData.class)
|
||||
.interfaces(JData.class, ChangeTrackingJData.class)
|
||||
.interfaces(item.klass.name().toString())
|
||||
.interfaces(Serializable.class)
|
||||
.classOutput(gizmoAdapter)
|
||||
@@ -357,7 +358,7 @@ class ObjectsAllocProcessor {
|
||||
methodCreator.throwException(IllegalArgumentException.class, "Unknown type");
|
||||
}
|
||||
|
||||
try (MethodCreator methodCreator = classCreator.getMethodCreator("copy", ObjectAllocator.ChangeTrackingJData.class, JData.class)) {
|
||||
try (MethodCreator methodCreator = classCreator.getMethodCreator("copy", ChangeTrackingJData.class, JData.class)) {
|
||||
matchClass(methodCreator, methodCreator.getMethodParam(0), classes, (type, branch, value) -> {
|
||||
branch.returnValue(branch.newInstance(MethodDescriptor.ofConstructor(getCTClassName(type).toString(), type.name().toString()), value));
|
||||
});
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package com.usatiuk.objects.alloc.runtime;
|
||||
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
|
||||
public interface ChangeTrackingJData<T extends JData> {
|
||||
T wrapped();
|
||||
|
||||
boolean isModified();
|
||||
}
|
||||
@@ -6,13 +6,6 @@ import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
public interface ObjectAllocator {
|
||||
<T extends JData> T create(Class<T> type, JObjectKey key);
|
||||
|
||||
interface ChangeTrackingJData<T extends JData> {
|
||||
T wrapped();
|
||||
|
||||
boolean isModified();
|
||||
}
|
||||
|
||||
// A copy of data that can be modified without affecting the original, and that can track changes
|
||||
<T extends JData> ChangeTrackingJData<T> copy(T obj);
|
||||
|
||||
<T extends JData> T unmodifiable(T obj);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
|
||||
import com.usatiuk.dhfs.objects.persistence.TxManifest;
|
||||
import com.usatiuk.dhfs.objects.transaction.*;
|
||||
@@ -42,7 +43,7 @@ public class JObjectManager {
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
long lastWriteTx = -1;
|
||||
long lastWriteTx = -1; // FIXME: This should be persistent
|
||||
|
||||
public JDataWrapper(T referent) {
|
||||
super(referent);
|
||||
@@ -77,7 +78,7 @@ public class JObjectManager {
|
||||
} else if (ref == null) {
|
||||
_objects.remove(key, got);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Object type mismatch");
|
||||
throw new IllegalArgumentException("Object type mismatch: " + ref.getClass() + " vs " + type);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,7 +86,7 @@ public class JObjectManager {
|
||||
//noinspection unused
|
||||
try (var readLock = _storageReadLocker.lock(key)) {
|
||||
var read = objectStorage.readObject(key).orElse(null);
|
||||
if (read == null) throw new IllegalArgumentException("Object not found");
|
||||
if (read == null) throw new IllegalArgumentException("Object not found: " + key);
|
||||
|
||||
var got = objectSerializer.deserialize(read);
|
||||
|
||||
@@ -97,7 +98,7 @@ public class JObjectManager {
|
||||
} else if (got == null) {
|
||||
return null;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Object type mismatch");
|
||||
throw new IllegalArgumentException("Object type mismatch: " + got.getClass() + " vs " + type);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,7 +134,7 @@ public class JObjectManager {
|
||||
|
||||
private record TransactionObjectImpl<T extends JData>
|
||||
(T data, ReadWriteLock lock)
|
||||
implements TransactionObjectSource.TransactionObject<T> {
|
||||
implements TransactionObject<T> {
|
||||
}
|
||||
|
||||
private class TransactionObjectSourceImpl implements TransactionObjectSource {
|
||||
@@ -150,33 +151,8 @@ public class JObjectManager {
|
||||
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key) {
|
||||
var got = JObjectManager.this.getLocked(type, key, false);
|
||||
if (got == null) return Optional.empty();
|
||||
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) {
|
||||
var got = JObjectManager.this.getLocked(type, key, true);
|
||||
if (got == null) return Optional.empty();
|
||||
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getReadLockedSerializable(Class<T> type, JObjectKey key) {
|
||||
var got = JObjectManager.this.getLocked(type, key, false);
|
||||
if (got == null) return Optional.empty();
|
||||
if (got.wrapper().lastWriteTx >= _txId) {
|
||||
got.wrapper().lock.readLock().unlock();
|
||||
throw new IllegalStateException("Serialization race");
|
||||
}
|
||||
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getWriteLockedSerializable(Class<T> type, JObjectKey key) {
|
||||
var got = JObjectManager.this.getLocked(type, key, true);
|
||||
if (got == null) return Optional.empty();
|
||||
if (got.wrapper().lastWriteTx >= _txId) {
|
||||
@@ -187,8 +163,6 @@ public class JObjectManager {
|
||||
}
|
||||
}
|
||||
|
||||
;
|
||||
|
||||
public TransactionPrivate createTransaction() {
|
||||
var counter = _txCounter.getAndIncrement();
|
||||
Log.trace("Creating transaction " + counter);
|
||||
@@ -198,7 +172,9 @@ public class JObjectManager {
|
||||
public void commit(TransactionPrivate tx) {
|
||||
var toUnlock = new LinkedList<VoidFn>();
|
||||
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
|
||||
var toLock = new ArrayList<TxRecord.TxObjectRecordCopyNoLock<?>>();
|
||||
var toPut = new LinkedList<TxRecord.TxObjectRecordNew<?>>();
|
||||
var toLock = new ArrayList<TxRecord.TxObjectRecordOptimistic<?>>();
|
||||
var dependencies = new LinkedList<TransactionObject<?>>();
|
||||
|
||||
Log.trace("Committing transaction " + tx.getId());
|
||||
|
||||
@@ -206,29 +182,22 @@ public class JObjectManager {
|
||||
for (var entry : tx.drain()) {
|
||||
Log.trace("Processing entry " + entry.toString());
|
||||
switch (entry) {
|
||||
case TxRecord.TxObjectRecordRead<?> read -> toUnlock.add(read.original().lock().readLock()::unlock);
|
||||
case TxRecord.TxObjectRecordReadSerializable<?> read ->
|
||||
toUnlock.add(read.original().lock().readLock()::unlock);
|
||||
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
|
||||
toUnlock.add(copy.original().lock().writeLock()::unlock);
|
||||
dependencies.add(copy.original());
|
||||
if (copy.copy().isModified()) {
|
||||
toFlush.add(copy);
|
||||
}
|
||||
}
|
||||
case TxRecord.TxObjectRecordCopyLockSerializable<?> copy -> { // FIXME
|
||||
toUnlock.add(copy.original().lock().writeLock()::unlock);
|
||||
case TxRecord.TxObjectRecordOptimistic<?> copy -> {
|
||||
toLock.add(copy);
|
||||
dependencies.add(copy.original());
|
||||
if (copy.copy().isModified()) {
|
||||
toFlush.add(copy);
|
||||
}
|
||||
}
|
||||
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> {
|
||||
if (copy.copy().isModified()) {
|
||||
toLock.add(copy);
|
||||
toFlush.add(copy);
|
||||
}
|
||||
}
|
||||
case TxRecord.TxObjectRecordNew<?> created -> {
|
||||
toFlush.add(created);
|
||||
toPut.add(created);
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected value: " + entry);
|
||||
}
|
||||
@@ -239,86 +208,77 @@ public class JObjectManager {
|
||||
for (var record : toLock) {
|
||||
Log.trace("Locking " + record.toString());
|
||||
|
||||
var got = getLocked(record.original().getClass(), record.original().getKey(), true);
|
||||
var got = getLocked(record.original().data().getClass(), record.original().data().getKey(), true);
|
||||
|
||||
if (got == null) {
|
||||
throw new IllegalStateException("Object not found");
|
||||
throw new IllegalStateException("Object " + record.original().data().getKey() + " not found");
|
||||
}
|
||||
|
||||
toUnlock.add(got.wrapper().lock.writeLock()::unlock);
|
||||
|
||||
if (got.obj() != record.original()) {
|
||||
throw new IllegalStateException("Object changed during transaction");
|
||||
if (got.obj() != record.original().data()) {
|
||||
throw new IllegalStateException("Object changed during transaction: " + got.obj() + " vs " + record.original().data());
|
||||
}
|
||||
}
|
||||
|
||||
for (var record : toFlush) {
|
||||
Log.trace("Processing flush entry " + record.toString());
|
||||
for (var dep : dependencies) {
|
||||
Log.trace("Checking dependency " + dep.toString());
|
||||
var current = _objects.get(dep.data().getKey());
|
||||
|
||||
var current = _objects.get(record.copy().wrapped().getKey());
|
||||
if (current.get() != dep.data()) {
|
||||
throw new IllegalStateException("Object changed during transaction: " + current.get() + " vs " + dep.data());
|
||||
}
|
||||
|
||||
if (current == null && !(record instanceof TxRecord.TxObjectRecordNew<?>)) {
|
||||
throw new IllegalStateException("Object not found during transaction");
|
||||
} else if (current != null) {
|
||||
var old = switch (record) {
|
||||
case TxRecord.TxObjectRecordCopyLock<?> copy -> copy.original().data();
|
||||
case TxRecord.TxObjectRecordCopyLockSerializable<?> copy -> copy.original().data();
|
||||
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> copy.original();
|
||||
default -> throw new IllegalStateException("Unexpected value: " + record);
|
||||
};
|
||||
|
||||
if (current.get() != old) {
|
||||
assert record instanceof TxRecord.TxObjectRecordCopyNoLock;
|
||||
throw new IllegalStateException("Object changed during transaction");
|
||||
}
|
||||
|
||||
// In case of NoLock write, the instance is replaced and the following shouldn't happen
|
||||
// It can happen without serializable writes, as then the read of object to transaction
|
||||
// can happen after another transaction had read, changed and committed it.
|
||||
if (record instanceof TxRecord.TxObjectRecordCopyLockSerializable
|
||||
&& current.lastWriteTx > tx.getId()) {
|
||||
assert false;
|
||||
// Shouldn't happen as we should check for serialization in the tx object source
|
||||
throw new IllegalStateException("Transaction race");
|
||||
}
|
||||
|
||||
var newWrapper = new JDataWrapper<>(record.copy().wrapped());
|
||||
newWrapper.lock.writeLock().lock();
|
||||
if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) {
|
||||
throw new IllegalStateException("Object changed during transaction");
|
||||
}
|
||||
toUnlock.add(newWrapper.lock.writeLock()::unlock);
|
||||
} else if (record instanceof TxRecord.TxObjectRecordNew<?> created) {
|
||||
var wrapper = new JDataWrapper<>(created.created());
|
||||
wrapper.lock.writeLock().lock();
|
||||
var old = _objects.putIfAbsent(created.created().getKey(), wrapper);
|
||||
if (old != null)
|
||||
throw new IllegalStateException("Object already exists");
|
||||
toUnlock.add(wrapper.lock.writeLock()::unlock);
|
||||
} else {
|
||||
throw new IllegalStateException("Object not found during transaction");
|
||||
if (current.lastWriteTx >= tx.getId()) {
|
||||
throw new IllegalStateException("Serialization hazard: " + current.lastWriteTx + " vs " + tx.getId());
|
||||
}
|
||||
}
|
||||
|
||||
// Have all locks now
|
||||
for (var record : toFlush) {
|
||||
Log.trace("Flushing " + record.toString() + " " + _objects.get(record.copy().wrapped().getKey()).toString());
|
||||
for (var put : toPut) {
|
||||
Log.trace("Putting new object " + put.toString());
|
||||
var wrapper = new JDataWrapper<>(put.created());
|
||||
wrapper.lock.writeLock().lock();
|
||||
var old = _objects.putIfAbsent(put.created().getKey(), wrapper);
|
||||
if (old != null)
|
||||
throw new IllegalStateException("Object already exists: " + old.get());
|
||||
toUnlock.add(wrapper.lock.writeLock()::unlock);
|
||||
}
|
||||
|
||||
for (var record : toFlush) {
|
||||
Log.trace("Flushing changed " + record.toString());
|
||||
var current = _objects.get(record.original().data().getKey());
|
||||
assert record.copy().isModified();
|
||||
|
||||
var obj = record.copy().wrapped();
|
||||
var newWrapper = new JDataWrapper<>(record.copy().wrapped());
|
||||
newWrapper.lock.writeLock().lock();
|
||||
if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) {
|
||||
assert false;
|
||||
throw new IllegalStateException("Object changed during transaction after locking: " + current.get() + " vs " + record.copy().wrapped());
|
||||
}
|
||||
toUnlock.add(newWrapper.lock.writeLock()::unlock);
|
||||
}
|
||||
|
||||
Log.tracef("Flushing transaction %d to storage", tx.getId());
|
||||
|
||||
var written = Streams.concat(toFlush.stream().map(f -> f.copy().wrapped()),
|
||||
toPut.stream().map(TxRecord.TxObjectRecordNew::created)).toList();
|
||||
|
||||
// Really flushing to storage
|
||||
written.forEach(obj -> {
|
||||
Log.trace("Flushing object " + obj.getKey());
|
||||
var key = obj.getKey();
|
||||
var data = objectSerializer.serialize(obj);
|
||||
objectStorage.writeObject(key, data);
|
||||
_objects.get(key).lastWriteTx = tx.getId(); // FIXME:
|
||||
}
|
||||
});
|
||||
|
||||
Log.trace("Flushing transaction " + tx.getId());
|
||||
Log.tracef("Committing transaction %d to storage", tx.getId());
|
||||
|
||||
objectStorage.commitTx(new TxManifest() {
|
||||
@Override
|
||||
public List<JObjectKey> getWritten() {
|
||||
return toFlush.stream().map(r -> r.copy().wrapped().getKey()).toList();
|
||||
// FIXME:
|
||||
return written.stream().map(JData::getKey).toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -47,10 +47,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
||||
var tx = _currentTransaction.get();
|
||||
for (var o : tx.drain()) {
|
||||
switch (o) {
|
||||
case TxRecord.TxObjectRecordRead<?> r -> r.original().lock().readLock().unlock();
|
||||
case TxRecord.TxObjectRecordReadSerializable<?> r -> r.original().lock().readLock().unlock();
|
||||
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock();
|
||||
case TxRecord.TxObjectRecordCopyLockSerializable<?> r -> r.original().lock().writeLock().unlock();
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects.transaction;
|
||||
|
||||
public enum LockingStrategy {
|
||||
READ, // Read only, no writes allowed, blocks writers
|
||||
READ_SERIALIZABLE, // Exception if object was written to after transaction start
|
||||
OPTIMISTIC, // Optimistic write, no blocking other possible writers
|
||||
OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
|
||||
WRITE, // Write lock, blocks all other writers
|
||||
WRITE_SERIALIZABLE // Exception if object was written to after transaction start
|
||||
}
|
||||
|
||||
@@ -12,6 +12,6 @@ public interface Transaction {
|
||||
<T extends JData> void putObject(JData obj);
|
||||
|
||||
default <T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key) {
|
||||
return getObject(type, key, LockingStrategy.READ);
|
||||
return getObject(type, key, LockingStrategy.OPTIMISTIC);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,46 +42,22 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
}
|
||||
|
||||
switch (strategy) {
|
||||
case READ:
|
||||
case READ_SERIALIZABLE: {
|
||||
var locked = strategy == LockingStrategy.READ_SERIALIZABLE
|
||||
? _source.getReadLockedSerializable(type, key).orElse(null)
|
||||
: _source.getReadLocked(type, key).orElse(null);
|
||||
if (locked == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
var view = objectAllocator.unmodifiable(locked.data());
|
||||
_objects.put(key,
|
||||
strategy == LockingStrategy.READ_SERIALIZABLE
|
||||
? new TxRecord.TxObjectRecordReadSerializable<>(locked, view)
|
||||
: new TxRecord.TxObjectRecordRead<>(locked, view)
|
||||
);
|
||||
return Optional.of(view);
|
||||
}
|
||||
case OPTIMISTIC: {
|
||||
var read = _source.get(type, key).orElse(null);
|
||||
|
||||
if (read == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
var copy = objectAllocator.copy(read.data());
|
||||
_objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.data(), copy));
|
||||
_objects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy));
|
||||
return Optional.of(copy.wrapped());
|
||||
}
|
||||
case WRITE:
|
||||
case WRITE_SERIALIZABLE: {
|
||||
var locked = strategy == LockingStrategy.WRITE_SERIALIZABLE
|
||||
? _source.getWriteLockedSerializable(type, key).orElse(null)
|
||||
: _source.getWriteLocked(type, key).orElse(null);
|
||||
case WRITE: {
|
||||
var locked = _source.getWriteLocked(type, key).orElse(null);
|
||||
if (locked == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
var copy = objectAllocator.copy(locked.data());
|
||||
_objects.put(key,
|
||||
strategy == LockingStrategy.WRITE_SERIALIZABLE
|
||||
? new TxRecord.TxObjectRecordCopyLockSerializable<>(locked, copy)
|
||||
: new TxRecord.TxObjectRecordCopyLock<>(locked, copy)
|
||||
);
|
||||
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
|
||||
return Optional.of(copy.wrapped());
|
||||
}
|
||||
default:
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.usatiuk.dhfs.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
||||
public interface TransactionObject<T extends JData> {
|
||||
T data();
|
||||
|
||||
ReadWriteLock lock();
|
||||
}
|
||||
@@ -4,22 +4,9 @@ import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
|
||||
public interface TransactionObjectSource {
|
||||
interface TransactionObject<T extends JData> {
|
||||
T data();
|
||||
|
||||
ReadWriteLock lock();
|
||||
}
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getReadLockedSerializable(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getWriteLockedSerializable(Class<T> type, JObjectKey key);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package com.usatiuk.dhfs.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData;
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
|
||||
public class TxRecord {
|
||||
public interface TxObjectRecord<T> {
|
||||
@@ -10,58 +10,22 @@ public class TxRecord {
|
||||
}
|
||||
|
||||
public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> {
|
||||
ObjectAllocator.ChangeTrackingJData<T> copy();
|
||||
TransactionObject<T> original();
|
||||
|
||||
ChangeTrackingJData<T> copy();
|
||||
}
|
||||
|
||||
public record TxObjectRecordRead<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
|
||||
T copy)
|
||||
implements TxObjectRecord<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
if (strategy == LockingStrategy.READ)
|
||||
return copy;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public record TxObjectRecordReadSerializable<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
|
||||
T copy)
|
||||
implements TxObjectRecord<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
if (strategy == LockingStrategy.READ_SERIALIZABLE)
|
||||
return copy;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public record TxObjectRecordNew<T extends JData>(T created)
|
||||
implements TxObjectRecordWrite<T> {
|
||||
public record TxObjectRecordNew<T extends JData>(T created) implements TxObjectRecord<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
|
||||
return created;
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectAllocator.ChangeTrackingJData<T> copy() {
|
||||
return new ObjectAllocator.ChangeTrackingJData<T>() {
|
||||
@Override
|
||||
public T wrapped() {
|
||||
return created;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isModified() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public record TxObjectRecordCopyLock<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
|
||||
ObjectAllocator.ChangeTrackingJData<T> copy)
|
||||
public record TxObjectRecordCopyLock<T extends JData>(TransactionObject<T> original,
|
||||
ChangeTrackingJData<T> copy)
|
||||
implements TxObjectRecordWrite<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
@@ -71,20 +35,8 @@ public class TxRecord {
|
||||
}
|
||||
}
|
||||
|
||||
public record TxObjectRecordCopyLockSerializable<T extends JData>(
|
||||
TransactionObjectSource.TransactionObject<T> original,
|
||||
ObjectAllocator.ChangeTrackingJData<T> copy)
|
||||
implements TxObjectRecordWrite<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
if (strategy == LockingStrategy.WRITE_SERIALIZABLE)
|
||||
return copy.wrapped();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public record TxObjectRecordCopyNoLock<T extends JData>(T original,
|
||||
ObjectAllocator.ChangeTrackingJData<T> copy)
|
||||
public record TxObjectRecordOptimistic<T extends JData>(TransactionObject<T> original,
|
||||
ChangeTrackingJData<T> copy)
|
||||
implements TxObjectRecordWrite<T> {
|
||||
@Override
|
||||
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
|
||||
|
||||
@@ -9,8 +9,9 @@ import io.quarkus.logging.Log;
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
@@ -39,7 +40,29 @@ public class ObjectsTest {
|
||||
|
||||
{
|
||||
txm.begin();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ).orElse(null);
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent")).orElse(null);
|
||||
Assertions.assertEquals("John", parent.getLastName());
|
||||
txm.commit();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void createCreateObject() {
|
||||
{
|
||||
txm.begin();
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey("Parent"));
|
||||
newParent.setLastName("John");
|
||||
curTx.putObject(newParent);
|
||||
txm.commit();
|
||||
}
|
||||
Assertions.assertThrows(Exception.class, () -> txm.run(() -> {
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey("Parent"));
|
||||
newParent.setLastName("John2");
|
||||
curTx.putObject(newParent);
|
||||
}));
|
||||
{
|
||||
txm.begin();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent")).orElse(null);
|
||||
Assertions.assertEquals("John", parent.getLastName());
|
||||
txm.commit();
|
||||
}
|
||||
@@ -73,7 +96,7 @@ public class ObjectsTest {
|
||||
|
||||
{
|
||||
txm.begin();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3"), LockingStrategy.READ).orElse(null);
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3")).orElse(null);
|
||||
Assertions.assertEquals("John3", parent.getLastName());
|
||||
txm.commit();
|
||||
}
|
||||
@@ -123,7 +146,7 @@ public class ObjectsTest {
|
||||
latch.await();
|
||||
|
||||
txm.begin();
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ).orElse(null);
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2")).orElse(null);
|
||||
txm.commit();
|
||||
|
||||
if (!thread1Failed.get()) {
|
||||
@@ -136,11 +159,13 @@ public class ObjectsTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void editConflict() throws InterruptedException {
|
||||
@ParameterizedTest
|
||||
@EnumSource(LockingStrategy.class)
|
||||
void editConflict(LockingStrategy strategy) throws InterruptedException {
|
||||
String key = "Parent4" + strategy.name();
|
||||
{
|
||||
txm.begin();
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey("Parent4"));
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey(key));
|
||||
newParent.setLastName("John3");
|
||||
curTx.putObject(newParent);
|
||||
txm.commit();
|
||||
@@ -157,7 +182,7 @@ public class ObjectsTest {
|
||||
Log.warn("Thread 1");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey(key), strategy).orElse(null);
|
||||
parent.setLastName("John");
|
||||
Log.warn("Thread 1 commit");
|
||||
txm.commit();
|
||||
@@ -172,7 +197,7 @@ public class ObjectsTest {
|
||||
Log.warn("Thread 2");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey(key), strategy).orElse(null);
|
||||
parent.setLastName("John2");
|
||||
Log.warn("Thread 2 commit");
|
||||
txm.commit();
|
||||
@@ -186,7 +211,7 @@ public class ObjectsTest {
|
||||
latchEnd.await();
|
||||
|
||||
txm.begin();
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ).orElse(null);
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey(key)).orElse(null);
|
||||
txm.commit();
|
||||
|
||||
if (!thread1Failed.get()) {
|
||||
@@ -201,128 +226,6 @@ public class ObjectsTest {
|
||||
Assertions.assertTrue(thread1Failed.get() || thread2Failed.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void editLock() throws InterruptedException {
|
||||
{
|
||||
txm.begin();
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey("Parent5"));
|
||||
newParent.setLastName("John3");
|
||||
curTx.putObject(newParent);
|
||||
txm.commit();
|
||||
}
|
||||
|
||||
AtomicBoolean thread1Failed = new AtomicBoolean(true);
|
||||
AtomicBoolean thread2Failed = new AtomicBoolean(true);
|
||||
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var latch = new CountDownLatch(2);
|
||||
|
||||
Just.run(() -> {
|
||||
try {
|
||||
Log.warn("Thread 1");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
|
||||
parent.setLastName("John");
|
||||
Log.warn("Thread 1 commit");
|
||||
txm.commit();
|
||||
thread1Failed.set(false);
|
||||
return null;
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
Just.run(() -> {
|
||||
try {
|
||||
Log.warn("Thread 2");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
|
||||
parent.setLastName("John2");
|
||||
Log.warn("Thread 2 commit");
|
||||
txm.commit();
|
||||
thread2Failed.set(false);
|
||||
return null;
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latch.await();
|
||||
|
||||
txm.begin();
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ).orElse(null);
|
||||
txm.commit();
|
||||
|
||||
Assertions.assertTrue(!thread1Failed.get() && !thread2Failed.get());
|
||||
Assertions.assertTrue(got.getLastName().equals("John") || got.getLastName().equals("John2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled // Doesn't work as "lastWrittenTx" is not persistent
|
||||
void editLockSerializable() throws InterruptedException {
|
||||
{
|
||||
txm.begin();
|
||||
var newParent = alloc.create(Parent.class, new JObjectKey("Parent6"));
|
||||
newParent.setLastName("John3");
|
||||
curTx.putObject(newParent);
|
||||
txm.commit();
|
||||
}
|
||||
|
||||
AtomicBoolean thread1Failed = new AtomicBoolean(true);
|
||||
AtomicBoolean thread2Failed = new AtomicBoolean(true);
|
||||
|
||||
var barrier = new CyclicBarrier(2);
|
||||
var latchEnd = new CountDownLatch(2);
|
||||
|
||||
Just.run(() -> {
|
||||
try {
|
||||
Log.warn("Thread 1");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null);
|
||||
parent.setLastName("John");
|
||||
Log.warn("Thread 1 commit");
|
||||
txm.commit();
|
||||
thread1Failed.set(false);
|
||||
return null;
|
||||
} finally {
|
||||
latchEnd.countDown();
|
||||
}
|
||||
});
|
||||
Just.run(() -> {
|
||||
try {
|
||||
Log.warn("Thread 2");
|
||||
txm.begin();
|
||||
barrier.await();
|
||||
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null);
|
||||
parent.setLastName("John2");
|
||||
Log.warn("Thread 2 commit");
|
||||
txm.commit();
|
||||
thread2Failed.set(false);
|
||||
return null;
|
||||
} finally {
|
||||
latchEnd.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
latchEnd.await();
|
||||
|
||||
txm.begin();
|
||||
var got = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.READ).orElse(null);
|
||||
txm.commit();
|
||||
|
||||
if (!thread1Failed.get()) {
|
||||
Assertions.assertTrue(thread2Failed.get());
|
||||
Assertions.assertEquals("John", got.getLastName());
|
||||
} else if (!thread2Failed.get()) {
|
||||
Assertions.assertEquals("John2", got.getLastName());
|
||||
} else {
|
||||
Assertions.fail("No thread succeeded");
|
||||
}
|
||||
|
||||
Assertions.assertTrue(thread1Failed.get() || thread2Failed.get());
|
||||
}
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
|
||||
Reference in New Issue
Block a user