working object edit 2

This commit is contained in:
2024-12-02 23:06:20 +01:00
parent 70b8105451
commit ce2822595b
5 changed files with 174 additions and 69 deletions

View File

@@ -92,23 +92,16 @@ public class JObjectManager {
}
}
private record TransactionObjectImpl<T extends JData>
(T data, ReadWriteLock lock)
implements TransactionObjectSource.TransactionObject<T> {}
private final TransactionObjectSource _objSource = new TransactionObjectSource() {
@Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.get(type, key);
if (got == null) return Optional.empty();
return Optional.of(new TransactionObject<>() {
@Override
public T get() {
return got.getLeft();
}
@Override
public ReadWriteLock getLock() {
return got.getRight().lock;
}
});
return Optional.of(new TransactionObjectImpl<>(got.getLeft(), got.getRight().lock));
}
};
@@ -131,10 +124,10 @@ public class JObjectManager {
Log.trace("Processing entry " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordRead<?> read -> {
toUnlock.add(read.original().getLock().readLock()::unlock);
toUnlock.add(read.original().lock().readLock()::unlock);
}
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().getLock().writeLock()::unlock);
toUnlock.add(copy.original().lock().writeLock()::unlock);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
@@ -176,7 +169,7 @@ public class JObjectManager {
throw new IllegalStateException("Object not found during transaction");
} else if (current != null) {
var old = switch (record) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> copy.original().get();
case TxRecord.TxObjectRecordCopyLock<?> copy -> copy.original().data();
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> copy.original();
default -> throw new IllegalStateException("Unexpected value: " + record);
};

View File

@@ -49,27 +49,40 @@ public class TransactionFactoryImpl implements TransactionFactory {
switch (strategy) {
case READ_ONLY: {
read.getLock().readLock().lock();
var view = objectAllocator.unmodifiable(read.get());
read.lock().readLock().lock();
var view = objectAllocator.unmodifiable(read.data());
_objects.put(key, new TxRecord.TxObjectRecordRead<>(read, view));
return Optional.of(view);
}
case WRITE:
case OPTIMISTIC: {
var copy = objectAllocator.copy(read.get());
switch (strategy) {
case WRITE:
read.getLock().writeLock().lock();
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy));
break;
case OPTIMISTIC:
_objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.get(), copy));
break;
}
var copy = objectAllocator.copy(read.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.data(), copy));
return Optional.of(copy.wrapped());
}
case WRITE: {
read.lock().writeLock().lock();
while (true) {
try {
var readAgain = _source.get(type, key).orElse(null);
if (readAgain == null) {
read.lock().writeLock().unlock();
return Optional.empty();
}
if (!Objects.equals(read, readAgain)) {
read.lock().writeLock().unlock();
read = readAgain;
read.lock().writeLock().lock();
continue;
}
var copy = objectAllocator.copy(read.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy));
return Optional.of(copy.wrapped());
} catch (Throwable e) {
read.lock().writeLock().unlock();
throw e;
}
}
}
default:
throw new IllegalArgumentException("Unknown locking strategy");
}

View File

@@ -8,9 +8,9 @@ import java.util.concurrent.locks.ReadWriteLock;
public interface TransactionObjectSource {
interface TransactionObject<T extends JData> {
T get();
T data();
ReadWriteLock getLock();
ReadWriteLock lock();
}
<T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key);

View File

@@ -8,6 +8,7 @@ import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,12 +81,10 @@ public class ObjectsTest {
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var signalFin = new Semaphore(2);
var latch = new CountDownLatch(2);
Just.run(() -> {
try {
signalFin.acquire();
Log.warn("Thread 1");
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
@@ -98,12 +97,11 @@ public class ObjectsTest {
signal.release();
return null;
} finally {
signalFin.release();
latch.countDown();
}
});
Just.run(() -> {
try {
signalFin.acquire();
Log.warn("Thread 2");
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
@@ -116,12 +114,12 @@ public class ObjectsTest {
signal.release();
return null;
} finally {
signalFin.release();
latch.countDown();
}
});
signal.release(2);
signalFin.acquire(2);
latch.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ_ONLY).orElse(null);
@@ -135,37 +133,136 @@ public class ObjectsTest {
} else {
Assertions.fail("No thread succeeded");
}
}
// @Test
// void editConflict() {
// {
// var tx = _tx.beginTransaction();
// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
// parent.setName("John");
// tx.commit();
// }
//
// {
// var tx = _tx.beginTransaction();
// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
// parent.setName("John2");
//
// var tx2 = _tx.beginTransaction();
// var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
// parent2.setName("John3");
//
// tx.commit();
// Assertions.assertThrows(Exception.class, tx2::commit);
// }
//
// {
// var tx2 = _tx.beginTransaction();
// var parent = tx2.getObject(new JObjectKey("Parent"));
// Assertions.assertInstanceOf(Parent.class, parent);
// Assertions.assertEquals("John2", ((Parent) parent).getName());
// }
@Test
void editConflict() throws InterruptedException {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent4"));
newParent.setLastName("John3");
curTx.putObject(newParent);
txm.commit();
}
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var latch = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
parent.setLastName("John");
signal.acquire();
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
Just.run(() -> {
try {
Log.warn("Thread 2");
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.OPTIMISTIC).orElse(null);
parent.setLastName("John2");
signal.acquire();
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
signal.release(2);
latch.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent4"), LockingStrategy.READ_ONLY).orElse(null);
txm.commit();
if (!thread1Failed.get()) {
Assertions.assertTrue(thread2Failed.get());
Assertions.assertEquals("John", got.getLastName());
} else if (!thread2Failed.get()) {
Assertions.assertEquals("John2", got.getLastName());
} else {
Assertions.fail("No thread succeeded");
}
Assertions.assertTrue(thread1Failed.get() || thread2Failed.get());
}
@Test
void editLock() throws InterruptedException {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent5"));
newParent.setLastName("John3");
curTx.putObject(newParent);
txm.commit();
}
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var signal = new Semaphore(0);
var latch = new CountDownLatch(2);
Just.run(() -> {
try {
Log.warn("Thread 1");
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
parent.setLastName("John");
signal.acquire();
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
Just.run(() -> {
try {
Log.warn("Thread 2");
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.WRITE).orElse(null);
parent.setLastName("John2");
signal.acquire();
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
signal.release();
return null;
} finally {
latch.countDown();
}
});
signal.release(2);
latch.await();
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent5"), LockingStrategy.READ_ONLY).orElse(null);
txm.commit();
Assertions.assertTrue(!thread1Failed.get() && !thread2Failed.get());
Assertions.assertTrue(got.getLastName().equals("John") || got.getLastName().equals("John2"));
}
// }
//
// @Test

View File

@@ -1 +1,3 @@
dhfs.objects.persistence=memory
dhfs.objects.persistence=memory
quarkus.log.category."com.usatiuk".level=TRACE
quarkus.log.category."com.usatiuk".min-level=TRACE