much simplified transactions with immutable objects

This commit is contained in:
2024-12-31 16:10:46 +01:00
parent dc19e1862d
commit a0cad2a5f6
12 changed files with 266 additions and 395 deletions

View File

@@ -1,11 +1,10 @@
package com.usatiuk.dhfs.objects; package com.usatiuk.dhfs.objects;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest; import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.objects.transaction.*; import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker; import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.dhfs.utils.VoidFn;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator; import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey; import com.usatiuk.objects.common.runtime.JObjectKey;
@@ -20,8 +19,8 @@ import java.lang.ref.WeakReference;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Consumer;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function;
// Manages all access to com.usatiuk.objects.common.runtime.JData objects. // Manages all access to com.usatiuk.objects.common.runtime.JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage. // In particular, it serves as a source of truth for what is committed to the backing storage.
@@ -44,15 +43,13 @@ public class JObjectManager {
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList(); _preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
} }
private final DataLocker _storageReadLocker = new DataLocker(); private final DataLocker _objLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>(); private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong(); private final AtomicLong _txCounter = new AtomicLong();
private class JDataWrapper<T extends JData> extends WeakReference<T> { private class JDataWrapper<T extends JData> extends WeakReference<T> {
private static final Cleaner CLEANER = Cleaner.create(); private static final Cleaner CLEANER = Cleaner.create();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public JDataWrapper(T referent) { public JDataWrapper(T referent) {
super(referent); super(referent);
var key = referent.getKey(); var key = referent.getKey();
@@ -65,15 +62,11 @@ public class JObjectManager {
public String toString() { public String toString() {
return "JDataWrapper{" + return "JDataWrapper{" +
"ref=" + get() + "ref=" + get() +
", lock=" + lock +
'}'; '}';
} }
} }
private record WrapperRet<T extends JData>(T obj, JDataWrapper<T> wrapper) { private <T extends JData> T get(Class<T> type, JObjectKey key) {
}
private <T extends JData> WrapperRet<T> get(Class<T> type, JObjectKey key) {
while (true) { while (true) {
{ {
var got = _objects.get(key); var got = _objects.get(key);
@@ -81,7 +74,7 @@ public class JObjectManager {
if (got != null) { if (got != null) {
var ref = got.get(); var ref = got.get();
if (type.isInstance(ref)) { if (type.isInstance(ref)) {
return new WrapperRet<>((T) ref, (JDataWrapper<T>) got); return type.cast(ref);
} else if (ref == null) { } else if (ref == null) {
_objects.remove(key, got); _objects.remove(key, got);
} else { } else {
@@ -91,59 +84,48 @@ public class JObjectManager {
} }
//noinspection unused //noinspection unused
try (var readLock = _storageReadLocker.lock(key)) { try (var readLock = _objLocker.lock(key)) {
var read = objectStorage.readObject(key).orElse(null); if (_objects.containsKey(key)) continue;
var read = objectStorage.readObject(key)
.map(objectSerializer::deserialize)
.orElse(null);
if (read == null) return null; if (read == null) return null;
var got = objectSerializer.deserialize(read); if (type.isInstance(read)) {
var wrapper = new JDataWrapper<>(type.cast(read));
if (type.isInstance(got)) { var old = _objects.put(key, wrapper);
var wrapper = new JDataWrapper<>((T) got); assert old == null;
var old = _objects.putIfAbsent(key, wrapper); return type.cast(read);
if (old != null) continue;
return new WrapperRet<>((T) got, wrapper);
} else if (got == null) {
return null;
} else { } else {
throw new IllegalArgumentException("Object type mismatch: " + got.getClass() + " vs " + type); throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type);
} }
} }
} }
} }
private record TransactionObjectNoLock<T extends JData>
private <T extends JData> WrapperRet<T> getLocked(Class<T> type, JObjectKey key, boolean write) { (Optional<T> data)
var read = get(type, key);
if (read == null) return null;
var lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock();
lock.lock();
while (true) {
try {
var readAgain = get(type, key);
if (readAgain == null) {
lock.unlock();
return null;
}
if (!Objects.equals(read, readAgain)) {
lock.unlock();
read = readAgain;
lock = write ? read.wrapper().lock.writeLock() : read.wrapper().lock.readLock();
lock.lock();
continue;
}
return read;
} catch (Throwable e) {
lock.unlock();
throw e;
}
}
}
private record TransactionObjectImpl<T extends JData>
(T data, ReadWriteLock lock)
implements TransactionObject<T> { implements TransactionObject<T> {
} }
private record TransactionObjectLocked<T extends JData>
(Optional<T> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}
private <T extends JData> TransactionObjectNoLock<T> getObj(Class<T> type, JObjectKey key) {
var got = get(type, key);
return new TransactionObjectNoLock<>(Optional.ofNullable(got));
}
private <T extends JData> TransactionObjectLocked<T> getObjLock(Class<T> type, JObjectKey key) {
var lock = _objLocker.lock(key);
var got = get(type, key);
return new TransactionObjectLocked<>(Optional.ofNullable(got), lock);
}
private class TransactionObjectSourceImpl implements TransactionObjectSource { private class TransactionObjectSourceImpl implements TransactionObjectSource {
private final long _txId; private final long _txId;
@@ -152,21 +134,26 @@ public class JObjectManager {
} }
@Override @Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) { public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.get(type, key); return getObj(type, key);
if (got == null) return Optional.empty(); // return getObj(type, key).map(got -> {
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); // if (got.data().getVersion() > _txId) {
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
// }
// return got;
// });
} }
@Override @Override
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) { public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.getLocked(type, key, true); return getObjLock(type, key);
if (got == null) return Optional.empty(); // return getObjLock(type, key).map(got -> {
if (got.obj.getVersion() >= _txId) { // if (got.data().getVersion() > _txId) {
got.wrapper().lock.writeLock().unlock(); // got.lock.close();
throw new IllegalStateException("Serialization race"); // throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
} // }
return Optional.of(new TransactionObjectImpl<>(got.obj(), got.wrapper().lock)); // return got;
// });
} }
} }
@@ -200,168 +187,130 @@ public class JObjectManager {
public void commit(TransactionPrivate tx) { public void commit(TransactionPrivate tx) {
Log.trace("Committing transaction " + tx.getId()); Log.trace("Committing transaction " + tx.getId());
// This also holds the weak references var current = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var toUnlock = new LinkedList<VoidFn>(); var dependenciesLocked = new LinkedHashMap<JObjectKey, TransactionObjectLocked<?>>();
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>(); Consumer<JObjectKey> addDependency =
var toPut = new LinkedList<TxRecord.TxObjectRecordNew<?>>(); key -> {
var toDelete = new LinkedList<JObjectKey>(); dependenciesLocked.computeIfAbsent(key, k -> {
var dependencies = new LinkedList<TransactionObject<?>>(); Log.trace("Adding dependency " + k.toString());
var got = getObjLock(JData.class, k);
toUnlock.add(got.lock);
return got;
});
};
Function<JObjectKey, JData> getCurrent =
key -> switch (current.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> {
var dep = dependenciesLocked.get(key);
if (dep == null) {
throw new IllegalStateException("No dependency for " + key);
}
yield dep.data.orElse(null);
}
default -> {
throw new IllegalStateException("Unexpected value: " + current.get(key));
}
};
// For existing objects: // For existing objects:
// Check that their version is not higher than the version of transaction being committed // Check that their version is not higher than the version of transaction being committed
// TODO: check deletions, inserts // TODO: check deletions, inserts
try { try {
Collection<TxRecord.TxObjectRecord<?>> drained; Collection<TxRecord.TxObjectRecord<?>> drained;
while (!(drained = tx.drainNewWrites()).isEmpty()) { while (!(drained = tx.drainNewWrites()).isEmpty()) {
Log.trace("Commit iteration with " + drained.size() + " records");
var toLock = new ArrayList<JObjectKey>(); var toLock = new ArrayList<JObjectKey>();
for (var entry : drained) { Log.trace("Commit iteration with " + drained.size() + " records");
Log.trace("Processing write " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().lock().writeLock()::unlock);
toFlush.add(copy);
}
case TxRecord.TxObjectRecordOptimistic<?> copy -> {
toLock.add(copy.original().data().getKey());
toFlush.add(copy);
}
case TxRecord.TxObjectRecordNew<?> created -> {
toPut.add(created);
}
case TxRecord.TxObjectRecordDeleted<?> deleted -> {
toLock.add(deleted.getKey());
toDelete.add(deleted.getKey());
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
for (var entry : tx.reads().entrySet()) { drained.stream()
Log.trace("Processing read " + entry.toString()); .map(TxRecord.TxObjectRecord::key)
switch (entry.getValue()) { .sorted(Comparator.comparing(JObjectKey::toString))
case ReadTrackingObjectSource.TxReadObjectNone<?> none -> { .forEach(addDependency);
// TODO: Check this
}
case ReadTrackingObjectSource.TxReadObjectSome<?>(var obj) -> {
toLock.add(obj.data().getKey());
dependencies.add(obj);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
toLock.sort(Comparator.comparingInt(System::identityHashCode));
for (var key : toLock) {
Log.trace("Locking " + key.toString());
var got = getLocked(JData.class, key, true);
if (got == null) {
throw new IllegalStateException("Object " + key + " not found");
}
toUnlock.add(got.wrapper().lock.writeLock()::unlock);
}
for (var hook : _preCommitTxHooks) { for (var hook : _preCommitTxHooks) {
for (var entry : drained) { for (var entry : drained) {
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString()); Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString());
switch (entry) { switch (entry) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> { case TxRecord.TxObjectRecordWrite<?> write -> {
hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped()); var oldObj = getCurrent.apply(write.key());
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
}
} }
case TxRecord.TxObjectRecordOptimistic<?> copy -> { case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onChange(copy.getKey(), copy.original().data(), copy.copy().wrapped()); hook.onDelete(deleted.key(), getCurrent.apply(deleted.key()));
}
case TxRecord.TxObjectRecordNew<?> created -> {
hook.onCreate(created.getKey(), created.created());
}
case TxRecord.TxObjectRecordDeleted<?> deleted -> {
hook.onDelete(deleted.getKey(), deleted.current());
} }
default -> throw new IllegalStateException("Unexpected value: " + entry); default -> throw new IllegalStateException("Unexpected value: " + entry);
} }
current.put(entry.key(), entry);
} }
} }
} }
for (var dep : dependencies) { // FIXME: lock leak
Log.trace("Checking dependency " + dep.toString()); for (var read : tx.reads().entrySet()) {
var current = _objects.get(dep.data().getKey()).get(); addDependency.accept(read.getKey());
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
// Check that the object we have locked is really the one in the map toUnlock.add(locked.lock);
// Note that current can be null, not only if it doesn't exist, but
// also for example in the case when it was changed and then garbage collected
if (dep.data() != current) {
throw new IllegalStateException("Serialization hazard: " + dep.data() + " vs " + current);
}
if (current.getVersion() >= tx.getId()) {
throw new IllegalStateException("Serialization hazard: " + current.getVersion() + " vs " + tx.getId());
} }
} }
for (var put : toPut) { for (var dep : dependenciesLocked.entrySet()) {
Log.trace("Putting new object " + put.toString()); Log.trace("Checking dependency " + dep.getKey());
var wrapper = new JDataWrapper<>(put.created());
wrapper.lock.writeLock().lock();
var old = _objects.putIfAbsent(put.created().getKey(), wrapper);
if (old != null)
throw new IllegalStateException("Object already exists: " + old.get());
toUnlock.add(wrapper.lock.writeLock()::unlock);
}
for (var record : toFlush) { if (dep.getValue().data.isEmpty()) continue;
if (!record.copy().isModified()) {
Log.trace("Not changed " + record.toString()); if (dep.getValue().data.get().getVersion() >= tx.getId()) {
continue; throw new IllegalStateException("Serialization hazard: " + dep.getValue().data.get().getVersion() + " vs " + tx.getId());
} }
Log.trace("Flushing changed " + record.toString());
var current = _objects.get(record.original().data().getKey());
var newWrapper = new JDataWrapper<>(record.copy().wrapped());
newWrapper.lock.writeLock().lock();
if (!_objects.replace(record.copy().wrapped().getKey(), current, newWrapper)) {
assert false; // Should not happen, as the object is locked
throw new IllegalStateException("Object changed during transaction after locking: " + current.get() + " vs " + record.copy().wrapped());
}
toUnlock.add(newWrapper.lock.writeLock()::unlock);
} }
Log.tracef("Flushing transaction %d to storage", tx.getId()); Log.tracef("Flushing transaction %d to storage", tx.getId());
var written = Streams.concat(toFlush.stream().map(f -> f.copy().wrapped()), var toDelete = new ArrayList<JObjectKey>();
toPut.stream().map(TxRecord.TxObjectRecordNew::created)).toList(); var toWrite = new ArrayList<JObjectKey>();
// Really flushing to storage for (var action : current.entrySet()) {
written.forEach(obj -> { switch (action.getValue()) {
Log.trace("Flushing object " + obj.getKey()); case TxRecord.TxObjectRecordWrite<?> write -> {
assert obj.getVersion() == tx.getId(); Log.trace("Flushing object " + action.getKey());
var key = obj.getKey(); toWrite.add(action.getKey());
var data = objectSerializer.serialize(obj); var data = objectSerializer.serialize(write.data());
objectStorage.writeObject(key, data); objectStorage.writeObject(action.getKey(), data);
}); _objects.put(action.getKey(), new JDataWrapper<>(write.data()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
toDelete.add(action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new IllegalStateException("Unexpected value: " + action.getValue());
}
}
}
Log.tracef("Committing transaction %d to storage", tx.getId()); Log.tracef("Committing transaction %d to storage", tx.getId());
objectStorage.commitTx(new SimpleTxManifest(written.stream().map(JData::getKey).toList(), toDelete)); objectStorage.commitTx(new SimpleTxManifest(toWrite, toDelete));
} catch (
for (var del : toDelete) { Throwable t) {
_objects.remove(del);
}
} catch (Throwable t) {
Log.error("Error when committing transaction", t); Log.error("Error when committing transaction", t);
throw t; throw t;
} finally { } finally {
for (var unlock : toUnlock) { for (var unlock : toUnlock) {
unlock.apply(); unlock.close();
} }
} }
} }
public void rollback(TransactionPrivate tx) {
}
} }

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate; import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -44,16 +43,18 @@ public class TransactionManagerImpl implements TransactionManager {
@Override @Override
public void rollback() { public void rollback() {
var tx = _currentTransaction.get(); if (_currentTransaction.get() == null) {
// Works only before commit was called throw new IllegalStateException("No transaction started");
for (var o : tx.drainNewWrites()) { }
switch (o) {
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock(); try {
default -> { jObjectManager.rollback(_currentTransaction.get());
} } catch (Throwable e) {
} Log.error("Transaction rollback failed", e);
throw e;
} finally {
_currentTransaction.remove();
} }
_currentTransaction.remove();
} }
@Override @Override

View File

@@ -6,78 +6,53 @@ import com.usatiuk.objects.common.runtime.JObjectKey;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
public class ReadTrackingObjectSource implements TransactionObjectSource { public class ReadTrackingObjectSource implements TransactionObjectSource {
private final TransactionObjectSource _delegate; private final TransactionObjectSource _delegate;
public interface TxReadObject<T extends JData> {} private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
public record TxReadObjectNone<T extends JData>() implements TxReadObject<T> {}
public record TxReadObjectSome<T extends JData>(TransactionObject<T> obj) implements TxReadObject<T> {}
private final Map<JObjectKey, TxReadObject<?>> _readSet = new HashMap<>();
public ReadTrackingObjectSource(TransactionObjectSource delegate) { public ReadTrackingObjectSource(TransactionObjectSource delegate) {
_delegate = delegate; _delegate = delegate;
} }
public Map<JObjectKey, TxReadObject<?>> getRead() { public Map<JObjectKey, TransactionObject<?>> getRead() {
return Collections.unmodifiableMap(_readSet); return Collections.unmodifiableMap(_readSet);
} }
@Override @Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) { public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
var got = _readSet.get(key); var got = _readSet.get(key);
if (got == null) { if (got == null) {
var read = _delegate.get(type, key); var read = _delegate.get(type, key);
if (read.isPresent()) { _readSet.put(key, read);
_readSet.put(key, new TxReadObjectSome<>(read.get()));
} else {
_readSet.put(key, new TxReadObjectNone<>());
}
return read; return read;
} }
return switch (got) { got.data().ifPresent(data -> {
case TxReadObjectNone<?> none -> Optional.empty(); if (!type.isInstance(data))
case TxReadObjectSome<?> some -> { throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
if (type.isInstance(some.obj().data())) { });
yield Optional.of((TransactionObject<T>) some.obj());
} else { return (TransactionObject<T>) got;
yield Optional.empty();
}
}
default -> throw new IllegalStateException("Unexpected value: " + got);
};
} }
@Override @Override
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) { public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = _readSet.get(key); var got = _readSet.get(key);
if (got == null) { if (got == null) {
var read = _delegate.getWriteLocked(type, key); var read = _delegate.getWriteLocked(type, key);
if (read.isPresent()) { _readSet.put(key, read);
_readSet.put(key, new TxReadObjectSome<>(read.get()));
} else {
_readSet.put(key, new TxReadObjectNone<>());
}
return read; return read;
} }
return switch (got) { got.data().ifPresent(data -> {
case TxReadObjectNone<?> none -> Optional.empty(); if (!type.isInstance(data))
case TxReadObjectSome<?> some -> { throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
if (type.isInstance(some.obj().data())) { });
yield Optional.of((TransactionObject<T>) some.obj());
} else { return (TransactionObject<T>) got;
yield Optional.empty();
}
}
default -> throw new IllegalStateException("Unexpected value: " + got);
};
} }
} }

