some more lock tidying, serializable thingy

This commit is contained in:
2024-12-04 16:57:03 +01:00
parent 00a5015208
commit c242a318f3
9 changed files with 299 additions and 100 deletions

View File

@@ -2,10 +2,7 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.objects.transaction.TransactionFactory;
import com.usatiuk.dhfs.objects.transaction.TransactionObjectSource;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.dhfs.utils.VoidFn;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
@@ -14,7 +11,6 @@ import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
@@ -46,7 +42,7 @@ public class JObjectManager {
private static final Cleaner CLEANER = Cleaner.create();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
long lastWriteTx = 0;
long lastWriteTx = -1;
public JDataWrapper(T referent) {
super(referent);
@@ -55,9 +51,21 @@ public class JObjectManager {
_objects.remove(key, this);
});
}
@Override
public String toString() {
return "JDataWrapper{" +
"ref=" + get() +
", lock=" + lock +
", lastWriteTx=" + lastWriteTx +
'}';
}
}
private <T extends JData> Pair<T, JDataWrapper<T>> get(Class<T> type, JObjectKey key) {
private record WrapperRet<T extends JData>(T obj, JDataWrapper<T> wrapper) {
}
private <T extends JData> WrapperRet<T> get(Class<T> type, JObjectKey key) {
while (true) {
{
var got = _objects.get(key);
@@ -65,7 +73,7 @@ public class JObjectManager {
if (got != null) {
var ref = got.get();
if (type.isInstance(ref)) {
return Pair.of(type.cast(ref), (JDataWrapper<T>) got);
return new WrapperRet<>((T) ref, (JDataWrapper<T>) got);
} else if (ref == null) {
_objects.remove(key, got);
} else {
@@ -82,10 +90,10 @@ public class JObjectManager {
var got = objectSerializer.deserialize(read);
if (type.isInstance(got)) {
var wrapper = new JDataWrapper<T>((T) got);
var wrapper = new JDataWrapper<>((T) got);
var old = _objects.putIfAbsent(key, wrapper);
if (old != null) continue;
return Pair.of(type.cast(got), wrapper);
return new WrapperRet<>((T) got, wrapper);
} else if (got == null) {
return null;
} else {
@@ -95,60 +103,96 @@ public class JObjectManager {
}
}
private <T extends JData> WrapperRet<T> getLocked(Class<T> type, JObjectKey key, boolean write) {
var read = get(type, key);
if (read == null) return null;
var lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock();
lock.lock();
while (true) {
try {
var readAgain = get(type, key);
if (readAgain == null) {
lock.unlock();
return null;
}
if (!Objects.equals(read, readAgain)) {
lock.unlock();
read = readAgain;
lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock();
lock.lock();
continue;
}
return read;
} catch (Throwable e) {
lock.unlock();
throw e;
}
}
}
private record TransactionObjectImpl<T extends JData>
(T data, ReadWriteLock lock)
implements TransactionObjectSource.TransactionObject<T> {}
implements TransactionObjectSource.TransactionObject<T> {
}
private class TransactionObjectSourceImpl implements TransactionObjectSource {
private final long _txId;
private TransactionObjectSourceImpl(long txId) {
_txId = txId;
}
private final TransactionObjectSource _objSource = new TransactionObjectSource() {
@Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.get(type, key);
if (got == null) return Optional.empty();
return Optional.of(new TransactionObjectImpl<>(got.getLeft(), got.getRight().lock));
}
private <T extends JData> Optional<TransactionObject<T>> getLocked(Class<T> type, JObjectKey key, boolean write) {
var read = get(type, key).orElse(null);
if (read == null) return Optional.empty();
var lock = write ? read.lock().writeLock() : read.lock().readLock();
lock.lock();
while (true) {
try {
var readAgain = get(type, key).orElse(null);
if (readAgain == null) {
lock.unlock();
return Optional.empty();
}
if (!Objects.equals(read, readAgain)) {
lock.unlock();
read = readAgain;
lock = write ? read.lock().writeLock() : read.lock().readLock();
lock.lock();
continue;
}
return Optional.of(new TransactionObjectImpl<>(read.data(), read.lock()));
} catch (Throwable e) {
lock.unlock();
throw e;
}
}
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
}
@Override
public <T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key) {
return getLocked(type, key, false);
var got = JObjectManager.this.getLocked(type, key, false);
if (got == null) return Optional.empty();
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
}
@Override
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) {
return getLocked(type, key, true);
var got = JObjectManager.this.getLocked(type, key, true);
if (got == null) return Optional.empty();
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
}
};
@Override
public <T extends JData> Optional<TransactionObject<T>> getReadLockedSerializable(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.getLocked(type, key, false);
if (got == null) return Optional.empty();
if (got.wrapper().lastWriteTx >= _txId) {
got.wrapper().lock.readLock().unlock();
throw new IllegalStateException("Serialization race");
}
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
}
@Override
public <T extends JData> Optional<TransactionObject<T>> getWriteLockedSerializable(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.getLocked(type, key, true);
if (got == null) return Optional.empty();
if (got.wrapper().lastWriteTx >= _txId) {
got.wrapper().lock.writeLock().unlock();
throw new IllegalStateException("Serialization race");
}
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock));
}
}
;
public TransactionPrivate createTransaction() {
var counter = _txCounter.getAndIncrement();
Log.trace("Creating transaction " + counter);
return transactionFactory.createTransaction(counter, _objSource);
return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter));
}
public void commit(TransactionPrivate tx) {
@@ -162,15 +206,21 @@ public class JObjectManager {
for (var entry : tx.drain()) {
Log.trace("Processing entry " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordRead<?> read -> {
toUnlock.add(read.original().lock().readLock()::unlock);
}
case TxRecord.TxObjectRecordRead<?> read -> toUnlock.add(read.original().lock().readLock()::unlock);
case TxRecord.TxObjectRecordReadSerializable<?> read ->
toUnlock.add(read.original().lock().readLock()::unlock);
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().lock().writeLock()::unlock);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
}
case TxRecord.TxObjectRecordCopyLockSerializable<?> copy -> { // FIXME
toUnlock.add(copy.original().lock().writeLock()::unlock);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
}
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> {
if (copy.copy().isModified()) {
toLock.add(copy);
@@ -189,14 +239,17 @@ public class JObjectManager {
for (var record : toLock) {
Log.trace("Locking " + record.toString());
var found = _objects.get(record.original().getKey());
var got = getLocked(record.original().getClass(), record.original().getKey(), true);
if (found.get() != record.original()) {
throw new IllegalStateException("Object changed during transaction");
if (got == null) {
throw new IllegalStateException("Object not found");
}
found.lock.writeLock().lock();
toUnlock.add(found.lock.writeLock()::unlock);
toUnlock.add(got.wrapper().lock.writeLock()::unlock);
if (got.obj() != record.original()) {
throw new IllegalStateException("Object changed during transaction");
}
}
for (var record : toFlush) {
@@ -209,15 +262,23 @@ public class JObjectManager {
} else if (current != null) {
var old = switch (record) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> copy.original().data();
case TxRecord.TxObjectRecordCopyLockSerializable<?> copy -> copy.original().data();
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> copy.original();
default -> throw new IllegalStateException("Unexpected value: " + record);
};
if (current.get() != old) {
assert record instanceof TxRecord.TxObjectRecordCopyNoLock;
throw new IllegalStateException("Object changed during transaction");
}
if (current.lastWriteTx > tx.getId()) {
// In case of NoLock write, the instance is replaced and the following shouldn't happen
// It can happen without serializable writes, as then the read of object to transaction
// can happen after another transaction had read, changed and committed it.
if (record instanceof TxRecord.TxObjectRecordCopyLockSerializable
&& current.lastWriteTx > tx.getId()) {
assert false;
// Shouldn't happen as we should check for serialization in the tx object source
throw new IllegalStateException("Transaction race");
}
@@ -241,7 +302,7 @@ public class JObjectManager {
// Have all locks now
for (var record : toFlush) {
Log.trace("Flushing " + record.toString());
Log.trace("Flushing " + record.toString() + " " + _objects.get(record.copy().wrapped().getKey()).toString());
assert record.copy().isModified();

View File

@@ -1,6 +1,9 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.utils.VoidFn;
import java.util.function.Supplier;
public interface TransactionManager {
void begin();
@@ -9,5 +12,29 @@ public interface TransactionManager {
void rollback();
default <T> T run(Supplier<T> supplier) {
begin();
try {
var ret = supplier.get();
commit();
return ret;
} catch (Throwable e) {
rollback();
throw e;
}
}
default void run(VoidFn fn) {
begin();
try {
fn.apply();
commit();
} catch (Throwable e) {
rollback();
throw e;
}
}
Transaction current();
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -43,6 +44,17 @@ public class TransactionManagerImpl implements TransactionManager {
@Override
public void rollback() {
var tx = _currentTransaction.get();
for (var o : tx.drain()) {
switch (o) {
case TxRecord.TxObjectRecordRead<?> r -> r.original().lock().readLock().unlock();
case TxRecord.TxObjectRecordReadSerializable<?> r -> r.original().lock().readLock().unlock();
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock();
case TxRecord.TxObjectRecordCopyLockSerializable<?> r -> r.original().lock().writeLock().unlock();
default -> {
}
}
}
_currentTransaction.remove();
}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.dhfs.objects.transaction;
public enum LockingStrategy {
READ_ONLY, // Read only, no writes allowed, blocks writers
OPTIMISTIC, // Optimistic write, no blocking other possible writers
WRITE // Write lock, blocks all other writers
READ, // Read only, no writes allowed, blocks writers
READ_SERIALIZABLE, // Exception if object was written to after transaction start
OPTIMISTIC, // Optimistic write, no blocking other possible writers
WRITE, // Write lock, blocks all other writers
WRITE_SERIALIZABLE // Exception if object was written to after transaction start
}

View File

@@ -12,6 +12,6 @@ public interface Transaction {
<T extends JData> void putObject(JData obj);
default <T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key) {
return getObject(type, key, LockingStrategy.READ_ONLY);
return getObject(type, key, LockingStrategy.READ);
}
}

View File

@@ -41,34 +41,47 @@ public class TransactionFactoryImpl implements TransactionFactory {
return Optional.of(type.cast(compatible));
}
var read = _source.get(type, key).orElse(null);
if (read == null) {
return Optional.empty();
}
switch (strategy) {
case READ_ONLY: {
var locked = _source.getReadLocked(type, key).orElse(null);
case READ:
case READ_SERIALIZABLE: {
var locked = strategy == LockingStrategy.READ_SERIALIZABLE
? _source.getReadLockedSerializable(type, key).orElse(null)
: _source.getReadLocked(type, key).orElse(null);
if (locked == null) {
return Optional.empty();
}
var view = objectAllocator.unmodifiable(locked.data());
_objects.put(key, new TxRecord.TxObjectRecordRead<>(locked, view));
_objects.put(key,
strategy == LockingStrategy.READ_SERIALIZABLE
? new TxRecord.TxObjectRecordReadSerializable<>(locked, view)
: new TxRecord.TxObjectRecordRead<>(locked, view)
);
return Optional.of(view);
}
case OPTIMISTIC: {
var read = _source.get(type, key).orElse(null);
if (read == null) {
return Optional.empty();
}
var copy = objectAllocator.copy(read.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.data(), copy));
return Optional.of(copy.wrapped());
}
case WRITE: {
var locked = _source.getWriteLocked(type, key).orElse(null);
case WRITE:
case WRITE_SERIALIZABLE: {
var locked = strategy == LockingStrategy.WRITE_SERIALIZABLE
? _source.getWriteLockedSerializable(type, key).orElse(null)
: _source.getWriteLocked(type, key).orElse(null);
if (locked == null) {
return Optional.empty();
}
var copy = objectAllocator.copy(locked.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
_objects.put(key,
strategy == LockingStrategy.WRITE_SERIALIZABLE
? new TxRecord.TxObjectRecordCopyLockSerializable<>(locked, copy)
: new TxRecord.TxObjectRecordCopyLock<>(locked, copy)
);
return Optional.of(copy.wrapped());
}
default:

View File

@@ -18,4 +18,8 @@ public interface TransactionObjectSource {
<T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key);
<T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key);
<T extends JData> Optional<TransactionObject<T>> getReadLockedSerializable(Class<T> type, JObjectKey key);
<T extends JData> Optional<TransactionObject<T>> getWriteLockedSerializable(Class<T> type, JObjectKey key);
}

View File

@@ -18,7 +18,18 @@ public class TxRecord {
implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.READ_ONLY)
if (strategy == LockingStrategy.READ)
return copy;
return null;
}
}
public record TxObjectRecordReadSerializable<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
T copy)
implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.READ_SERIALIZABLE)
return copy;
return null;
}
@@ -50,7 +61,7 @@ public class TxRecord {
}
public record TxObjectRecordCopyLock<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
ObjectAllocator.ChangeTrackingJData<T> copy)
ObjectAllocator.ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
@@ -60,6 +71,18 @@ public class TxRecord {
}
}
public record TxObjectRecordCopyLockSerializable<T extends JData>(
TransactionObjectSource.TransactionObject<T> original,
ObjectAllocator.ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE_SERIALIZABLE)
return copy.wrapped();
return null;
}
}
public record TxObjectRecordCopyNoLock<T extends JData>(T original,
ObjectAllocator.ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {

View File

@@ -9,10 +9,11 @@ import io.quarkus.logging.Log;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
@QuarkusTest
@@ -38,7 +39,7 @@ public class ObjectsTest {
{
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ_ONLY).orElse(null);
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
txm.commit();
}
@@ -72,7 +73,7 @@ public class ObjectsTest {
{
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3"), LockingStrategy.READ_ONLY).orElse(null);
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent3"), LockingStrategy.READ).orElse(null);
Assertions.assertEquals("John3", parent.getLastName());
txm.commit();
}
@@ -83,21 +84,20 @@ public class ObjectsTest {
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var barrier = new CyclicBarrier(2);
var latch = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
barrier.await();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
newParent.setLastName("John");
curTx.putObject(newParent);
signal.acquire();
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
@@ -107,25 +107,23 @@ public class ObjectsTest {
try {
Log.warn("Thread 2");
txm.begin();
barrier.await();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
newParent.setLastName("John2");
curTx.putObject(newParent);
signal.acquire();
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
signal.release(2);
latch.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ_ONLY).orElse(null);
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ).orElse(null);
txm.commit();
if (!thread1Failed.get()) {
@@ -151,47 +149,44 @@ public class ObjectsTest {
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var latch = new CountDownLatch(2);
var barrier = new CyclicBarrier(2);
var latchEnd = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
parent.setLastName("John");
signal.acquire();
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
latchEnd.countDown();
}
});
Just.run(() -> {
try {
Log.warn("Thread 2");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
parent.setLastName("John2");
signal.acquire();
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
latchEnd.countDown();
}
});
signal.release(2);
latch.await();
latchEnd.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ_ONLY).orElse(null);
var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ).orElse(null);
txm.commit();
if (!thread1Failed.get()) {
@@ -219,20 +214,19 @@ public class ObjectsTest {
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var barrier = new CyclicBarrier(2);
var latch = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
parent.setLastName("John");
signal.acquire();
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
@@ -242,30 +236,93 @@ public class ObjectsTest {
try {
Log.warn("Thread 2");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
parent.setLastName("John2");
signal.acquire();
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
signal.release(2);
latch.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ_ONLY).orElse(null);
var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ).orElse(null);
txm.commit();
Assertions.assertTrue(!thread1Failed.get() && !thread2Failed.get());
Assertions.assertTrue(got.getLastName().equals("John") || got.getLastName().equals("John2"));
}
@Test
@Disabled // Doesn't work as "lastWrittenTx" is not persistent
void editLockSerializable() throws InterruptedException {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent6"));
newParent.setLastName("John3");
curTx.putObject(newParent);
txm.commit();
}
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var barrier = new CyclicBarrier(2);
var latchEnd = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null);
parent.setLastName("John");
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
return null;
} finally {
latchEnd.countDown();
}
});
Just.run(() -> {
try {
Log.warn("Thread 2");
txm.begin();
barrier.await();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.WRITE_SERIALIZABLE).orElse(null);
parent.setLastName("John2");
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
return null;
} finally {
latchEnd.countDown();
}
});
latchEnd.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent6"), LockingStrategy.READ).orElse(null);
txm.commit();
if (!thread1Failed.get()) {
Assertions.assertTrue(thread2Failed.get());
Assertions.assertEquals("John", got.getLastName());
} else if (!thread2Failed.get()) {
Assertions.assertEquals("John2", got.getLastName());
} else {
Assertions.fail("No thread succeeded");
}
Assertions.assertTrue(thread1Failed.get() || thread2Failed.get());
}
// }
//
// @Test