mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
Compare commits
4 Commits
8ab034402d
...
b84ef95703
| Author | SHA1 | Date | |
|---|---|---|---|
| b84ef95703 | |||
| c0735801b9 | |||
| b506ced9d5 | |||
| 46bc9fa810 |
@@ -25,36 +25,41 @@ import javax.annotation.Nonnull;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class WritebackObjectPersistentStore {
|
||||
@Inject
|
||||
CachingObjectPersistentStore cachedStore;
|
||||
@Inject
|
||||
ExecutorService _callbackExecutor;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.writeback.limit")
|
||||
long sizeLimit;
|
||||
int sizeLimit;
|
||||
|
||||
private TxBundle _pendingBundle = null;
|
||||
private int _curSize = 0;
|
||||
|
||||
private final AtomicReference<TxBundle> _pendingBundle = new AtomicReference<>(null);
|
||||
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
|
||||
|
||||
private final Object _flushWaitSynchronizer = new Object();
|
||||
private final ReentrantLock _pendingBundleLock = new ReentrantLock();
|
||||
|
||||
private final Condition _newBundleCondition = _pendingBundleLock.newCondition();
|
||||
private final Condition _flushCondition = _pendingBundleLock.newCondition();
|
||||
|
||||
private final AtomicLong _lastFlushedId = new AtomicLong(-1);
|
||||
private final AtomicLong _lastCommittedId = new AtomicLong(-1);
|
||||
|
||||
private final AtomicLong _waitedTotal = new AtomicLong(0);
|
||||
private long currentSize = 0;
|
||||
|
||||
private ExecutorService _writebackExecutor;
|
||||
private ExecutorService _statusExecutor;
|
||||
|
||||
@Inject
|
||||
ExecutorService _callbackExecutor;
|
||||
|
||||
private volatile boolean _ready = false;
|
||||
|
||||
void init(@Observes @Priority(120) StartupEvent event) {
|
||||
@@ -72,8 +77,8 @@ public class WritebackObjectPersistentStore {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (currentSize > 0)
|
||||
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
|
||||
if (_curSize > 0)
|
||||
Log.info("Tx commit status: size=" + _curSize / 1024 / 1024 + "MB");
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
@@ -91,11 +96,14 @@ public class WritebackObjectPersistentStore {
|
||||
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
|
||||
Log.info("Waiting for all transactions to drain");
|
||||
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
_ready = false;
|
||||
while (currentSize > 0) {
|
||||
_flushWaitSynchronizer.wait();
|
||||
_ready = false;
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
while (_curSize > 0) {
|
||||
_flushCondition.await();
|
||||
}
|
||||
} finally {
|
||||
_pendingBundleLock.unlock();
|
||||
}
|
||||
|
||||
_writebackExecutor.shutdownNow();
|
||||
@@ -110,9 +118,18 @@ public class WritebackObjectPersistentStore {
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
TxBundle bundle;
|
||||
synchronized (_pendingBundle) {
|
||||
while ((bundle = _pendingBundle.getAndSet(null)) == null)
|
||||
_pendingBundle.wait();
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
while (_pendingBundle == null)
|
||||
_newBundleCondition.await();
|
||||
bundle = _pendingBundle;
|
||||
_pendingBundle = null;
|
||||
|
||||
_curSize -= bundle.size();
|
||||
assert _curSize == 0;
|
||||
_flushCondition.signal();
|
||||
} finally {
|
||||
_pendingBundleLock.unlock();
|
||||
}
|
||||
|
||||
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
|
||||
@@ -136,7 +153,8 @@ public class WritebackObjectPersistentStore {
|
||||
|
||||
Log.tracev("Bundle {0} committed", bundle.id());
|
||||
|
||||
synchronized (_pendingWrites) {
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
var curPw = _pendingWrites.get();
|
||||
var curPwMap = curPw.pendingWrites();
|
||||
for (var e : bundle._entries.values()) {
|
||||
@@ -150,19 +168,15 @@ public class WritebackObjectPersistentStore {
|
||||
curPw.lastCommittedId()
|
||||
);
|
||||
_pendingWrites.compareAndSet(curPw, newCurPw);
|
||||
} finally {
|
||||
_pendingBundleLock.unlock();
|
||||
}
|
||||
|
||||
_lastFlushedId.set(bundle.id());
|
||||
var callbacks = bundle.getCallbacks();
|
||||
var callbacks = bundle.callbacks();
|
||||
_callbackExecutor.submit(() -> {
|
||||
callbacks.forEach(Runnable::run);
|
||||
});
|
||||
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
currentSize -= bundle.size();
|
||||
if (currentSize <= sizeLimit || !_ready)
|
||||
_flushWaitSynchronizer.notifyAll();
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
} catch (Exception e) {
|
||||
Log.error("Uncaught exception in writeback", e);
|
||||
@@ -175,87 +189,72 @@ public class WritebackObjectPersistentStore {
|
||||
|
||||
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||
verifyReady();
|
||||
while (true) {
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
boolean shouldWake = false;
|
||||
if (_curSize > sizeLimit) {
|
||||
shouldWake = true;
|
||||
long started = System.currentTimeMillis();
|
||||
while (currentSize > sizeLimit) {
|
||||
try {
|
||||
_flushWaitSynchronizer.wait();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
while (_curSize > sizeLimit)
|
||||
_flushCondition.await();
|
||||
long waited = System.currentTimeMillis() - started;
|
||||
_waitedTotal.addAndGet(waited);
|
||||
if (Log.isTraceEnabled())
|
||||
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
|
||||
}
|
||||
|
||||
synchronized (_pendingBundle) {
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
if (currentSize > sizeLimit) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
var oursId = _lastCommittedId.incrementAndGet();
|
||||
|
||||
TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet());
|
||||
|
||||
for (var action : writes) {
|
||||
switch (action) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
// Log.tracev("Flushing object {0}", write.key());
|
||||
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
|
||||
}
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> {
|
||||
// Log.tracev("Deleting object {0}", deleted.key());
|
||||
bundle.delete(deleted.key());
|
||||
}
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + action.key());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (_pendingWrites) {
|
||||
var curPw = _pendingWrites.get();
|
||||
var curPwMap = curPw.pendingWrites();
|
||||
for (var e : ((TxBundle) bundle)._entries.values()) {
|
||||
switch (e) {
|
||||
case TxBundle.CommittedEntry c -> {
|
||||
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
|
||||
}
|
||||
case TxBundle.DeletedEntry d -> {
|
||||
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected value: " + e);
|
||||
}
|
||||
}
|
||||
// Now, make the changes visible to new iterators
|
||||
var newCurPw = new PendingWriteData(
|
||||
curPwMap,
|
||||
curPw.lastFlushedId(),
|
||||
bundle.id()
|
||||
);
|
||||
|
||||
_pendingWrites.compareAndSet(curPw, newCurPw);
|
||||
}
|
||||
|
||||
var curBundle = _pendingBundle.get();
|
||||
long oldSize = 0;
|
||||
if (curBundle != null) {
|
||||
oldSize = curBundle.size();
|
||||
curBundle.compress(bundle);
|
||||
} else {
|
||||
curBundle = bundle;
|
||||
}
|
||||
_pendingBundle.set(curBundle);
|
||||
_pendingBundle.notifyAll();
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
currentSize += (curBundle.size() - oldSize);
|
||||
}
|
||||
|
||||
return bundle.id();
|
||||
var curBundle = _pendingBundle;
|
||||
int oldSize = 0;
|
||||
if (curBundle != null) {
|
||||
oldSize = curBundle.size();
|
||||
curBundle.setId(oursId);
|
||||
} else {
|
||||
curBundle = new TxBundle(oursId);
|
||||
}
|
||||
|
||||
var curPw = _pendingWrites.get();
|
||||
var curPwMap = curPw.pendingWrites();
|
||||
|
||||
for (var action : writes) {
|
||||
switch (action) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
// Log.tracev("Flushing object {0}", write.key());
|
||||
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
|
||||
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
|
||||
curBundle.commit(wrapper);
|
||||
}
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> {
|
||||
// Log.tracev("Deleting object {0}", deleted.key());
|
||||
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
|
||||
curBundle.delete(deleted.key());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Now, make the changes visible to new iterators
|
||||
var newCurPw = new PendingWriteData(
|
||||
curPwMap,
|
||||
curPw.lastFlushedId(),
|
||||
oursId
|
||||
);
|
||||
|
||||
_pendingWrites.compareAndSet(curPw, newCurPw);
|
||||
|
||||
_pendingBundle = curBundle;
|
||||
_newBundleCondition.signalAll();
|
||||
|
||||
_curSize += (curBundle.size() - oldSize);
|
||||
|
||||
if (shouldWake && _curSize < sizeLimit) {
|
||||
_flushCondition.signal();
|
||||
}
|
||||
|
||||
return oursId;
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
_pendingBundleLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,17 +265,20 @@ public class WritebackObjectPersistentStore {
|
||||
fn.run();
|
||||
return;
|
||||
}
|
||||
synchronized (_pendingBundle) {
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
if (_lastFlushedId.get() >= bundleId) {
|
||||
fn.run();
|
||||
return;
|
||||
}
|
||||
var pendingBundle = _pendingBundle.get();
|
||||
var pendingBundle = _pendingBundle;
|
||||
if (pendingBundle == null) {
|
||||
fn.run();
|
||||
return;
|
||||
}
|
||||
pendingBundle.addCallback(fn);
|
||||
} finally {
|
||||
_pendingBundleLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,10 +365,10 @@ public class WritebackObjectPersistentStore {
|
||||
private static class TxBundle {
|
||||
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
|
||||
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
|
||||
private int _size = 0;
|
||||
private long _txId;
|
||||
private long _size = 0;
|
||||
|
||||
ArrayList<Runnable> getCallbacks() {
|
||||
ArrayList<Runnable> callbacks() {
|
||||
return _callbacks;
|
||||
}
|
||||
|
||||
@@ -374,14 +376,23 @@ public class WritebackObjectPersistentStore {
|
||||
_txId = txId;
|
||||
}
|
||||
|
||||
public void setId(long id) {
|
||||
_txId = id;
|
||||
}
|
||||
|
||||
public long id() {
|
||||
return _txId;
|
||||
|
||||
}
|
||||
|
||||
public void addCallback(Runnable callback) {
|
||||
_callbacks.add(callback);
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return _size;
|
||||
}
|
||||
|
||||
private void putEntry(BundleEntry entry) {
|
||||
var old = _entries.put(entry.key(), entry);
|
||||
if (old != null) {
|
||||
@@ -398,24 +409,7 @@ public class WritebackObjectPersistentStore {
|
||||
putEntry(new DeletedEntry(obj));
|
||||
}
|
||||
|
||||
public long size() {
|
||||
return _size;
|
||||
}
|
||||
|
||||
public void compress(TxBundle other) {
|
||||
if (_txId >= other._txId)
|
||||
throw new IllegalArgumentException("Compressing an older bundle into newer");
|
||||
|
||||
_txId = other._txId;
|
||||
|
||||
for (var entry : other._entries.values()) {
|
||||
putEntry(entry);
|
||||
}
|
||||
|
||||
_callbacks.addAll(other._callbacks);
|
||||
}
|
||||
|
||||
private interface BundleEntry {
|
||||
private sealed interface BundleEntry permits CommittedEntry, DeletedEntry {
|
||||
JObjectKey key();
|
||||
|
||||
int size();
|
||||
|
||||
@@ -59,98 +59,89 @@ public class JObjectManager {
|
||||
verifyReady();
|
||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||
Map<JObjectKey, TransactionObject<?>> readSet = null;
|
||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> readSet = null;
|
||||
Collection<AutoCloseableNoThrow> toUnlock = null;
|
||||
|
||||
try {
|
||||
try {
|
||||
long pendingCount = 0;
|
||||
List<CommitHookIterationData> hookIterationData;
|
||||
{
|
||||
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
|
||||
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
|
||||
var hook = _preCommitTxHooks.get(i);
|
||||
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
hookIterationData = List.of(hookIterationDataArray);
|
||||
long pendingCount = 0;
|
||||
List<CommitHookIterationData> hookIterationData;
|
||||
{
|
||||
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
|
||||
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
|
||||
var hook = _preCommitTxHooks.get(i);
|
||||
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
hookIterationData = List.of(hookIterationDataArray);
|
||||
}
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
hookPut.pendingWrites().put(n.key(), n);
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
hookPut.pendingWrites().put(n.key(), n);
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
|
||||
|
||||
// Run hooks for all objects
|
||||
// Every hook should see every change made to every object, yet the object's evolution
|
||||
// should be consistent from the view point of each individual hook
|
||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||
// on the next iteration, the first hook should receive the version of the object it had created
|
||||
// as the "old" version, and the new version with all the changes after it.
|
||||
while (pendingCount > 0) {
|
||||
for (var hookId : hookIterationData) {
|
||||
var hook = hookId.hook();
|
||||
var lastCurHookSeen = hookId.lastWrites();
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (lastCurHookSeen.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null -> tx.getFromSource(JData.class, key).orElse(null);
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + writes.get(key));
|
||||
}
|
||||
};
|
||||
// Run hooks for all objects
|
||||
// Every hook should see every change made to every object, yet the object's evolution
|
||||
// should be consistent from the view point of each individual hook
|
||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||
// on the next iteration, the first hook should receive the version of the object it had created
|
||||
// as the "old" version, and the new version with all the changes after it.
|
||||
while (pendingCount > 0) {
|
||||
for (var hookId : hookIterationData) {
|
||||
var hook = hookId.hook();
|
||||
var lastCurHookSeen = hookId.lastWrites();
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (lastCurHookSeen.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null -> tx.getFromSource(JData.class, key).orElse(null);
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + writes.get(key));
|
||||
}
|
||||
};
|
||||
|
||||
var curIteration = hookId.pendingWrites();
|
||||
var curIteration = hookId.pendingWrites();
|
||||
|
||||
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
|
||||
|
||||
for (var entry : curIteration.entrySet()) {
|
||||
for (var entry : curIteration.entrySet()) {
|
||||
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
|
||||
var oldObj = getPrev.apply(entry.getKey());
|
||||
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
||||
switch (entry.getValue()) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
if (oldObj == null) {
|
||||
hook.onCreate(write.key(), write.data());
|
||||
} else {
|
||||
hook.onChange(write.key(), oldObj, write.data());
|
||||
}
|
||||
var oldObj = getPrev.apply(entry.getKey());
|
||||
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
||||
switch (entry.getValue()) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
if (oldObj == null) {
|
||||
hook.onCreate(write.key(), write.data());
|
||||
} else {
|
||||
hook.onChange(write.key(), oldObj, write.data());
|
||||
}
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> {
|
||||
hook.onDelete(deleted.key(), oldObj);
|
||||
}
|
||||
default -> throw new TxCommitException("Unexpected value: " + entry);
|
||||
}
|
||||
}
|
||||
|
||||
pendingCount -= curIteration.size();
|
||||
curIteration.clear();
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
if (hookPut == hookId) {
|
||||
lastCurHookSeen.put(n.key(), n);
|
||||
continue;
|
||||
}
|
||||
var before = hookPut.pendingWrites().put(n.key(), n);
|
||||
if (before == null)
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> {
|
||||
hook.onDelete(deleted.key(), oldObj);
|
||||
}
|
||||
default -> throw new TxCommitException("Unexpected value: " + entry);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
for (var read : tx.reads().entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
|
||||
pendingCount -= curIteration.size();
|
||||
curIteration.clear();
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
if (hookPut == hookId) {
|
||||
lastCurHookSeen.put(n.key(), n);
|
||||
continue;
|
||||
}
|
||||
var before = hookPut.pendingWrites().put(n.key(), n);
|
||||
if (before == null)
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
readSet = tx.reads();
|
||||
@@ -159,11 +150,7 @@ public class JObjectManager {
|
||||
toUnlock = new ArrayList<>(readSet.size() + writes.size());
|
||||
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
|
||||
for (var read : readSet.entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
} else {
|
||||
toLock.add(read.getKey());
|
||||
}
|
||||
toLock.add(read.getKey());
|
||||
}
|
||||
for (var write : writes.keySet()) {
|
||||
if (!readSet.containsKey(write))
|
||||
@@ -171,6 +158,8 @@ public class JObjectManager {
|
||||
}
|
||||
toLock.sort(null);
|
||||
for (var key : toLock) {
|
||||
if (tx.knownNew().contains(key))
|
||||
continue;
|
||||
var lock = lockManager.lockObject(key);
|
||||
toUnlock.add(lock);
|
||||
}
|
||||
@@ -182,10 +171,7 @@ public class JObjectManager {
|
||||
long version = 0L;
|
||||
|
||||
for (var read : readSet.values()) {
|
||||
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
|
||||
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
version = Math.max(version, read.map(JDataVersionedWrapper::version).orElse(0L));
|
||||
}
|
||||
|
||||
long finalVersion = version;
|
||||
@@ -218,9 +204,9 @@ public class JObjectManager {
|
||||
for (var read : readSet.entrySet()) {
|
||||
var current = commitSnapshot.readObject(read.getKey());
|
||||
|
||||
if (current.isEmpty() != read.getValue().data().isEmpty()) {
|
||||
if (current.isEmpty() != read.getValue().isEmpty()) {
|
||||
Log.tracev("Checking read dependency {0} - not found", read.getKey());
|
||||
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().isEmpty());
|
||||
}
|
||||
|
||||
if (current.isEmpty()) {
|
||||
@@ -272,11 +258,6 @@ public class JObjectManager {
|
||||
|
||||
public void rollback(TransactionPrivate tx) {
|
||||
verifyReady();
|
||||
tx.reads().forEach((key, value) -> {
|
||||
if (value instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
});
|
||||
tx.close();
|
||||
}
|
||||
|
||||
|
||||
@@ -56,10 +56,10 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
}
|
||||
|
||||
private class TransactionImpl implements TransactionPrivate {
|
||||
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
|
||||
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||
private final List<Runnable> _onCommit = new ArrayList<>();
|
||||
private final List<Runnable> _onFlush = new ArrayList<>();
|
||||
private final List<Runnable> _onCommit = new LinkedList<>();
|
||||
private final List<Runnable> _onFlush = new LinkedList<>();
|
||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||
private boolean _closed = false;
|
||||
@@ -99,30 +99,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
if (_knownNew.contains(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return _readSet.computeIfAbsent(key, k -> {
|
||||
var read = _snapshot.readObject(k);
|
||||
return new TransactionObjectNoLock<>(read);
|
||||
})
|
||||
.data()
|
||||
.map(w -> type.cast(w.data()));
|
||||
}
|
||||
|
||||
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
|
||||
var got = _readSet.get(key);
|
||||
|
||||
if (got == null) {
|
||||
var lock = lockManager.lockObject(key);
|
||||
try {
|
||||
var read = _snapshot.readObject(key);
|
||||
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
|
||||
return read.map(JDataVersionedWrapper::data).map(type::cast);
|
||||
} catch (Exception e) {
|
||||
lock.close();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
|
||||
var got = _readSet.computeIfAbsent(key, k -> {
|
||||
var read = _snapshot.readObject(k);
|
||||
return read;
|
||||
});
|
||||
if (got.isEmpty())
|
||||
return Optional.empty();
|
||||
var gotData = got.get();
|
||||
return Optional.of(type.cast(gotData.data()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -138,13 +122,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
}
|
||||
}
|
||||
|
||||
if (neverLock)
|
||||
return getFromSource(type, key);
|
||||
|
||||
return switch (strategy) {
|
||||
case OPTIMISTIC -> getFromSource(type, key);
|
||||
case WRITE -> getWriteLockedFromSource(type, key);
|
||||
};
|
||||
return getFromSource(type, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -186,7 +164,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Override
|
||||
public void put(JData obj) {
|
||||
var read = _readSet.get(obj.key());
|
||||
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -210,8 +188,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JObjectKey, TransactionObject<?>> reads() {
|
||||
return Collections.unmodifiableMap(_readSet);
|
||||
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
||||
return _readSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<JObjectKey> knownNew() {
|
||||
return _knownNew;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -247,7 +230,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
public Pair<JObjectKey, JData> prev() {
|
||||
var got = _backing.prev();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
@@ -276,7 +259,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
public Pair<JObjectKey, JData> next() {
|
||||
var got = _backing.next();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface TransactionObject<T extends JData> {
|
||||
Optional<JDataVersionedWrapper> data();
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record TransactionObjectLocked<T extends JData>
|
||||
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
|
||||
implements TransactionObject<T> {
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public record TransactionObjectNoLock<T extends JData>
|
||||
(Optional<JDataVersionedWrapper> data)
|
||||
implements TransactionObject<T> {
|
||||
}
|
||||
@@ -9,12 +9,15 @@ import com.usatiuk.utils.AutoCloseableNoThrow;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
// The transaction interface actually used by user code to retrieve objects
|
||||
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
|
||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
|
||||
|
||||
Map<JObjectKey, TransactionObject<?>> reads();
|
||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
|
||||
|
||||
Set<JObjectKey> knownNew();
|
||||
|
||||
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
public class TxRecord {
|
||||
public interface TxObjectRecord<T> {
|
||||
public sealed interface TxObjectRecord<T> permits TxObjectRecordWrite, TxObjectRecordDeleted {
|
||||
JObjectKey key();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user