View File

@@ -23,8 +23,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final long _id; private final long _id;
private final ReadTrackingObjectSource _source; private final ReadTrackingObjectSource _source;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>(); private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new HashMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newObjects = new HashMap<>(); private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) { private TransactionImpl(long id, TransactionObjectSource source) {
_id = id; _id = id;
@@ -33,97 +33,53 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override @Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) { public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
var got = _objects.get(key); return switch (strategy) {
if (got != null) { case OPTIMISTIC -> _source.get(type, key).data();
var compatible = got.getIfStrategyCompatible(key, strategy); case WRITE -> _source.getWriteLocked(type, key).data();
if (compatible == null) { };
throw new IllegalArgumentException("Locking strategy mismatch");
}
if (!type.isInstance(compatible)) {
throw new IllegalArgumentException("Object type mismatch");
}
return Optional.of(type.cast(compatible));
}
switch (strategy) {
case OPTIMISTIC: {
var read = _source.get(type, key).orElse(null);
if (read == null) {
return Optional.empty();
}
var copy = objectAllocator.copy(read.data());
_objects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy));
_newObjects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy));
return Optional.of(copy.wrapped());
}
case WRITE: {
var locked = _source.getWriteLocked(type, key).orElse(null);
if (locked == null) {
return Optional.empty();
}
var copy = objectAllocator.copy(locked.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
_newObjects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
return Optional.of(copy.wrapped());
}
default:
throw new IllegalArgumentException("Unknown locking strategy");
}
} }
@Override @Override
public void delete(JObjectKey key) { public void delete(JObjectKey key) {
// get(JData.class, key, LockingStrategy.OPTIMISTIC);
// FIXME // FIXME
var got = _objects.get(key); var got = _writes.get(key);
if (got != null) { if (got != null) {
switch (got) { switch (got) {
case TxRecord.TxObjectRecordNew<?> created -> { case TxRecord.TxObjectRecordDeleted deleted -> {
_objects.remove(key);
_newObjects.remove(key);
}
case TxRecord.TxObjectRecordCopyLock<?> copyLockRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord));
_newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord));
}
case TxRecord.TxObjectRecordOptimistic<?> optimisticRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord));
_newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord));
}
case TxRecord.TxObjectRecordDeleted<?> deletedRecord -> {
return; return;
} }
default -> throw new IllegalStateException("Unexpected value: " + got); default -> {
}
} }
return;
} }
//
var read = _source.get(JData.class, key).orElse(null); // var read = _source.get(JData.class, key).orElse(null);
if (read == null) { // if (read == null) {
return; // return;
} // }
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(read)); // FIXME: _writes.put(key, new TxRecord.TxObjectRecordDeleted(key)); // FIXME:
_newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(read)); _newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
} }
@Override @Override
public void put(JData obj) { public void put(JData obj) {
if (_objects.containsKey(obj.getKey())) { // get(JData.class, obj.getKey(), LockingStrategy.OPTIMISTIC);
throw new IllegalArgumentException("Object already exists in transaction");
}
_objects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj)); _writes.put(obj.getKey(), new TxRecord.TxObjectRecordWrite<>(obj));
_newObjects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj)); _newWrites.put(obj.getKey(), new TxRecord.TxObjectRecordWrite<>(obj));
} }
@Override @Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() { public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
var ret = _newObjects; var ret = _newWrites;
_newObjects = new HashMap<>(); _newWrites = new HashMap<>();
return ret.values(); return ret.values();
} }
@Override @Override
public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads() { public Map<JObjectKey, TransactionObject<?>> reads() {
return _source.getRead(); return _source.getRead();
} }
} }

