diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java index f61ea544..4f468d5f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java @@ -1,5 +1,6 @@ package com.usatiuk.objects.transaction; +import com.google.common.collect.Streams; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.objects.JData; import com.usatiuk.objects.JDataVersionedWrapper; @@ -63,11 +64,10 @@ public class JObjectManager { public Pair, TransactionHandle> commit(TransactionPrivate tx) { verifyReady(); - var writes = new LinkedHashMap>(); - var dependenciesLocked = new LinkedHashMap>(); + var writes = new HashMap>(); Snapshot commitSnapshot = null; - Map> readSet; - var toUnlock = new ArrayList(); + Map> readSet = null; + Collection toUnlock = null; try { try { @@ -96,7 +96,7 @@ public class JObjectManager { // For example, when a hook makes changes to an object, and another hook changes the object before/after it // on the next iteration, the first hook should receive the version of the object it had created // as the "old" version, and the new version with all the changes after it. - do { + while (pendingCount > 0) { for (var hookId : hookIterationData) { var hook = hookId.hook(); var lastCurHookSeen = hookId.lastWrites(); @@ -149,11 +149,11 @@ public class JObjectManager { writes.put(n.key(), n); } } - } while (pendingCount > 0); + } } catch (Throwable e) { for (var read : tx.reads().entrySet()) { if (read.getValue() instanceof TransactionObjectLocked locked) { - toUnlock.add(locked.lock()); + locked.lock().close(); } } throw e; @@ -162,25 +162,34 @@ public class JObjectManager { readSet = tx.reads(); if (!writes.isEmpty()) { - Stream.concat(readSet.keySet().stream(), writes.keySet().stream()) - .sorted() - .forEach(k -> { - var lock = lockManager.lockObject(k); - toUnlock.add(lock); - }); + toUnlock = new ArrayList<>(readSet.size() + writes.size()); + ArrayList toLock = new ArrayList<>(readSet.size() + writes.size()); + for (var read : readSet.entrySet()) { + if (read.getValue() instanceof TransactionObjectLocked locked) { + toUnlock.add(locked.lock()); + } else { + toLock.add(read.getKey()); + } + } + for (var write : writes.entrySet()) { + toLock.add(write.getKey()); + } + Collections.sort(toLock); + for (var key : toLock) { + var lock = lockManager.lockObject(key); + toUnlock.add(lock); + } commitSnapshot = snapshotManager.createSnapshot(); - } - - for (var read : readSet.entrySet()) { - if (read.getValue() instanceof TransactionObjectLocked locked) { - toUnlock.add(locked.lock()); - } - } - - if (writes.isEmpty()) { + } else { Log.trace("Committing transaction - no changes"); + for (var read : readSet.values()) { + if (read instanceof TransactionObjectLocked locked) { + locked.lock().close(); + } + } + return Pair.of( Stream.concat( tx.getOnCommit().stream(), @@ -199,24 +208,23 @@ public class JObjectManager { if (snapshotId != commitSnapshot.id()) { for (var read : readSet.entrySet()) { - dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey())); - var dep = dependenciesLocked.get(read.getKey()); + var current = commitSnapshot.readObject(read.getKey()); - if (dep.isEmpty() != read.getValue().data().isEmpty()) { + if (current.isEmpty() != read.getValue().data().isEmpty()) { Log.tracev("Checking read dependency {0} - not found", read.getKey()); - throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty()); + throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty()); } - if (dep.isEmpty()) { + if (current.isEmpty()) { // TODO: Every write gets a dependency due to hooks continue; // assert false; // throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty()); } - if (dep.get().version() > snapshotId) { + if (current.get().version() > snapshotId) { Log.tracev("Checking dependency {0} - newer than", read.getKey()); - throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId); + throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId); } Log.tracev("Checking dependency {0} - ok with read", read.getKey()); @@ -225,21 +233,7 @@ public class JObjectManager { Log.tracev("Skipped dependency checks: no changes"); } - boolean same = snapshotId == commitSnapshot.id(); - - var addFlushCallback = snapshotManager.commitTx( - writes.values().stream() - .filter(r -> { - if (!same) - if (r instanceof TxRecord.TxObjectRecordWrite(JData data)) { - var dep = dependenciesLocked.get(data.key()); - if (dep.isPresent() && dep.get().version() > snapshotId) { - Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId); - return false; - } - } - return true; - }).toList()); + var addFlushCallback = snapshotManager.commitTx(writes.values()); for (var callback : tx.getOnFlush()) { addFlushCallback.accept(callback); @@ -257,9 +251,10 @@ public class JObjectManager { Log.trace("Error when committing transaction", t); throw new TxCommitException(t.getMessage(), t); } finally { - for (var unlock : toUnlock) { - unlock.close(); - } + if (toUnlock != null) + for (var unlock : toUnlock) { + unlock.close(); + } if (commitSnapshot != null) commitSnapshot.close(); tx.close(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/RemoteObjectDataWrapper.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/RemoteObjectDataWrapper.java index 4916f38f..c9d7295d 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/RemoteObjectDataWrapper.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/RemoteObjectDataWrapper.java @@ -6,30 +6,27 @@ import org.pcollections.PCollection; import java.util.Collection; -public record RemoteObjectDataWrapper(PCollection refsFrom, - boolean frozen, - T data) implements JDataRefcounted { +public record RemoteObjectDataWrapper( + JObjectKey key, + PCollection refsFrom, + boolean frozen, + T data) implements JDataRefcounted { public RemoteObjectDataWrapper(T data) { - this(HashTreePSet.empty(), false, data); + this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data); } @Override public RemoteObjectDataWrapper withRefsFrom(PCollection refs) { - return new RemoteObjectDataWrapper<>(refs, frozen, data); + return new RemoteObjectDataWrapper<>(key, refs, frozen, data); } @Override public RemoteObjectDataWrapper withFrozen(boolean frozen) { - return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); + return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data); } public RemoteObjectDataWrapper withData(T data) { - return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); - } - - @Override - public JObjectKey key() { - return RemoteObjectMeta.ofDataKey(data.key()); + return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data); } @Override diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapEntry.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapEntry.java index 8fd0a0b9..0045c9c4 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapEntry.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jmap/JMapEntry.java @@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap; import com.usatiuk.objects.JData; import com.usatiuk.objects.JObjectKey; -public record JMapEntry(JObjectKey holder, +public record JMapEntry(JObjectKey key, + JObjectKey holder, K selfKey, JObjectKey ref) implements JData { - @Override - public JObjectKey key() { - return JMapHelper.makeKey(holder, selfKey); + public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) { + this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref); } }