4 Commits

8 changed files with 209 additions and 281 deletions

View File

@@ -25,36 +25,41 @@ import javax.annotation.Nonnull;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class WritebackObjectPersistentStore { public class WritebackObjectPersistentStore {
@Inject @Inject
CachingObjectPersistentStore cachedStore; CachingObjectPersistentStore cachedStore;
@Inject
ExecutorService _callbackExecutor;
@ConfigProperty(name = "dhfs.objects.writeback.limit") @ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit; int sizeLimit;
private TxBundle _pendingBundle = null;
private int _curSize = 0;
private final AtomicReference<TxBundle> _pendingBundle = new AtomicReference<>(null);
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null); private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object(); private final ReentrantLock _pendingBundleLock = new ReentrantLock();
private final Condition _newBundleCondition = _pendingBundleLock.newCondition();
private final Condition _flushCondition = _pendingBundleLock.newCondition();
private final AtomicLong _lastFlushedId = new AtomicLong(-1); private final AtomicLong _lastFlushedId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(-1); private final AtomicLong _lastCommittedId = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0); private final AtomicLong _waitedTotal = new AtomicLong(0);
private long currentSize = 0;
private ExecutorService _writebackExecutor; private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor; private ExecutorService _statusExecutor;
@Inject
ExecutorService _callbackExecutor;
private volatile boolean _ready = false; private volatile boolean _ready = false;
void init(@Observes @Priority(120) StartupEvent event) { void init(@Observes @Priority(120) StartupEvent event) {
@@ -72,8 +77,8 @@ public class WritebackObjectPersistentStore {
try { try {
while (true) { while (true) {
Thread.sleep(1000); Thread.sleep(1000);
if (currentSize > 0) if (_curSize > 0)
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB"); Log.info("Tx commit status: size=" + _curSize / 1024 / 1024 + "MB");
} }
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
@@ -91,11 +96,14 @@ public class WritebackObjectPersistentStore {
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException { void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain"); Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) { _ready = false;
_ready = false; _pendingBundleLock.lock();
while (currentSize > 0) { try {
_flushWaitSynchronizer.wait(); while (_curSize > 0) {
_flushCondition.await();
} }
} finally {
_pendingBundleLock.unlock();
} }
_writebackExecutor.shutdownNow(); _writebackExecutor.shutdownNow();
@@ -110,9 +118,18 @@ public class WritebackObjectPersistentStore {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
try { try {
TxBundle bundle; TxBundle bundle;
synchronized (_pendingBundle) { _pendingBundleLock.lock();
while ((bundle = _pendingBundle.getAndSet(null)) == null) try {
_pendingBundle.wait(); while (_pendingBundle == null)
_newBundleCondition.await();
bundle = _pendingBundle;
_pendingBundle = null;
_curSize -= bundle.size();
assert _curSize == 0;
_flushCondition.signal();
} finally {
_pendingBundleLock.unlock();
} }
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>(); var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
@@ -136,7 +153,8 @@ public class WritebackObjectPersistentStore {
Log.tracev("Bundle {0} committed", bundle.id()); Log.tracev("Bundle {0} committed", bundle.id());
synchronized (_pendingWrites) { _pendingBundleLock.lock();
try {
var curPw = _pendingWrites.get(); var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites(); var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) { for (var e : bundle._entries.values()) {
@@ -150,19 +168,15 @@ public class WritebackObjectPersistentStore {
curPw.lastCommittedId() curPw.lastCommittedId()
); );
_pendingWrites.compareAndSet(curPw, newCurPw); _pendingWrites.compareAndSet(curPw, newCurPw);
} finally {
_pendingBundleLock.unlock();
} }
_lastFlushedId.set(bundle.id()); _lastFlushedId.set(bundle.id());
var callbacks = bundle.getCallbacks(); var callbacks = bundle.callbacks();
_callbackExecutor.submit(() -> { _callbackExecutor.submit(() -> {
callbacks.forEach(Runnable::run); callbacks.forEach(Runnable::run);
}); });
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.size();
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} catch (Exception e) { } catch (Exception e) {
Log.error("Uncaught exception in writeback", e); Log.error("Uncaught exception in writeback", e);
@@ -175,87 +189,72 @@ public class WritebackObjectPersistentStore {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) { public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady(); verifyReady();
while (true) { _pendingBundleLock.lock();
synchronized (_flushWaitSynchronizer) { try {
boolean shouldWake = false;
if (_curSize > sizeLimit) {
shouldWake = true;
long started = System.currentTimeMillis(); long started = System.currentTimeMillis();
while (currentSize > sizeLimit) { while (_curSize > sizeLimit)
try { _flushCondition.await();
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started; long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited); _waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled()) if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited); Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
} }
synchronized (_pendingBundle) { var oursId = _lastCommittedId.incrementAndGet();
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
continue;
}
}
TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet()); var curBundle = _pendingBundle;
int oldSize = 0;
for (var action : writes) { if (curBundle != null) {
switch (action) { oldSize = curBundle.size();
case TxRecord.TxObjectRecordWrite<?> write -> { curBundle.setId(oursId);
// Log.tracev("Flushing object {0}", write.key()); } else {
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id())); curBundle = new TxBundle(oursId);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.id()
);
_pendingWrites.compareAndSet(curPw, newCurPw);
}
var curBundle = _pendingBundle.get();
long oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.compress(bundle);
} else {
curBundle = bundle;
}
_pendingBundle.set(curBundle);
_pendingBundle.notifyAll();
synchronized (_flushWaitSynchronizer) {
currentSize += (curBundle.size() - oldSize);
}
return bundle.id();
} }
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
curBundle.delete(deleted.key());
}
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
oursId
);
_pendingWrites.compareAndSet(curPw, newCurPw);
_pendingBundle = curBundle;
_newBundleCondition.signalAll();
_curSize += (curBundle.size() - oldSize);
if (shouldWake && _curSize < sizeLimit) {
_flushCondition.signal();
}
return oursId;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
_pendingBundleLock.unlock();
} }
} }
@@ -266,17 +265,20 @@ public class WritebackObjectPersistentStore {
fn.run(); fn.run();
return; return;
} }
synchronized (_pendingBundle) { _pendingBundleLock.lock();
try {
if (_lastFlushedId.get() >= bundleId) { if (_lastFlushedId.get() >= bundleId) {
fn.run(); fn.run();
return; return;
} }
var pendingBundle = _pendingBundle.get(); var pendingBundle = _pendingBundle;
if (pendingBundle == null) { if (pendingBundle == null) {
fn.run(); fn.run();
return; return;
} }
pendingBundle.addCallback(fn); pendingBundle.addCallback(fn);
} finally {
_pendingBundleLock.unlock();
} }
} }
@@ -363,10 +365,10 @@ public class WritebackObjectPersistentStore {
private static class TxBundle { private static class TxBundle {
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>(); private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>(); private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private int _size = 0;
private long _txId; private long _txId;
private long _size = 0;
ArrayList<Runnable> getCallbacks() { ArrayList<Runnable> callbacks() {
return _callbacks; return _callbacks;
} }
@@ -374,14 +376,23 @@ public class WritebackObjectPersistentStore {
_txId = txId; _txId = txId;
} }
public void setId(long id) {
_txId = id;
}
public long id() { public long id() {
return _txId; return _txId;
} }
public void addCallback(Runnable callback) { public void addCallback(Runnable callback) {
_callbacks.add(callback); _callbacks.add(callback);
} }
public int size() {
return _size;
}
private void putEntry(BundleEntry entry) { private void putEntry(BundleEntry entry) {
var old = _entries.put(entry.key(), entry); var old = _entries.put(entry.key(), entry);
if (old != null) { if (old != null) {
@@ -398,24 +409,7 @@ public class WritebackObjectPersistentStore {
putEntry(new DeletedEntry(obj)); putEntry(new DeletedEntry(obj));
} }
public long size() { private sealed interface BundleEntry permits CommittedEntry, DeletedEntry {
return _size;
}
public void compress(TxBundle other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
for (var entry : other._entries.values()) {
putEntry(entry);
}
_callbacks.addAll(other._callbacks);
}
private interface BundleEntry {
JObjectKey key(); JObjectKey key();
int size(); int size();

View File

@@ -59,98 +59,89 @@ public class JObjectManager {
verifyReady(); verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>(); var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null; Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet = null; Map<JObjectKey, Optional<JDataVersionedWrapper>> readSet = null;
Collection<AutoCloseableNoThrow> toUnlock = null; Collection<AutoCloseableNoThrow> toUnlock = null;
try { try {
try { long pendingCount = 0;
long pendingCount = 0; List<CommitHookIterationData> hookIterationData;
List<CommitHookIterationData> hookIterationData; {
{ CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()]; for (int i = 0; i < _preCommitTxHooks.size(); i++) {
for (int i = 0; i < _preCommitTxHooks.size(); i++) { var hook = _preCommitTxHooks.get(i);
var hook = _preCommitTxHooks.get(i); hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
} }
hookIterationData = List.of(hookIterationDataArray);
}
for (var n : tx.drainNewWrites()) { for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) { for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n); hookPut.pendingWrites().put(n.key(), n);
pendingCount++; pendingCount++;
}
writes.put(n.key(), n);
} }
writes.put(n.key(), n);
}
// Run hooks for all objects // Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution // Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook // should be consistent from the view point of each individual hook
// For example, when a hook makes changes to an object, and another hook changes the object before/after it // For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created // on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it. // as the "old" version, and the new version with all the changes after it.
while (pendingCount > 0) { while (pendingCount > 0) {
for (var hookId : hookIterationData) { for (var hookId : hookIterationData) {
var hook = hookId.hook(); var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites(); var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev = Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) { key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data(); case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null; case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null); case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> { default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key)); throw new TxCommitException("Unexpected value: " + writes.get(key));
} }
}; };
var curIteration = hookId.pendingWrites(); var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass()); // Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) { for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); // Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey()); var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue()); lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) { switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) { if (oldObj == null) {
hook.onCreate(write.key(), write.data()); hook.onCreate(write.key(), write.data());
} else { } else {
hook.onChange(write.key(), oldObj, write.data()); hook.onChange(write.key(), oldObj, write.data());
}
} }
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
} }
} case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
pendingCount -= curIteration.size();
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
} }
default -> throw new TxCommitException("Unexpected value: " + entry);
} }
} }
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) { pendingCount -= curIteration.size();
if (read.getValue() instanceof TransactionObjectLocked<?> locked) { curIteration.clear();
locked.lock().close();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
} }
} }
throw e;
} }
readSet = tx.reads(); readSet = tx.reads();
@@ -159,11 +150,7 @@ public class JObjectManager {
toUnlock = new ArrayList<>(readSet.size() + writes.size()); toUnlock = new ArrayList<>(readSet.size() + writes.size());
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size()); ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
for (var read : readSet.entrySet()) { for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) { toLock.add(read.getKey());
toUnlock.add(locked.lock());
} else {
toLock.add(read.getKey());
}
} }
for (var write : writes.keySet()) { for (var write : writes.keySet()) {
if (!readSet.containsKey(write)) if (!readSet.containsKey(write))
@@ -171,6 +158,8 @@ public class JObjectManager {
} }
toLock.sort(null); toLock.sort(null);
for (var key : toLock) { for (var key : toLock) {
if (tx.knownNew().contains(key))
continue;
var lock = lockManager.lockObject(key); var lock = lockManager.lockObject(key);
toUnlock.add(lock); toUnlock.add(lock);
} }
@@ -182,10 +171,7 @@ public class JObjectManager {
long version = 0L; long version = 0L;
for (var read : readSet.values()) { for (var read : readSet.values()) {
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L)); version = Math.max(version, read.map(JDataVersionedWrapper::version).orElse(0L));
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
} }
long finalVersion = version; long finalVersion = version;
@@ -218,9 +204,9 @@ public class JObjectManager {
for (var read : readSet.entrySet()) { for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey()); var current = commitSnapshot.readObject(read.getKey());
if (current.isEmpty() != read.getValue().data().isEmpty()) { if (current.isEmpty() != read.getValue().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey()); Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty()); throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().isEmpty());
} }
if (current.isEmpty()) { if (current.isEmpty()) {
@@ -272,11 +258,6 @@ public class JObjectManager {
public void rollback(TransactionPrivate tx) { public void rollback(TransactionPrivate tx) {
verifyReady(); verifyReady();
tx.reads().forEach((key, value) -> {
if (value instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
});
tx.close(); tx.close();
} }

View File

@@ -56,10 +56,10 @@ public class TransactionFactoryImpl implements TransactionFactory {
} }
private class TransactionImpl implements TransactionPrivate { private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>(); private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>(); private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new ArrayList<>(); private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new ArrayList<>(); private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>(); private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot; private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false; private boolean _closed = false;
@@ -99,30 +99,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) { if (_knownNew.contains(key)) {
return Optional.empty(); return Optional.empty();
} }
return _readSet.computeIfAbsent(key, k -> { var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k); var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read); return read;
}) });
.data() if (got.isEmpty())
.map(w -> type.cast(w.data())); return Optional.empty();
} var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
} }
@Override @Override
@@ -138,13 +122,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
} }
} }
if (neverLock) return getFromSource(type, key);
return getFromSource(type, key);
return switch (strategy) {
case OPTIMISTIC -> getFromSource(type, key);
case WRITE -> getWriteLockedFromSource(type, key);
};
} }
@Override @Override
@@ -186,7 +164,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override @Override
public void put(JData obj) { public void put(JData obj) {
var read = _readSet.get(obj.key()); var read = _readSet.get(obj.key());
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) { if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return; return;
} }
@@ -210,8 +188,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
} }
@Override @Override
public Map<JObjectKey, TransactionObject<?>> reads() { public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return Collections.unmodifiableMap(_readSet); return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
} }
@Override @Override
@@ -247,7 +230,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> prev() { public Pair<JObjectKey, JData> prev() {
var got = _backing.prev(); var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped))); _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
} }
return Pair.of(got.getKey(), got.getValue().obj()); return Pair.of(got.getKey(), got.getValue().obj());
} }
@@ -276,7 +259,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> next() { public Pair<JObjectKey, JData> next() {
var got = _backing.next(); var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped))); _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
} }
return Pair.of(got.getKey(), got.getValue().obj()); return Pair.of(got.getKey(), got.getValue().obj());
} }

View File

@@ -1,10 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public interface TransactionObject<T extends JData> {
Optional<JDataVersionedWrapper> data();
}

View File

@@ -1,12 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Optional;
public record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}

View File

@@ -9,12 +9,15 @@ import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects // The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow { public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites(); Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, TransactionObject<?>> reads(); Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key); <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);

View File

@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
public class TxRecord { public class TxRecord {
public interface TxObjectRecord<T> { public sealed interface TxObjectRecord<T> permits TxObjectRecordWrite, TxObjectRecordDeleted {
JObjectKey key(); JObjectKey key();
} }