View File

@@ -2,10 +2,8 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import java.util.concurrent.locks.ReadWriteLock; import java.util.Optional;
public interface TransactionObject<T extends JData> { public interface TransactionObject<T extends JData> {
T data(); Optional<T> data();
ReadWriteLock lock();
} }

View File

@@ -3,10 +3,8 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey; import com.usatiuk.objects.common.runtime.JObjectKey;
import java.util.Optional;
public interface TransactionObjectSource { public interface TransactionObjectSource {
<T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key); <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key);
<T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key); <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key);
} }

View File

@@ -9,5 +9,5 @@ import java.util.Map;
public interface TransactionPrivate extends Transaction { public interface TransactionPrivate extends Transaction {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites(); Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads(); Map<JObjectKey, TransactionObject<?>> reads();
} }

View File

@@ -1,80 +1,20 @@
package com.usatiuk.dhfs.objects.transaction; package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.objects.alloc.runtime.ChangeTrackingJData;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey; import com.usatiuk.objects.common.runtime.JObjectKey;
public class TxRecord { public class TxRecord {
public interface TxObjectRecord<T> { public interface TxObjectRecord<T> {
T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy); JObjectKey key();
JObjectKey getKey();
} }
public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> { public record TxObjectRecordWrite<T extends JData>(JData data) implements TxObjectRecord<T> {
TransactionObject<T> original(); @Override
public JObjectKey key() {
ChangeTrackingJData<T> copy(); return data.getKey();
default JObjectKey getKey() {
return original().data().getKey();
} }
} }
public record TxObjectRecordNew<T extends JData>(T created) implements TxObjectRecord<T> { public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return created;
return null;
}
@Override
public JObjectKey getKey() {
return created.getKey();
}
}
public record TxObjectRecordDeleted<T extends JData>(TransactionObject<T> original,
T current) implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
return null;
}
@Override
public JObjectKey getKey() {
return original.data().getKey();
}
public TxObjectRecordDeleted(TxObjectRecordWrite<T> original) {
this(original.original(), original.copy().wrapped());
}
public TxObjectRecordDeleted(TransactionObject<T> original) {
this(original, original.data());
}
}
public record TxObjectRecordCopyLock<T extends JData>(TransactionObject<T> original,
ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return copy.wrapped();
return null;
}
}
public record TxObjectRecordOptimistic<T extends JData>(TransactionObject<T> original,
ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return copy.wrapped();
return null;
}
} }
} }

