3 Commits

SHA1 Message Date
e167c21d40 More microoptimizations 2! 2025-04-17 11:48:43 +02:00
7dc8f486ea More microoptimizations! 2025-04-17 10:02:26 +02:00
da1a996e6f Support: un-simplify allocateUninitialized 2025-04-17 09:20:56 +02:00
10 changed files with 93 additions and 102 deletions

View File

@@ -6,13 +6,14 @@ import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.objects.transaction.TxRecord;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@ApplicationScoped
@Singleton
public class SnapshotManager {
@Inject
WritebackObjectPersistentStore writebackStore;
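
The recurring edit across these beans swaps CDI's @ApplicationScoped for @Singleton. In Quarkus, an @ApplicationScoped bean is injected behind a client proxy, so every call pays a small indirection; @Singleton is an unproxied pseudo-scope, so the injected reference is the bean instance itself. A minimal sketch of the pattern with a hypothetical bean (not from this repository):

import jakarta.inject.Singleton;

// Hypothetical bean illustrating the swap: previously annotated
// @jakarta.enterprise.context.ApplicationScoped (normal scope, injected through
// a client proxy), now @Singleton (unproxied pseudo-scope, direct calls).
@Singleton
public class ExampleCache {
    public String get(String key) {
        return key.toUpperCase(); // trivial body; only the scope annotation matters here
    }
}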

View File

@@ -1,5 +1,6 @@
package com.usatiuk.objects.transaction;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
@@ -13,6 +14,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
@@ -62,11 +64,10 @@ public class JObjectManager {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
verifyReady();
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet;
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
Map<JObjectKey, TransactionObject<?>> readSet = null;
Collection<AutoCloseableNoThrow> toUnlock = null;
try {
try {
@@ -95,7 +96,7 @@ public class JObjectManager {
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
do {
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
@@ -148,11 +149,11 @@ public class JObjectManager {
writes.put(n.key(), n);
}
}
} while (pendingCount > 0);
}
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
locked.lock().close();
}
}
throw e;
@@ -161,25 +162,34 @@ public class JObjectManager {
readSet = tx.reads();
if (!writes.isEmpty()) {
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(k -> {
var lock = lockManager.lockObject(k);
toUnlock.add(lock);
});
toUnlock = new ArrayList<>(readSet.size() + writes.size());
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
} else {
toLock.add(read.getKey());
}
}
for (var write : writes.entrySet()) {
toLock.add(write.getKey());
}
Collections.sort(toLock);
for (var key : toLock) {
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
}
commitSnapshot = snapshotManager.createSnapshot();
}
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
}
}
if (writes.isEmpty()) {
} else {
Log.trace("Committing transaction - no changes");
for (var read : readSet.values()) {
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
@@ -198,24 +208,23 @@ public class JObjectManager {
if (snapshotId != commitSnapshot.id()) {
for (var read : readSet.entrySet()) {
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
var dep = dependenciesLocked.get(read.getKey());
var current = commitSnapshot.readObject(read.getKey());
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
if (current.isEmpty() != read.getValue().data().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.isEmpty()) {
if (current.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (dep.get().version() > snapshotId) {
if (current.get().version() > snapshotId) {
Log.tracev("Checking dependency {0} - newer than", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
}
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
@@ -224,21 +233,7 @@ public class JObjectManager {
Log.tracev("Skipped dependency checks: no changes");
}
boolean same = snapshotId == commitSnapshot.id();
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
.filter(r -> {
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
}
}
return true;
}).toList());
var addFlushCallback = snapshotManager.commitTx(writes.values());
for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback);
@@ -256,9 +251,10 @@ public class JObjectManager {
Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t);
} finally {
for (var unlock : toUnlock) {
unlock.close();
}
if (toUnlock != null)
for (var unlock : toUnlock) {
unlock.close();
}
if (commitSnapshot != null)
commitSnapshot.close();
tx.close();
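
The largest change in the commit path replaces the stream-based lock acquisition with an explicit pass: keys that are not already held by the read set are collected into a list, sorted, and locked in that canonical order, which is the standard way to take multiple locks without risking deadlock between concurrent commits. A minimal sketch of the idea, using placeholder types rather than the project's LockManager API:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Sketch: acquire per-key locks in one global order (sorted keys) so that two
// transactions locking overlapping key sets can never deadlock each other.
final class OrderedLocking {
    interface KeyLock extends AutoCloseable {
        @Override void close(); // no checked exception, mirroring a "no-throw" closeable
    }

    interface KeyLockManager {
        KeyLock lock(String key); // hypothetical: blocks until the key is held
    }

    static List<KeyLock> lockAll(KeyLockManager manager, Collection<String> keys) {
        var toLock = new ArrayList<>(keys);
        Collections.sort(toLock);                 // canonical acquisition order
        var held = new ArrayList<KeyLock>(toLock.size());
        for (var key : toLock) {
            held.add(manager.lock(key));          // always sorted, so no lock cycles
        }
        return held;                              // caller closes these in a finally block
    }
}

In the diff itself, read-set entries that are already locked go straight onto the unlock list, only the remaining keys plus the write set are sorted and locked, and the unlock list is pre-sized to readSet.size() + writes.size().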

View File

@@ -6,8 +6,9 @@ import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class LockManager {
private final DataLocker _objLocker = new DataLocker();

View File

@@ -9,12 +9,13 @@ import com.usatiuk.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@ApplicationScoped
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
SnapshotManager snapshotManager;

View File

@@ -9,6 +9,9 @@ public class UninitializedByteBuffer {
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
public static ByteBuffer allocateUninitialized(int size) {
if (size < DhfsSupport.PAGE_SIZE)
return ByteBuffer.allocateDirect(size);
var bb = new ByteBuffer[1];
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0];
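
The "un-simplify" commit restores a small-allocation fast path: below the platform page size the method simply calls ByteBuffer.allocateDirect, and only larger buffers go through the native uninitialized-allocation route. A hedged sketch of that shape, with DhfsSupport replaced by a placeholder constant since the native helper is not shown here:

import java.nio.ByteBuffer;

final class UninitializedBuffers {
    // Placeholder for DhfsSupport.PAGE_SIZE; the real value comes from the native layer.
    private static final int PAGE_SIZE = 4096;

    static ByteBuffer allocate(int size) {
        if (size < PAGE_SIZE) {
            // Small buffers: zero-filling cost is negligible, plain allocateDirect is fine.
            return ByteBuffer.allocateDirect(size);
        }
        // Large buffers: the real code calls DhfsSupport.allocateUninitializedByteBuffer(...)
        // to skip zero-initialization; allocateDirect stands in for it in this sketch.
        return ByteBuffer.allocateDirect(size);
    }
}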

View File

@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
import java.util.Collection;
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public record RemoteObjectDataWrapper<T extends JDataRemote>(
JObjectKey key,
PCollection<JDataRef> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public RemoteObjectDataWrapper(T data) {
this(HashTreePSet.empty(), false, data);
this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
}
@Override
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
return new RemoteObjectDataWrapper<>(refs, frozen, data);
return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
}
@Override
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
}
public RemoteObjectDataWrapper<T> withData(T data) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
}
@Override
public JObjectKey key() {
return RemoteObjectMeta.ofDataKey(data.key());
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
}
@Override

View File

@@ -9,12 +9,13 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.mutable.MutableObject;
import org.pcollections.HashTreePSet;
import java.util.Optional;
@ApplicationScoped
@Singleton
public class RemoteTransaction {
@Inject
Transaction curTx;

View File

@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
public record JMapEntry<K extends JMapKey>(JObjectKey key,
JObjectKey holder,
K selfKey,
JObjectKey ref) implements JData {
@Override
public JObjectKey key() {
return JMapHelper.makeKey(holder, selfKey);
public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
}
}
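
Both RemoteObjectDataWrapper and JMapEntry move the derived key from an overridden key() accessor into a record component filled in by a secondary constructor, so the key is computed once at construction instead of on every key() call. A minimal sketch of the pattern with made-up types:

// Hypothetical record showing the pattern: the derived key becomes a component,
// and the convenience constructor computes it once up front.
record Entry(String key, String holder, String name) {
    Entry(String holder, String name) {
        this(makeKey(holder, name), holder, name); // derive once, store in the record
    }

    static String makeKey(String holder, String name) {
        return holder + "/" + name; // stand-in for JMapHelper.makeKey(...)
    }
}

The generated accessor for the key component is then a plain field read, which is the micro-optimization: callers that query key() repeatedly no longer rebuild the key each time.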

View File

@@ -7,11 +7,12 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Optional;
@ApplicationScoped
@Singleton
public class JMapHelper {
@Inject
Transaction curTx;

View File

@@ -15,57 +15,47 @@ public class DataLocker {
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
while (true) {
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
try {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
while (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
tag.wait();
synchronized (oldTag) {
while (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
oldTag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
continue;
}
}
} catch (InterruptedException ignored) {
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
}
}
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
while (true) {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
continue;
}
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
synchronized (oldTag) {
if (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
}
}
}
@@ -83,11 +73,11 @@ public class DataLocker {
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
CLEANER.register(this, () -> {
if (!tag.released) {
Log.error("Lock collected without release: " + key);
}
});
// CLEANER.register(this, () -> {
// if (!tag.released) {
// Log.error("Lock collected without release: " + key);
// }
// });
}
@Override
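
DataLocker's lock() now takes the optimistic path first: it creates a fresh tag and putIfAbsent()s it, and only when another thread already owns the key does it fall back to waiting on the existing tag before retrying. The uncontended case therefore needs a single map operation. A small self-contained sketch of a keyed lock built the same way (simplified: no reentrancy check, no try-lock variant):

import java.util.concurrent.ConcurrentHashMap;

// Sketch of a per-key lock: optimistic putIfAbsent first, wait on the existing
// tag only when the key is already held, then retry after the holder releases it.
final class KeyedLocker {
    private static final class Tag { volatile boolean released; }

    private final ConcurrentHashMap<Object, Tag> locks = new ConcurrentHashMap<>();

    AutoCloseable lock(Object key) throws InterruptedException {
        while (true) {
            var fresh = new Tag();
            var existing = locks.putIfAbsent(key, fresh);   // uncontended fast path
            if (existing == null) {
                return () -> {                              // release: remove and wake waiters
                    synchronized (fresh) {
                        fresh.released = true;
                        locks.remove(key, fresh);
                        fresh.notifyAll();
                    }
                };
            }
            synchronized (existing) {                       // contended path: wait, then retry
                while (!existing.released) {
                    existing.wait();
                }
            }
        }
    }
}

The same file also comments out the Cleaner registration that reported locks collected without release, removing a per-lock registration cost on the hot path.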