basic pre-tx hook

This commit is contained in:
2024-12-29 19:27:18 +01:00
parent 097929260b
commit 62fbaa206a
8 changed files with 274 additions and 70 deletions

View File

@@ -80,11 +80,11 @@
<artifactId>objects-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.usatiuk</groupId>-->
<!-- <artifactId>objects-common-deployment</artifactId>-->
<!-- <version>1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -11,6 +11,7 @@ import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.io.Serializable;
@@ -36,6 +37,8 @@ public class JObjectManager {
ObjectAllocator objectAllocator;
@Inject
TransactionFactory transactionFactory;
@Inject
Instance<PreCommitTxHook> preCommitTxHooks;
private final DataLocker _storageReadLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
@@ -191,77 +194,108 @@ public class JObjectManager {
}
public void commit(TransactionPrivate tx) {
Log.trace("Committing transaction " + tx.getId());
// This also holds the weak references
var toUnlock = new LinkedList<VoidFn>();
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
var toPut = new LinkedList<TxRecord.TxObjectRecordNew<?>>();
var toDelete = new LinkedList<JObjectKey>();
var toLock = new ArrayList<JObjectKey>();
var dependencies = new LinkedList<TransactionObject<?>>();
Log.trace("Committing transaction " + tx.getId());
// For existing objects:
// Check that their version is not higher than the version of transaction being committed
// TODO: check deletions, inserts
try {
for (var entry : tx.writes()) {
Log.trace("Processing write " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().lock().writeLock()::unlock);
toFlush.add(copy);
}
case TxRecord.TxObjectRecordOptimistic<?> copy -> {
toLock.add(copy.original().data().getKey());
toFlush.add(copy);
}
case TxRecord.TxObjectRecordNew<?> created -> {
toPut.add(created);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
toLock.add(deleted.key());
toDelete.add(deleted.key());
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
Collection<TxRecord.TxObjectRecord<?>> drained;
while (!(drained = tx.drainWrites()).isEmpty()) {
Log.trace("Commit iteration with " + drained.size() + " records");
var toLock = new ArrayList<JObjectKey>();
for (var entry : tx.reads().entrySet()) {
Log.trace("Processing read " + entry.toString());
switch (entry.getValue()) {
case ReadTrackingObjectSource.TxReadObjectNone<?> none -> {
// TODO: Check this
for (var entry : drained) {
Log.trace("Processing write " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().lock().writeLock()::unlock);
toFlush.add(copy);
}
case TxRecord.TxObjectRecordOptimistic<?> copy -> {
toLock.add(copy.original().data().getKey());
toFlush.add(copy);
}
case TxRecord.TxObjectRecordNew<?> created -> {
toPut.add(created);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
toLock.add(deleted.getKey());
toDelete.add(deleted.getKey());
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
case ReadTrackingObjectSource.TxReadObjectSome<?>(var obj) -> {
toLock.add(obj.data().getKey());
dependencies.add(obj);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
toLock.sort(Comparator.comparingInt(System::identityHashCode));
for (var key : toLock) {
Log.trace("Locking " + key.toString());
var got = getLocked(JData.class, key, true);
if (got == null) {
throw new IllegalStateException("Object " + key + " not found");
}
toUnlock.add(got.wrapper().lock.writeLock()::unlock);
for (var entry : tx.drainReads().entrySet()) {
Log.trace("Processing read " + entry.toString());
switch (entry.getValue()) {
case ReadTrackingObjectSource.TxReadObjectNone<?> none -> {
// TODO: Check this
}
case ReadTrackingObjectSource.TxReadObjectSome<?>(var obj) -> {
toLock.add(obj.data().getKey());
dependencies.add(obj);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
toLock.sort(Comparator.comparingInt(System::identityHashCode));
for (var key : toLock) {
Log.trace("Locking " + key.toString());
var got = getLocked(JData.class, key, true);
if (got == null) {
throw new IllegalStateException("Object " + key + " not found");
}
toUnlock.add(got.wrapper().lock.writeLock()::unlock);
}
for (var hook : preCommitTxHooks) {
for (var entry : drained) {
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped());
}
case TxRecord.TxObjectRecordOptimistic<?> copy -> {
hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped());
}
case TxRecord.TxObjectRecordNew<?> created -> {
hook.onCreate(created.getKey(), created.created());
}
case TxRecord.TxObjectRecordDeleted<?> deleted -> {
hook.onDelete(deleted.getKey(), deleted.original().data());
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
}
}
for (var dep : dependencies) {
Log.trace("Checking dependency " + dep.toString());
var current = _objects.get(dep.data().getKey()).get();
if (current == null) continue; // FIXME? Does this matter much for deletion
// Check that the object we have locked is really the one in the map
// Note that current can be null, not only if it doesn't exist, but
// also for example in the case when it was changed and then garbage collected
if (dep.data() != current) {
throw new IllegalStateException("Serialization hazard: " + dep.data() + " vs " + current);
}
if (current.getVersion() >= tx.getId()) {
throw new IllegalStateException("Serialization hazard: " + current.getVersion() + " vs " + tx.getId());

View File

@@ -0,0 +1,15 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
public interface PreCommitTxHook {
default void onChange(JObjectKey key, JData old, JData cur) {
}
default void onCreate(JObjectKey key, JData cur) {
}
default void onDelete(JObjectKey key, JData cur) {
}
}

View File

@@ -45,7 +45,8 @@ public class TransactionManagerImpl implements TransactionManager {
@Override
public void rollback() {
var tx = _currentTransaction.get();
for (var o : tx.writes()) {
// Works only before commit was called
for (var o : tx.drainWrites()) {
switch (o) {
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock();
default -> {

View File

@@ -8,7 +8,10 @@ import jakarta.inject.Inject;
import lombok.AccessLevel;
import lombok.Getter;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@@ -20,7 +23,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final long _id;
private final ReadTrackingObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
@@ -67,7 +70,31 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void delete(JObjectKey key) {
_objects.put(key, new TxRecord.TxObjectRecordDeleted(key));
// FIXME
var got = _objects.get(key);
if (got != null) {
switch (got) {
case TxRecord.TxObjectRecordNew<?> created -> {
_objects.remove(key);
}
case TxRecord.TxObjectRecordCopyLock<?> copyLockRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord.original()));
}
case TxRecord.TxObjectRecordOptimistic<?> optimisticRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord.original()));
}
case TxRecord.TxObjectRecordDeleted<?> deletedRecord -> {
return;
}
default -> throw new IllegalStateException("Unexpected value: " + got);
}
}
var read = _source.get(JData.class, key).orElse(null);
if (read == null) {
return;
}
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(read));
}
@Override
@@ -80,12 +107,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> writes() {
return Collections.unmodifiableCollection(_objects.values());
public Collection<TxRecord.TxObjectRecord<?>> drainWrites() {
var ret = _objects;
_objects = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads() {
public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> drainReads() {
return _source.getRead();
}
}

View File

@@ -7,7 +7,7 @@ import java.util.Map;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction {
Collection<TxRecord.TxObjectRecord<?>> writes();
Collection<TxRecord.TxObjectRecord<?>> drainWrites();
Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads();
Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> drainReads();
}

View File

@@ -7,19 +7,18 @@ import com.usatiuk.objects.common.runtime.JObjectKey;
public class TxRecord {
public interface TxObjectRecord<T> {
T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy);
}
public record TxObjectRecordMissing<T extends JData>(JObjectKey key) implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
return null;
}
JObjectKey getKey();
}
public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> {
TransactionObject<T> original();
ChangeTrackingJData<T> copy();
default JObjectKey getKey() {
return original().data().getKey();
}
}
public record TxObjectRecordNew<T extends JData>(T created) implements TxObjectRecord<T> {
@@ -29,13 +28,23 @@ public class TxRecord {
return created;
return null;
}
@Override
public JObjectKey getKey() {
return created.getKey();
}
}
public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
public record TxObjectRecordDeleted<T extends JData>(TransactionObject<T> original) implements TxObjectRecord<T> {
@Override
public JData getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
return null;
}
@Override
public JObjectKey getKey() {
return original.data().getKey();
}
}
public record TxObjectRecordCopyLock<T extends JData>(TransactionObject<T> original,

View File

@@ -0,0 +1,116 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.data.Parent;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.mockito.InjectSpy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@QuarkusTest
public class PreCommitTxHookTest {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
ObjectAllocator alloc;
@ApplicationScoped
public static class DummyPreCommitTxHook implements PreCommitTxHook {
}
@InjectSpy
private DummyPreCommitTxHook spyHook;
@Test
void createObject() {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("ParentCreate"));
newParent.setLastName("John");
curTx.put(newParent);
txm.commit();
}
{
txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("Parent")).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
txm.commit();
}
ArgumentCaptor<JData> dataCaptor = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JObjectKey> keyCaptor = ArgumentCaptor.forClass(JObjectKey.class);
Mockito.verify(spyHook, Mockito.times(1)).onCreate(keyCaptor.capture(), dataCaptor.capture());
Assertions.assertEquals("John", ((Parent) dataCaptor.getValue()).getLastName());
Assertions.assertEquals(new JObjectKey("ParentCreate"), keyCaptor.getValue());
}
@Test
void deleteObject() {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("ParentDel"));
newParent.setLastName("John");
curTx.put(newParent);
txm.commit();
}
{
txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("ParentDel")).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
txm.commit();
}
{
txm.begin();
curTx.delete(new JObjectKey("ParentDel"));
txm.commit();
}
ArgumentCaptor<JData> dataCaptor = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JObjectKey> keyCaptor = ArgumentCaptor.forClass(JObjectKey.class);
Mockito.verify(spyHook, Mockito.times(1)).onDelete(keyCaptor.capture(), dataCaptor.capture());
Assertions.assertEquals("John", ((Parent) dataCaptor.getValue()).getLastName());
Assertions.assertEquals(new JObjectKey("ParentDel"), keyCaptor.getValue());
}
@Test
void editObject() {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit"));
newParent.setLastName("John");
curTx.put(newParent);
txm.commit();
}
{
txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("ParentEdit")).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
parent.setLastName("John changed");
txm.commit();
}
ArgumentCaptor<JData> dataCaptorOld = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JData> dataCaptorNew = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JObjectKey> keyCaptor = ArgumentCaptor.forClass(JObjectKey.class);
Mockito.verify(spyHook, Mockito.times(1)).onChange(keyCaptor.capture(), dataCaptorOld.capture(), dataCaptorNew.capture());
Assertions.assertEquals("John", ((Parent) dataCaptorOld.getValue()).getLastName());
Assertions.assertEquals("John changed", ((Parent) dataCaptorNew.getValue()).getLastName());
Assertions.assertEquals(new JObjectKey("ParentEdit"), keyCaptor.getValue());
}
}