mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
More microoptimizations 2!
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
package com.usatiuk.objects.transaction;
|
||||||
|
|
||||||
|
import com.google.common.collect.Streams;
|
||||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||||
import com.usatiuk.objects.JData;
|
import com.usatiuk.objects.JData;
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||||
@@ -63,11 +64,10 @@ public class JObjectManager {
|
|||||||
|
|
||||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||||
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
|
|
||||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||||
Map<JObjectKey, TransactionObject<?>> readSet;
|
Map<JObjectKey, TransactionObject<?>> readSet = null;
|
||||||
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
|
Collection<AutoCloseableNoThrow> toUnlock = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
@@ -96,7 +96,7 @@ public class JObjectManager {
|
|||||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||||
// on the next iteration, the first hook should receive the version of the object it had created
|
// on the next iteration, the first hook should receive the version of the object it had created
|
||||||
// as the "old" version, and the new version with all the changes after it.
|
// as the "old" version, and the new version with all the changes after it.
|
||||||
do {
|
while (pendingCount > 0) {
|
||||||
for (var hookId : hookIterationData) {
|
for (var hookId : hookIterationData) {
|
||||||
var hook = hookId.hook();
|
var hook = hookId.hook();
|
||||||
var lastCurHookSeen = hookId.lastWrites();
|
var lastCurHookSeen = hookId.lastWrites();
|
||||||
@@ -149,11 +149,11 @@ public class JObjectManager {
|
|||||||
writes.put(n.key(), n);
|
writes.put(n.key(), n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (pendingCount > 0);
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
for (var read : tx.reads().entrySet()) {
|
for (var read : tx.reads().entrySet()) {
|
||||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||||
toUnlock.add(locked.lock());
|
locked.lock().close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
@@ -162,25 +162,34 @@ public class JObjectManager {
|
|||||||
readSet = tx.reads();
|
readSet = tx.reads();
|
||||||
|
|
||||||
if (!writes.isEmpty()) {
|
if (!writes.isEmpty()) {
|
||||||
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
|
toUnlock = new ArrayList<>(readSet.size() + writes.size());
|
||||||
.sorted()
|
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
|
||||||
.forEach(k -> {
|
for (var read : readSet.entrySet()) {
|
||||||
var lock = lockManager.lockObject(k);
|
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||||
toUnlock.add(lock);
|
toUnlock.add(locked.lock());
|
||||||
});
|
} else {
|
||||||
|
toLock.add(read.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (var write : writes.entrySet()) {
|
||||||
|
toLock.add(write.getKey());
|
||||||
|
}
|
||||||
|
Collections.sort(toLock);
|
||||||
|
for (var key : toLock) {
|
||||||
|
var lock = lockManager.lockObject(key);
|
||||||
|
toUnlock.add(lock);
|
||||||
|
}
|
||||||
|
|
||||||
commitSnapshot = snapshotManager.createSnapshot();
|
commitSnapshot = snapshotManager.createSnapshot();
|
||||||
}
|
} else {
|
||||||
|
|
||||||
for (var read : readSet.entrySet()) {
|
|
||||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
|
||||||
toUnlock.add(locked.lock());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (writes.isEmpty()) {
|
|
||||||
Log.trace("Committing transaction - no changes");
|
Log.trace("Committing transaction - no changes");
|
||||||
|
|
||||||
|
for (var read : readSet.values()) {
|
||||||
|
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||||
|
locked.lock().close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return Pair.of(
|
return Pair.of(
|
||||||
Stream.concat(
|
Stream.concat(
|
||||||
tx.getOnCommit().stream(),
|
tx.getOnCommit().stream(),
|
||||||
@@ -199,24 +208,23 @@ public class JObjectManager {
|
|||||||
|
|
||||||
if (snapshotId != commitSnapshot.id()) {
|
if (snapshotId != commitSnapshot.id()) {
|
||||||
for (var read : readSet.entrySet()) {
|
for (var read : readSet.entrySet()) {
|
||||||
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
|
var current = commitSnapshot.readObject(read.getKey());
|
||||||
var dep = dependenciesLocked.get(read.getKey());
|
|
||||||
|
|
||||||
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
|
if (current.isEmpty() != read.getValue().data().isEmpty()) {
|
||||||
Log.tracev("Checking read dependency {0} - not found", read.getKey());
|
Log.tracev("Checking read dependency {0} - not found", read.getKey());
|
||||||
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dep.isEmpty()) {
|
if (current.isEmpty()) {
|
||||||
// TODO: Every write gets a dependency due to hooks
|
// TODO: Every write gets a dependency due to hooks
|
||||||
continue;
|
continue;
|
||||||
// assert false;
|
// assert false;
|
||||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dep.get().version() > snapshotId) {
|
if (current.get().version() > snapshotId) {
|
||||||
Log.tracev("Checking dependency {0} - newer than", read.getKey());
|
Log.tracev("Checking dependency {0} - newer than", read.getKey());
|
||||||
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
|
throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
|
||||||
}
|
}
|
||||||
|
|
||||||
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
|
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
|
||||||
@@ -225,21 +233,7 @@ public class JObjectManager {
|
|||||||
Log.tracev("Skipped dependency checks: no changes");
|
Log.tracev("Skipped dependency checks: no changes");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean same = snapshotId == commitSnapshot.id();
|
var addFlushCallback = snapshotManager.commitTx(writes.values());
|
||||||
|
|
||||||
var addFlushCallback = snapshotManager.commitTx(
|
|
||||||
writes.values().stream()
|
|
||||||
.filter(r -> {
|
|
||||||
if (!same)
|
|
||||||
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
|
|
||||||
var dep = dependenciesLocked.get(data.key());
|
|
||||||
if (dep.isPresent() && dep.get().version() > snapshotId) {
|
|
||||||
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}).toList());
|
|
||||||
|
|
||||||
for (var callback : tx.getOnFlush()) {
|
for (var callback : tx.getOnFlush()) {
|
||||||
addFlushCallback.accept(callback);
|
addFlushCallback.accept(callback);
|
||||||
@@ -257,9 +251,10 @@ public class JObjectManager {
|
|||||||
Log.trace("Error when committing transaction", t);
|
Log.trace("Error when committing transaction", t);
|
||||||
throw new TxCommitException(t.getMessage(), t);
|
throw new TxCommitException(t.getMessage(), t);
|
||||||
} finally {
|
} finally {
|
||||||
for (var unlock : toUnlock) {
|
if (toUnlock != null)
|
||||||
unlock.close();
|
for (var unlock : toUnlock) {
|
||||||
}
|
unlock.close();
|
||||||
|
}
|
||||||
if (commitSnapshot != null)
|
if (commitSnapshot != null)
|
||||||
commitSnapshot.close();
|
commitSnapshot.close();
|
||||||
tx.close();
|
tx.close();
|
||||||
|
|||||||
@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
|
|||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
|
public record RemoteObjectDataWrapper<T extends JDataRemote>(
|
||||||
boolean frozen,
|
JObjectKey key,
|
||||||
T data) implements JDataRefcounted {
|
PCollection<JDataRef> refsFrom,
|
||||||
|
boolean frozen,
|
||||||
|
T data) implements JDataRefcounted {
|
||||||
public RemoteObjectDataWrapper(T data) {
|
public RemoteObjectDataWrapper(T data) {
|
||||||
this(HashTreePSet.empty(), false, data);
|
this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
|
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
|
||||||
return new RemoteObjectDataWrapper<>(refs, frozen, data);
|
return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
|
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
|
||||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RemoteObjectDataWrapper<T> withData(T data) {
|
public RemoteObjectDataWrapper<T> withData(T data) {
|
||||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JObjectKey key() {
|
|
||||||
return RemoteObjectMeta.ofDataKey(data.key());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
|
|||||||
import com.usatiuk.objects.JData;
|
import com.usatiuk.objects.JData;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
|
||||||
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
|
public record JMapEntry<K extends JMapKey>(JObjectKey key,
|
||||||
|
JObjectKey holder,
|
||||||
K selfKey,
|
K selfKey,
|
||||||
JObjectKey ref) implements JData {
|
JObjectKey ref) implements JData {
|
||||||
@Override
|
public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
|
||||||
public JObjectKey key() {
|
this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
|
||||||
return JMapHelper.makeKey(holder, selfKey);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user