View File

@@ -86,15 +86,17 @@ public class ObjectsTest {
curTx.put(newParent); curTx.put(newParent);
txm.commit(); txm.commit();
} }
Assertions.assertThrows(Exception.class, () -> txm.run(() -> { {
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent7")); var newParent = alloc.create(Parent.class, new JObjectKey("Parent7"));
newParent.setLastName("John2"); newParent.setLastName("John2");
curTx.put(newParent); curTx.put(newParent);
})); txm.commit();
}
{ {
txm.begin(); txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("Parent7")).orElse(null); var parent = curTx.get(Parent.class, new JObjectKey("Parent7")).orElse(null);
Assertions.assertEquals("John", parent.getLastName()); Assertions.assertEquals("John2", parent.getLastName());
txm.commit(); txm.commit();
} }
} }

View File

@@ -44,7 +44,7 @@ public class PreCommitTxHookTest {
{ {
txm.begin(); txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("Parent")).orElse(null); var parent = curTx.get(Parent.class, new JObjectKey("ParentCreate")).orElse(null);
Assertions.assertEquals("John", parent.getLastName()); Assertions.assertEquals("John", parent.getLastName());
txm.commit(); txm.commit();
} }
@@ -98,9 +98,9 @@ public class PreCommitTxHookTest {
{ {
txm.begin(); txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("ParentEdit")).orElse(null); var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit"));
Assertions.assertEquals("John", parent.getLastName()); newParent.setLastName("John changed");
parent.setLastName("John changed"); curTx.put(newParent);
txm.commit(); txm.commit();
} }
@@ -113,4 +113,33 @@ public class PreCommitTxHookTest {
Assertions.assertEquals(new JObjectKey("ParentEdit"), keyCaptor.getValue()); Assertions.assertEquals(new JObjectKey("ParentEdit"), keyCaptor.getValue());
} }
@Test
void editObjectWithGet() {
{
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit2"));
newParent.setLastName("John");
curTx.put(newParent);
txm.commit();
}
{
txm.begin();
var parent = curTx.get(Parent.class, new JObjectKey("ParentEdit2")).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
var newParent = alloc.create(Parent.class, new JObjectKey("ParentEdit2"));
newParent.setLastName("John changed");
curTx.put(newParent);
txm.commit();
}
ArgumentCaptor<JData> dataCaptorOld = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JData> dataCaptorNew = ArgumentCaptor.forClass(JData.class);
ArgumentCaptor<JObjectKey> keyCaptor = ArgumentCaptor.forClass(JObjectKey.class);
Mockito.verify(spyHook, Mockito.times(1)).onChange(keyCaptor.capture(), dataCaptorOld.capture(), dataCaptorNew.capture());
Assertions.assertEquals("John", ((Parent) dataCaptorOld.getValue()).getLastName());
Assertions.assertEquals("John changed", ((Parent) dataCaptorNew.getValue()).getLastName());
Assertions.assertEquals(new JObjectKey("ParentEdit2"), keyCaptor.getValue());
}
} }

