diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 3ad37c83..2ecb5a99 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -92,23 +92,16 @@ public class JObjectManager { } } + private record TransactionObjectImpl + (T data, ReadWriteLock lock) + implements TransactionObjectSource.TransactionObject {} + private final TransactionObjectSource _objSource = new TransactionObjectSource() { @Override public Optional> get(Class type, JObjectKey key) { var got = JObjectManager.this.get(type, key); if (got == null) return Optional.empty(); - return Optional.of(new TransactionObject<>() { - @Override - public T get() { - return got.getLeft(); - } - - @Override - public ReadWriteLock getLock() { - return got.getRight().lock; - } - }); - + return Optional.of(new TransactionObjectImpl<>(got.getLeft(), got.getRight().lock)); } }; @@ -131,10 +124,10 @@ public class JObjectManager { Log.trace("Processing entry " + entry.toString()); switch (entry) { case TxRecord.TxObjectRecordRead read -> { - toUnlock.add(read.original().getLock().readLock()::unlock); + toUnlock.add(read.original().lock().readLock()::unlock); } case TxRecord.TxObjectRecordCopyLock copy -> { - toUnlock.add(copy.original().getLock().writeLock()::unlock); + toUnlock.add(copy.original().lock().writeLock()::unlock); if (copy.copy().isModified()) { toFlush.add(copy); } @@ -176,7 +169,7 @@ public class JObjectManager { throw new IllegalStateException("Object not found during transaction"); } else if (current != null) { var old = switch (record) { - case TxRecord.TxObjectRecordCopyLock copy -> copy.original().get(); + case TxRecord.TxObjectRecordCopyLock copy -> copy.original().data(); case TxRecord.TxObjectRecordCopyNoLock copy -> copy.original(); default -> throw new IllegalStateException("Unexpected value: " + record); }; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index c0f8c55c..5de8255a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -49,27 +49,40 @@ public class TransactionFactoryImpl implements TransactionFactory { switch (strategy) { case READ_ONLY: { - read.getLock().readLock().lock(); - var view = objectAllocator.unmodifiable(read.get()); + read.lock().readLock().lock(); + var view = objectAllocator.unmodifiable(read.data()); _objects.put(key, new TxRecord.TxObjectRecordRead<>(read, view)); return Optional.of(view); } - case WRITE: case OPTIMISTIC: { - var copy = objectAllocator.copy(read.get()); - - switch (strategy) { - case WRITE: - read.getLock().writeLock().lock(); - _objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy)); - break; - case OPTIMISTIC: - _objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.get(), copy)); - break; - } - + var copy = objectAllocator.copy(read.data()); + _objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.data(), copy)); return Optional.of(copy.wrapped()); } + case WRITE: { + read.lock().writeLock().lock(); + while (true) { + try { + var readAgain = _source.get(type, key).orElse(null); + if (readAgain == null) { + read.lock().writeLock().unlock(); + return Optional.empty(); + } + if (!Objects.equals(read, readAgain)) { + read.lock().writeLock().unlock(); + read = readAgain; + read.lock().writeLock().lock(); + continue; + } + var copy = objectAllocator.copy(read.data()); + _objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy)); + return Optional.of(copy.wrapped()); + } catch (Throwable e) { + read.lock().writeLock().unlock(); + throw e; + } + } + } default: throw new IllegalArgumentException("Unknown locking strategy"); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java index 55301c9c..2526bcd4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java @@ -8,9 +8,9 @@ import java.util.concurrent.locks.ReadWriteLock; public interface TransactionObjectSource { interface TransactionObject { - T get(); + T data(); - ReadWriteLock getLock(); + ReadWriteLock lock(); } Optional> get(Class type, JObjectKey key); diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java index 26af7975..777e3045 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java @@ -8,6 +8,7 @@ import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,12 +81,10 @@ public class ObjectsTest { AtomicBoolean thread2Failed = new AtomicBoolean(true); var signal = new Semaphore(0); - var signalFin = new Semaphore(2); - + var latch = new CountDownLatch(2); Just.run(() -> { try { - signalFin.acquire(); Log.warn("Thread 1"); txm.begin(); var newParent = alloc.create(Parent.class, new JObjectKey("Parent2")); @@ -98,12 +97,11 @@ public class ObjectsTest { signal.release(); return null; } finally { - signalFin.release(); + latch.countDown(); } }); Just.run(() -> { try { - signalFin.acquire(); Log.warn("Thread 2"); txm.begin(); var newParent = alloc.create(Parent.class, new JObjectKey("Parent2")); @@ -116,12 +114,12 @@ public class ObjectsTest { signal.release(); return null; } finally { - signalFin.release(); + latch.countDown(); } }); signal.release(2); - signalFin.acquire(2); + latch.await(); txm.begin(); var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ_ONLY).orElse(null); @@ -135,37 +133,136 @@ public class ObjectsTest { } else { Assertions.fail("No thread succeeded"); } - } -// @Test -// void editConflict() { -// { -// var tx = _tx.beginTransaction(); -// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); -// parent.setName("John"); -// tx.commit(); -// } -// -// { -// var tx = _tx.beginTransaction(); -// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); -// parent.setName("John2"); -// -// var tx2 = _tx.beginTransaction(); -// var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class); -// parent2.setName("John3"); -// -// tx.commit(); -// Assertions.assertThrows(Exception.class, tx2::commit); -// } -// -// { -// var tx2 = _tx.beginTransaction(); -// var parent = tx2.getObject(new JObjectKey("Parent")); -// Assertions.assertInstanceOf(Parent.class, parent); -// Assertions.assertEquals("John2", ((Parent) parent).getName()); -// } + @Test + void editConflict() throws InterruptedException { + { + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent4")); + newParent.setLastName("John3"); + curTx.putObject(newParent); + txm.commit(); + } + + AtomicBoolean thread1Failed = new AtomicBoolean(true); + AtomicBoolean thread2Failed = new AtomicBoolean(true); + + var signal = new Semaphore(0); + var latch = new CountDownLatch(2); + + Just.run(() -> { + try { + Log.warn("Thread 1"); + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null); + parent.setLastName("John"); + signal.acquire(); + Log.warn("Thread 1 commit"); + txm.commit(); + thread1Failed.set(false); + signal.release(); + return null; + } finally { + latch.countDown(); + } + }); + Just.run(() -> { + try { + Log.warn("Thread 2"); + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null); + parent.setLastName("John2"); + signal.acquire(); + Log.warn("Thread 2 commit"); + txm.commit(); + thread2Failed.set(false); + signal.release(); + return null; + } finally { + latch.countDown(); + } + }); + + signal.release(2); + latch.await(); + + txm.begin(); + var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ_ONLY).orElse(null); + txm.commit(); + + if (!thread1Failed.get()) { + Assertions.assertTrue(thread2Failed.get()); + Assertions.assertEquals("John", got.getLastName()); + } else if (!thread2Failed.get()) { + Assertions.assertEquals("John2", got.getLastName()); + } else { + Assertions.fail("No thread succeeded"); + } + + Assertions.assertTrue(thread1Failed.get() || thread2Failed.get()); + } + + @Test + void editLock() throws InterruptedException { + { + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent5")); + newParent.setLastName("John3"); + curTx.putObject(newParent); + txm.commit(); + } + + AtomicBoolean thread1Failed = new AtomicBoolean(true); + AtomicBoolean thread2Failed = new AtomicBoolean(true); + + var signal = new Semaphore(0); + var latch = new CountDownLatch(2); + + Just.run(() -> { + try { + Log.warn("Thread 1"); + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null); + parent.setLastName("John"); + signal.acquire(); + Log.warn("Thread 1 commit"); + txm.commit(); + thread1Failed.set(false); + signal.release(); + return null; + } finally { + latch.countDown(); + } + }); + Just.run(() -> { + try { + Log.warn("Thread 2"); + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null); + parent.setLastName("John2"); + signal.acquire(); + Log.warn("Thread 2 commit"); + txm.commit(); + thread2Failed.set(false); + signal.release(); + return null; + } finally { + latch.countDown(); + } + }); + + signal.release(2); + latch.await(); + + txm.begin(); + var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ_ONLY).orElse(null); + txm.commit(); + + Assertions.assertTrue(!thread1Failed.get() && !thread2Failed.get()); + Assertions.assertTrue(got.getLastName().equals("John") || got.getLastName().equals("John2")); + } + // } // // @Test diff --git a/dhfs-parent/objects/src/test/resources/application.properties b/dhfs-parent/objects/src/test/resources/application.properties index 1b0d9d26..41617308 100644 --- a/dhfs-parent/objects/src/test/resources/application.properties +++ b/dhfs-parent/objects/src/test/resources/application.properties @@ -1 +1,3 @@ -dhfs.objects.persistence=memory \ No newline at end of file +dhfs.objects.persistence=memory +quarkus.log.category."com.usatiuk".level=TRACE +quarkus.log.category."com.usatiuk".min-level=TRACE