diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index 944c99db..28e336f9 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -1,11 +1,10 @@ package com.usatiuk.dhfs.objects; -import com.google.common.collect.Streams; import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; import com.usatiuk.dhfs.objects.persistence.TxManifest; import com.usatiuk.dhfs.objects.transaction.*; +import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.dhfs.utils.DataLocker; -import com.usatiuk.dhfs.utils.VoidFn; import com.usatiuk.objects.alloc.runtime.ObjectAllocator; import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; @@ -20,8 +19,8 @@ import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.function.Function; // Manages all access to com.usatiuk.objects.common.runtime.JData objects. // In particular, it serves as a source of truth for what is committed to the backing storage. @@ -44,15 +43,13 @@ public class JObjectManager { _preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList(); } - private final DataLocker _storageReadLocker = new DataLocker(); + private final DataLocker _objLocker = new DataLocker(); private final ConcurrentHashMap> _objects = new ConcurrentHashMap<>(); private final AtomicLong _txCounter = new AtomicLong(); private class JDataWrapper extends WeakReference { private static final Cleaner CLEANER = Cleaner.create(); - final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - public JDataWrapper(T referent) { super(referent); var key = referent.getKey(); @@ -65,15 +62,11 @@ public class JObjectManager { public String toString() { return "JDataWrapper{" + "ref=" + get() + - ", lock=" + lock + '}'; } } - private record WrapperRet(T obj, JDataWrapper wrapper) { - } - - private WrapperRet get(Class type, JObjectKey key) { + private T get(Class type, JObjectKey key) { while (true) { { var got = _objects.get(key); @@ -81,7 +74,7 @@ public class JObjectManager { if (got != null) { var ref = got.get(); if (type.isInstance(ref)) { - return new WrapperRet<>((T) ref, (JDataWrapper) got); + return type.cast(ref); } else if (ref == null) { _objects.remove(key, got); } else { @@ -91,59 +84,48 @@ public class JObjectManager { } //noinspection unused - try (var readLock = _storageReadLocker.lock(key)) { - var read = objectStorage.readObject(key).orElse(null); + try (var readLock = _objLocker.lock(key)) { + if (_objects.containsKey(key)) continue; + + var read = objectStorage.readObject(key) + .map(objectSerializer::deserialize) + .orElse(null); + if (read == null) return null; - var got = objectSerializer.deserialize(read); - - if (type.isInstance(got)) { - var wrapper = new JDataWrapper<>((T) got); - var old = _objects.putIfAbsent(key, wrapper); - if (old != null) continue; - return new WrapperRet<>((T) got, wrapper); - } else if (got == null) { - return null; + if (type.isInstance(read)) { + var wrapper = new JDataWrapper<>(type.cast(read)); + var old = _objects.put(key, wrapper); + assert old == null; + return type.cast(read); } else { - throw new IllegalArgumentException("Object type mismatch: " + got.getClass() + " vs " + type); + throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type); } } } } - - private WrapperRet getLocked(Class type, JObjectKey key, boolean write) { - var read = get(type, key); - if (read == null) return null; - var lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock(); - lock.lock(); - while (true) { - try { - var readAgain = get(type, key); - if (readAgain == null) { - lock.unlock(); - return null; - } - if (!Objects.equals(read, readAgain)) { - lock.unlock(); - read = readAgain; - lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock(); - lock.lock(); - continue; - } - return read; - } catch (Throwable e) { - lock.unlock(); - throw e; - } - } - } - - private record TransactionObjectImpl - (T data, ReadWriteLock lock) + private record TransactionObjectNoLock + (Optional data) implements TransactionObject { } + private record TransactionObjectLocked + (Optional data, AutoCloseableNoThrow lock) + implements TransactionObject { + } + + private TransactionObjectNoLock getObj(Class type, JObjectKey key) { + var got = get(type, key); + return new TransactionObjectNoLock<>(Optional.ofNullable(got)); + } + + private TransactionObjectLocked getObjLock(Class type, JObjectKey key) { + var lock = _objLocker.lock(key); + var got = get(type, key); + return new TransactionObjectLocked<>(Optional.ofNullable(got), lock); + } + private class TransactionObjectSourceImpl implements TransactionObjectSource { private final long _txId; @@ -152,21 +134,26 @@ public class JObjectManager { } @Override - public Optional> get(Class type, JObjectKey key) { - var got = JObjectManager.this.get(type, key); - if (got == null) return Optional.empty(); - return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); + public TransactionObject get(Class type, JObjectKey key) { + return getObj(type, key); +// return getObj(type, key).map(got -> { +// if (got.data().getVersion() > _txId) { +// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId); +// } +// return got; +// }); } @Override - public Optional> getWriteLocked(Class type, JObjectKey key) { - var got = JObjectManager.this.getLocked(type, key, true); - if (got == null) return Optional.empty(); - if (got.obj.getVersion() >= _txId) { - got.wrapper().lock.writeLock().unlock(); - throw new IllegalStateException("Serialization race"); - } - return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); + public TransactionObject getWriteLocked(Class type, JObjectKey key) { + return getObjLock(type, key); +// return getObjLock(type, key).map(got -> { +// if (got.data().getVersion() > _txId) { +// got.lock.close(); +// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId); +// } +// return got; +// }); } } @@ -200,168 +187,130 @@ public class JObjectManager { public void commit(TransactionPrivate tx) { Log.trace("Committing transaction " + tx.getId()); - // This also holds the weak references - var toUnlock = new LinkedList(); + var current = new LinkedHashMap>(); + var dependenciesLocked = new LinkedHashMap>(); + var toUnlock = new ArrayList(); - var toFlush = new LinkedList>(); - var toPut = new LinkedList>(); - var toDelete = new LinkedList(); - var dependencies = new LinkedList>(); + Consumer addDependency = + key -> { + dependenciesLocked.computeIfAbsent(key, k -> { + Log.trace("Adding dependency " + k.toString()); + var got = getObjLock(JData.class, k); + toUnlock.add(got.lock); + return got; + }); + }; + + Function getCurrent = + key -> switch (current.get(key)) { + case TxRecord.TxObjectRecordWrite write -> write.data(); + case TxRecord.TxObjectRecordDeleted deleted -> null; + case null -> { + var dep = dependenciesLocked.get(key); + if (dep == null) { + throw new IllegalStateException("No dependency for " + key); + } + yield dep.data.orElse(null); + } + default -> { + throw new IllegalStateException("Unexpected value: " + current.get(key)); + } + }; // For existing objects: // Check that their version is not higher than the version of transaction being committed // TODO: check deletions, inserts - try { Collection> drained; while (!(drained = tx.drainNewWrites()).isEmpty()) { - Log.trace("Commit iteration with " + drained.size() + " records"); var toLock = new ArrayList(); - for (var entry : drained) { - Log.trace("Processing write " + entry.toString()); - switch (entry) { - case TxRecord.TxObjectRecordCopyLock copy -> { - toUnlock.add(copy.original().lock().writeLock()::unlock); - toFlush.add(copy); - } - case TxRecord.TxObjectRecordOptimistic copy -> { - toLock.add(copy.original().data().getKey()); - toFlush.add(copy); - } - case TxRecord.TxObjectRecordNew created -> { - toPut.add(created); - } - case TxRecord.TxObjectRecordDeleted deleted -> { - toLock.add(deleted.getKey()); - toDelete.add(deleted.getKey()); - } - default -> throw new IllegalStateException("Unexpected value: " + entry); - } - } + Log.trace("Commit iteration with " + drained.size() + " records"); - for (var entry : tx.reads().entrySet()) { - Log.trace("Processing read " + entry.toString()); - switch (entry.getValue()) { - case ReadTrackingObjectSource.TxReadObjectNone none -> { - // TODO: Check this - } - case ReadTrackingObjectSource.TxReadObjectSome(var obj) -> { - toLock.add(obj.data().getKey()); - dependencies.add(obj); - } - default -> throw new IllegalStateException("Unexpected value: " + entry); - } - } - - toLock.sort(Comparator.comparingInt(System::identityHashCode)); - - for (var key : toLock) { - Log.trace("Locking " + key.toString()); - - var got = getLocked(JData.class, key, true); - - if (got == null) { - throw new IllegalStateException("Object " + key + " not found"); - } - - toUnlock.add(got.wrapper().lock.writeLock()::unlock); - } + drained.stream() + .map(TxRecord.TxObjectRecord::key) + .sorted(Comparator.comparing(JObjectKey::toString)) + .forEach(addDependency); for (var hook : _preCommitTxHooks) { for (var entry : drained) { Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString()); switch (entry) { - case TxRecord.TxObjectRecordCopyLock copy -> { - hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped()); + case TxRecord.TxObjectRecordWrite write -> { + var oldObj = getCurrent.apply(write.key()); + if (oldObj == null) { + hook.onCreate(write.key(), write.data()); + } else { + hook.onChange(write.key(), oldObj, write.data()); + } } - case TxRecord.TxObjectRecordOptimistic copy -> { - hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped()); - } - case TxRecord.TxObjectRecordNew created -> { - hook.onCreate(created.getKey(), created.created()); - } - case TxRecord.TxObjectRecordDeleted deleted -> { - hook.onDelete(deleted.getKey(), deleted.current()); + case TxRecord.TxObjectRecordDeleted deleted -> { + hook.onDelete(deleted.key(), getCurrent.apply(deleted.key())); } default -> throw new IllegalStateException("Unexpected value: " + entry); } + current.put(entry.key(), entry); } } } - for (var dep : dependencies) { - Log.trace("Checking dependency " + dep.toString()); - var current = _objects.get(dep.data().getKey()).get(); - - // Check that the object we have locked is really the one in the map - // Note that current can be null, not only if it doesn't exist, but - // also for example in the case when it was changed and then garbage collected - if (dep.data() != current) { - throw new IllegalStateException("Serialization hazard: " + dep.data() + " vs " + current); - } - - if (current.getVersion() >= tx.getId()) { - throw new IllegalStateException("Serialization hazard: " + current.getVersion() + " vs " + tx.getId()); + // FIXME: lock leak + for (var read : tx.reads().entrySet()) { + addDependency.accept(read.getKey()); + if (read.getValue() instanceof TransactionObjectLocked locked) { + toUnlock.add(locked.lock); } } - for (var put : toPut) { - Log.trace("Putting new object " + put.toString()); - var wrapper = new JDataWrapper<>(put.created()); - wrapper.lock.writeLock().lock(); - var old = _objects.putIfAbsent(put.created().getKey(), wrapper); - if (old != null) - throw new IllegalStateException("Object already exists: " + old.get()); - toUnlock.add(wrapper.lock.writeLock()::unlock); - } + for (var dep : dependenciesLocked.entrySet()) { + Log.trace("Checking dependency " + dep.getKey()); - for (var record : toFlush) { - if (!record.copy().isModified()) { - Log.trace("Not changed " + record.toString()); - continue; + if (dep.getValue().data.isEmpty()) continue; + + if (dep.getValue().data.get().getVersion() >= tx.getId()) { + throw new IllegalStateException("Serialization hazard: " + dep.getValue().data.get().getVersion() + " vs " + tx.getId()); } - - Log.trace("Flushing changed " + record.toString()); - var current = _objects.get(record.original().data().getKey()); - - var newWrapper = new JDataWrapper<>(record.copy().wrapped()); - newWrapper.lock.writeLock().lock(); - if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) { - assert false; // Should not happen, as the object is locked - throw new IllegalStateException("Object changed during transaction after locking: " + current.get() + " vs " + record.copy().wrapped()); - } - toUnlock.add(newWrapper.lock.writeLock()::unlock); } Log.tracef("Flushing transaction %d to storage", tx.getId()); - var written = Streams.concat(toFlush.stream().map(f -> f.copy().wrapped()), - toPut.stream().map(TxRecord.TxObjectRecordNew::created)).toList(); + var toDelete = new ArrayList(); + var toWrite = new ArrayList(); - // Really flushing to storage - written.forEach(obj -> { - Log.trace("Flushing object " + obj.getKey()); - assert obj.getVersion() == tx.getId(); - var key = obj.getKey(); - var data = objectSerializer.serialize(obj); - objectStorage.writeObject(key, data); - }); + for (var action : current.entrySet()) { + switch (action.getValue()) { + case TxRecord.TxObjectRecordWrite write -> { + Log.trace("Flushing object " + action.getKey()); + toWrite.add(action.getKey()); + var data = objectSerializer.serialize(write.data()); + objectStorage.writeObject(action.getKey(), data); + _objects.put(action.getKey(), new JDataWrapper<>(write.data())); + } + case TxRecord.TxObjectRecordDeleted deleted -> { + Log.trace("Deleting object " + action.getKey()); + toDelete.add(action.getKey()); + _objects.remove(action.getKey()); + } + default -> { + throw new IllegalStateException("Unexpected value: " + action.getValue()); + } + } + } Log.tracef("Committing transaction %d to storage", tx.getId()); - objectStorage.commitTx(new SimpleTxManifest(written.stream().map(JData::getKey).toList(), toDelete)); - - for (var del : toDelete) { - _objects.remove(del); - } - } catch (Throwable t) { + objectStorage.commitTx(new SimpleTxManifest(toWrite, toDelete)); + } catch ( + Throwable t) { Log.error("Error when committing transaction", t); throw t; } finally { for (var unlock : toUnlock) { - unlock.apply(); + unlock.close(); } } } + + public void rollback(TransactionPrivate tx) { + } } \ No newline at end of file diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java index 9e8d9433..91ee6d50 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java @@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.dhfs.objects.transaction.TransactionPrivate; -import com.usatiuk.dhfs.objects.transaction.TxRecord; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -44,16 +43,18 @@ public class TransactionManagerImpl implements TransactionManager { @Override public void rollback() { - var tx = _currentTransaction.get(); - // Works only before commit was called - for (var o : tx.drainNewWrites()) { - switch (o) { - case TxRecord.TxObjectRecordCopyLock r -> r.original().lock().writeLock().unlock(); - default -> { - } - } + if (_currentTransaction.get() == null) { + throw new IllegalStateException("No transaction started"); + } + + try { + jObjectManager.rollback(_currentTransaction.get()); + } catch (Throwable e) { + Log.error("Transaction rollback failed", e); + throw e; + } finally { + _currentTransaction.remove(); } - _currentTransaction.remove(); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java index a8337b95..ff8931f9 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSource.java @@ -6,78 +6,53 @@ import com.usatiuk.objects.common.runtime.JObjectKey; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Optional; public class ReadTrackingObjectSource implements TransactionObjectSource { private final TransactionObjectSource _delegate; - public interface TxReadObject {} - - public record TxReadObjectNone() implements TxReadObject {} - - public record TxReadObjectSome(TransactionObject obj) implements TxReadObject {} - - private final Map> _readSet = new HashMap<>(); + private final Map> _readSet = new HashMap<>(); public ReadTrackingObjectSource(TransactionObjectSource delegate) { _delegate = delegate; } - public Map> getRead() { + public Map> getRead() { return Collections.unmodifiableMap(_readSet); } @Override - public Optional> get(Class type, JObjectKey key) { + public TransactionObject get(Class type, JObjectKey key) { var got = _readSet.get(key); if (got == null) { var read = _delegate.get(type, key); - if (read.isPresent()) { - _readSet.put(key, new TxReadObjectSome<>(read.get())); - } else { - _readSet.put(key, new TxReadObjectNone<>()); - } + _readSet.put(key, read); return read; } - return switch (got) { - case TxReadObjectNone none -> Optional.empty(); - case TxReadObjectSome some -> { - if (type.isInstance(some.obj().data())) { - yield Optional.of((TransactionObject) some.obj()); - } else { - yield Optional.empty(); - } - } - default -> throw new IllegalStateException("Unexpected value: " + got); - }; + got.data().ifPresent(data -> { + if (!type.isInstance(data)) + throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass()); + }); + + return (TransactionObject) got; } @Override - public Optional> getWriteLocked(Class type, JObjectKey key) { + public TransactionObject getWriteLocked(Class type, JObjectKey key) { var got = _readSet.get(key); if (got == null) { var read = _delegate.getWriteLocked(type, key); - if (read.isPresent()) { - _readSet.put(key, new TxReadObjectSome<>(read.get())); - } else { - _readSet.put(key, new TxReadObjectNone<>()); - } + _readSet.put(key, read); return read; } - return switch (got) { - case TxReadObjectNone none -> Optional.empty(); - case TxReadObjectSome some -> { - if (type.isInstance(some.obj().data())) { - yield Optional.of((TransactionObject) some.obj()); - } else { - yield Optional.empty(); - } - } - default -> throw new IllegalStateException("Unexpected value: " + got); - }; + got.data().ifPresent(data -> { + if (!type.isInstance(data)) + throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass()); + }); + + return (TransactionObject) got; } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 43a4bbbb..fda96766 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -23,8 +23,8 @@ public class TransactionFactoryImpl implements TransactionFactory { private final long _id; private final ReadTrackingObjectSource _source; - private Map> _objects = new HashMap<>(); - private Map> _newObjects = new HashMap<>(); + private final Map> _writes = new HashMap<>(); + private Map> _newWrites = new HashMap<>(); private TransactionImpl(long id, TransactionObjectSource source) { _id = id; @@ -33,97 +33,53 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { - var got = _objects.get(key); - if (got != null) { - var compatible = got.getIfStrategyCompatible(key, strategy); - if (compatible == null) { - throw new IllegalArgumentException("Locking strategy mismatch"); - } - if (!type.isInstance(compatible)) { - throw new IllegalArgumentException("Object type mismatch"); - } - return Optional.of(type.cast(compatible)); - } - - switch (strategy) { - case OPTIMISTIC: { - var read = _source.get(type, key).orElse(null); - if (read == null) { - return Optional.empty(); - } - var copy = objectAllocator.copy(read.data()); - _objects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy)); - _newObjects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy)); - return Optional.of(copy.wrapped()); - } - case WRITE: { - var locked = _source.getWriteLocked(type, key).orElse(null); - if (locked == null) { - return Optional.empty(); - } - var copy = objectAllocator.copy(locked.data()); - _objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy)); - _newObjects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy)); - return Optional.of(copy.wrapped()); - } - default: - throw new IllegalArgumentException("Unknown locking strategy"); - } + return switch (strategy) { + case OPTIMISTIC -> _source.get(type, key).data(); + case WRITE -> _source.getWriteLocked(type, key).data(); + }; } @Override public void delete(JObjectKey key) { +// get(JData.class, key, LockingStrategy.OPTIMISTIC); + // FIXME - var got = _objects.get(key); + var got = _writes.get(key); if (got != null) { switch (got) { - case TxRecord.TxObjectRecordNew created -> { - _objects.remove(key); - _newObjects.remove(key); - } - case TxRecord.TxObjectRecordCopyLock copyLockRecord -> { - _objects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord)); - _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord)); - } - case TxRecord.TxObjectRecordOptimistic optimisticRecord -> { - _objects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord)); - _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord)); - } - case TxRecord.TxObjectRecordDeleted deletedRecord -> { + case TxRecord.TxObjectRecordDeleted deleted -> { return; } - default -> throw new IllegalStateException("Unexpected value: " + got); + default -> { + } } - return; } - - var read = _source.get(JData.class, key).orElse(null); - if (read == null) { - return; - } - _objects.put(key, new TxRecord.TxObjectRecordDeleted<>(read)); // FIXME: - _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(read)); +// +// var read = _source.get(JData.class, key).orElse(null); +// if (read == null) { +// return; +// } + _writes.put(key, new TxRecord.TxObjectRecordDeleted(key)); // FIXME: + _newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key)); } @Override public void put(JData obj) { - if (_objects.containsKey(obj.getKey())) { - throw new IllegalArgumentException("Object already exists in transaction"); - } +// get(JData.class, obj.getKey(), LockingStrategy.OPTIMISTIC); - _objects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj)); - _newObjects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj)); + _writes.put(obj.getKey(), new TxRecord.TxObjectRecordWrite<>(obj)); + _newWrites.put(obj.getKey(), new TxRecord.TxObjectRecordWrite<>(obj)); } @Override public Collection> drainNewWrites() { - var ret = _newObjects; - _newObjects = new HashMap<>(); + var ret = _newWrites; + _newWrites = new HashMap<>(); return ret.values(); } @Override - public Map> reads() { + public Map> reads() { return _source.getRead(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java index cd5dc7e6..0d87926e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java @@ -2,10 +2,8 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.objects.common.runtime.JData; -import java.util.concurrent.locks.ReadWriteLock; +import java.util.Optional; public interface TransactionObject { - T data(); - - ReadWriteLock lock(); + Optional data(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java index 14835797..acfae1ca 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObjectSource.java @@ -3,10 +3,8 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; -import java.util.Optional; - public interface TransactionObjectSource { - Optional> get(Class type, JObjectKey key); + TransactionObject get(Class type, JObjectKey key); - Optional> getWriteLocked(Class type, JObjectKey key); + TransactionObject getWriteLocked(Class type, JObjectKey key); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index 25f39804..c2e147ed 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -9,5 +9,5 @@ import java.util.Map; public interface TransactionPrivate extends Transaction { Collection> drainNewWrites(); - Map> reads(); + Map> reads(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java index 7104706b..7600fc31 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TxRecord.java @@ -1,80 +1,20 @@ package com.usatiuk.dhfs.objects.transaction; -import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData; import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JObjectKey; public class TxRecord { public interface TxObjectRecord { - T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy); - - JObjectKey getKey(); + JObjectKey key(); } - public interface TxObjectRecordWrite extends TxObjectRecord { - TransactionObject original(); - - ChangeTrackingJData copy(); - - default JObjectKey getKey() { - return original().data().getKey(); + public record TxObjectRecordWrite(JData data) implements TxObjectRecord { + @Override + public JObjectKey key() { + return data.getKey(); } } - public record TxObjectRecordNew(T created) implements TxObjectRecord { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC) - return created; - return null; - } - - @Override - public JObjectKey getKey() { - return created.getKey(); - } - } - - public record TxObjectRecordDeleted(TransactionObject original, - T current) implements TxObjectRecord { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - return null; - } - - @Override - public JObjectKey getKey() { - return original.data().getKey(); - } - - public TxObjectRecordDeleted(TxObjectRecordWrite original) { - this(original.original(), original.copy().wrapped()); - } - - public TxObjectRecordDeleted(TransactionObject original) { - this(original, original.data()); - } - } - - public record TxObjectRecordCopyLock(TransactionObject original, - ChangeTrackingJData copy) - implements TxObjectRecordWrite { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC) - return copy.wrapped(); - return null; - } - } - - public record TxObjectRecordOptimistic(TransactionObject original, - ChangeTrackingJData copy) - implements TxObjectRecordWrite { - @Override - public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) { - if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC) - return copy.wrapped(); - return null; - } + public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord { } } diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java index d7262c13..d01d5cc5 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java @@ -86,15 +86,17 @@ public class ObjectsTest { curTx.put(newParent); txm.commit(); } - Assertions.assertThrows(Exception.class, () -> txm.run(() -> { + { + txm.begin(); var newParent = alloc.create(Parent.class, new JObjectKey("Parent7")); newParent.setLastName("John2"); curTx.put(newParent); - })); + txm.commit(); + } { txm.begin(); var parent = curTx.get(Parent.class, new JObjectKey("Parent7")).orElse(null); - Assertions.assertEquals("John", parent.getLastName()); + Assertions.assertEquals("John2", parent.getLastName()); txm.commit(); } } diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java index 07325b76..25e9a194 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/PreCommitTxHookTest.java @@ -44,7 +44,7 @@ public class PreCommitTxHookTest { { txm.begin(); - var parent = curTx.get(Parent.class, new JObjectKey("Parent")).orElse(null); + var parent = curTx.get(Parent.class, new JObjectKey("ParentCreate")).orElse(null); Assertions.assertEquals("John", parent.getLastName()); txm.commit(); } @@ -98,9 +98,9 @@ public class PreCommitTxHookTest { { txm.begin(); - var parent = curTx.get(Parent.class, new JObjectKey("ParentEdit")).orElse(null); - Assertions.assertEquals("John", parent.getLastName()); - parent.setLastName("John changed"); + var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit")); + newParent.setLastName("John changed"); + curTx.put(newParent); txm.commit(); } @@ -113,4 +113,33 @@ public class PreCommitTxHookTest { Assertions.assertEquals(new JObjectKey("ParentEdit"), keyCaptor.getValue()); } + @Test + void editObjectWithGet() { + { + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit2")); + newParent.setLastName("John"); + curTx.put(newParent); + txm.commit(); + } + + { + txm.begin(); + var parent = curTx.get(Parent.class, new JObjectKey("ParentEdit2")).orElse(null); + Assertions.assertEquals("John", parent.getLastName()); + var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit2")); + newParent.setLastName("John changed"); + curTx.put(newParent); + txm.commit(); + } + + ArgumentCaptor dataCaptorOld = ArgumentCaptor.forClass(JData.class); + ArgumentCaptor dataCaptorNew = ArgumentCaptor.forClass(JData.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(JObjectKey.class); + Mockito.verify(spyHook, Mockito.times(1)).onChange(keyCaptor.capture(), dataCaptorOld.capture(), dataCaptorNew.capture()); + Assertions.assertEquals("John", ((Parent) dataCaptorOld.getValue()).getLastName()); + Assertions.assertEquals("John changed", ((Parent) dataCaptorNew.getValue()).getLastName()); + Assertions.assertEquals(new JObjectKey("ParentEdit2"), keyCaptor.getValue()); + } + } diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/AutoCloseableNoThrow.java b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/AutoCloseableNoThrow.java new file mode 100644 index 00000000..29ec47ca --- /dev/null +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/AutoCloseableNoThrow.java @@ -0,0 +1,6 @@ +package com.usatiuk.dhfs.utils; + +public interface AutoCloseableNoThrow extends AutoCloseable { + @Override + void close(); +} diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java index ecb1288a..35e882da 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java @@ -1,41 +1,58 @@ package com.usatiuk.dhfs.utils; +import io.quarkus.logging.Log; + +import java.lang.ref.Cleaner; import java.util.concurrent.ConcurrentHashMap; public class DataLocker { private static class LockTag { boolean released = false; + final Thread owner = Thread.currentThread(); } private final ConcurrentHashMap _locks = new ConcurrentHashMap<>(); - public class Lock implements AutoCloseable { + private class Lock implements AutoCloseableNoThrow { private final Object _key; private final LockTag _tag; + private static final Cleaner CLEANER = Cleaner.create(); public Lock(Object key, LockTag tag) { _key = key; _tag = tag; + CLEANER.register(this, () -> { + if (!tag.released) { + Log.error("Lock collected without release: " + key); + } + }); } @Override public void close() { synchronized (_tag) { _tag.released = true; - _tag.notifyAll(); + _tag.notify(); _locks.remove(_key, _tag); } } } - public Lock lock(Object data) { + private static final AutoCloseableNoThrow DUMMY_LOCK = () -> { + }; + + public AutoCloseableNoThrow lock(Object data) { while (true) { try { var tag = _locks.get(data); if (tag != null) { synchronized (tag) { - if (!tag.released) + if (!tag.released) { + if (tag.owner == Thread.currentThread()) { + return DUMMY_LOCK; + } tag.wait(); + } continue; } }