View File

@@ -0,0 +1,6 @@
package com.usatiuk.dhfs.utils;
public interface AutoCloseableNoThrow extends AutoCloseable {
@Override
void close();
}

View File

@@ -1,41 +1,58 @@
package com.usatiuk.dhfs.utils; package com.usatiuk.dhfs.utils;
import io.quarkus.logging.Log;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class DataLocker { public class DataLocker {
private static class LockTag { private static class LockTag {
boolean released = false; boolean released = false;
final Thread owner = Thread.currentThread();
} }
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
public class Lock implements AutoCloseable { private class Lock implements AutoCloseableNoThrow {
private final Object _key; private final Object _key;
private final LockTag _tag; private final LockTag _tag;
private static final Cleaner CLEANER = Cleaner.create();
public Lock(Object key, LockTag tag) { public Lock(Object key, LockTag tag) {
_key = key; _key = key;
_tag = tag; _tag = tag;
CLEANER.register(this, () -> {
if (!tag.released) {
Log.error("Lock collected without release: " + key);
}
});
} }
@Override @Override
public void close() { public void close() {
synchronized (_tag) { synchronized (_tag) {
_tag.released = true; _tag.released = true;
_tag.notifyAll(); _tag.notify();
_locks.remove(_key, _tag); _locks.remove(_key, _tag);
} }
} }
} }
public Lock lock(Object data) { private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
};
public AutoCloseableNoThrow lock(Object data) {
while (true) { while (true) {
try { try {
var tag = _locks.get(data); var tag = _locks.get(data);
if (tag != null) { if (tag != null) {
synchronized (tag) { synchronized (tag) {
if (!tag.released) if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
tag.wait(); tag.wait();
}
continue; continue;
} }
} }