3 Commits

Author SHA1 Message Date
e167c21d40 More microoptimizations 2! 2025-04-17 11:48:43 +02:00
7dc8f486ea More microoptimizations! 2025-04-17 10:02:26 +02:00
da1a996e6f Support: un-simplify allocateUninitialized 2025-04-17 09:20:56 +02:00
10 changed files with 93 additions and 102 deletions

View File

@@ -6,13 +6,14 @@ import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.objects.transaction.TxRecord; import com.usatiuk.objects.transaction.TxRecord;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Collection; import java.util.Collection;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
@ApplicationScoped @Singleton
public class SnapshotManager { public class SnapshotManager {
@Inject @Inject
WritebackObjectPersistentStore writebackStore; WritebackObjectPersistentStore writebackStore;

View File

@@ -1,5 +1,6 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
@@ -13,6 +14,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.*; import java.util.*;
@@ -62,11 +64,10 @@ public class JObjectManager {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) { public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
verifyReady(); verifyReady();
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>(); var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null; Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, TransactionObject<?>> readSet; Map<JObjectKey, TransactionObject<?>> readSet = null;
var toUnlock = new ArrayList<AutoCloseableNoThrow>(); Collection<AutoCloseableNoThrow> toUnlock = null;
try { try {
try { try {
@@ -95,7 +96,7 @@ public class JObjectManager {
// For example, when a hook makes changes to an object, and another hook changes the object before/after it // For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created // on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it. // as the "old" version, and the new version with all the changes after it.
do { while (pendingCount > 0) {
for (var hookId : hookIterationData) { for (var hookId : hookIterationData) {
var hook = hookId.hook(); var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites(); var lastCurHookSeen = hookId.lastWrites();
@@ -148,11 +149,11 @@ public class JObjectManager {
writes.put(n.key(), n); writes.put(n.key(), n);
} }
} }
} while (pendingCount > 0); }
} catch (Throwable e) { } catch (Throwable e) {
for (var read : tx.reads().entrySet()) { for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) { if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock()); locked.lock().close();
} }
} }
throw e; throw e;
@@ -161,25 +162,34 @@ public class JObjectManager {
readSet = tx.reads(); readSet = tx.reads();
if (!writes.isEmpty()) { if (!writes.isEmpty()) {
Stream.concat(readSet.keySet().stream(), writes.keySet().stream()) toUnlock = new ArrayList<>(readSet.size() + writes.size());
.sorted(Comparator.comparing(JObjectKey::toString)) ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
.forEach(k -> { for (var read : readSet.entrySet()) {
var lock = lockManager.lockObject(k); if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(lock); toUnlock.add(locked.lock());
}); } else {
toLock.add(read.getKey());
}
}
for (var write : writes.entrySet()) {
toLock.add(write.getKey());
}
Collections.sort(toLock);
for (var key : toLock) {
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
}
commitSnapshot = snapshotManager.createSnapshot(); commitSnapshot = snapshotManager.createSnapshot();
} } else {
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
}
}
if (writes.isEmpty()) {
Log.trace("Committing transaction - no changes"); Log.trace("Committing transaction - no changes");
for (var read : readSet.values()) {
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
return Pair.of( return Pair.of(
Stream.concat( Stream.concat(
tx.getOnCommit().stream(), tx.getOnCommit().stream(),
@@ -198,24 +208,23 @@ public class JObjectManager {
if (snapshotId != commitSnapshot.id()) { if (snapshotId != commitSnapshot.id()) {
for (var read : readSet.entrySet()) { for (var read : readSet.entrySet()) {
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey())); var current = commitSnapshot.readObject(read.getKey());
var dep = dependenciesLocked.get(read.getKey());
if (dep.isEmpty() != read.getValue().data().isEmpty()) { if (current.isEmpty() != read.getValue().data().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey()); Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty()); throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
} }
if (dep.isEmpty()) { if (current.isEmpty()) {
// TODO: Every write gets a dependency due to hooks // TODO: Every write gets a dependency due to hooks
continue; continue;
// assert false; // assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty()); // throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
} }
if (dep.get().version() > snapshotId) { if (current.get().version() > snapshotId) {
Log.tracev("Checking dependency {0} - newer than", read.getKey()); Log.tracev("Checking dependency {0} - newer than", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId); throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
} }
Log.tracev("Checking dependency {0} - ok with read", read.getKey()); Log.tracev("Checking dependency {0} - ok with read", read.getKey());
@@ -224,21 +233,7 @@ public class JObjectManager {
Log.tracev("Skipped dependency checks: no changes"); Log.tracev("Skipped dependency checks: no changes");
} }
boolean same = snapshotId == commitSnapshot.id(); var addFlushCallback = snapshotManager.commitTx(writes.values());
var addFlushCallback = snapshotManager.commitTx(
writes.values().stream()
.filter(r -> {
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;
}
}
return true;
}).toList());
for (var callback : tx.getOnFlush()) { for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback); addFlushCallback.accept(callback);
@@ -256,9 +251,10 @@ public class JObjectManager {
Log.trace("Error when committing transaction", t); Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t); throw new TxCommitException(t.getMessage(), t);
} finally { } finally {
for (var unlock : toUnlock) { if (toUnlock != null)
unlock.close(); for (var unlock : toUnlock) {
} unlock.close();
}
if (commitSnapshot != null) if (commitSnapshot != null)
commitSnapshot.close(); commitSnapshot.close();
tx.close(); tx.close();

View File

@@ -6,8 +6,9 @@ import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.annotation.Nonnull; import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
@ApplicationScoped @Singleton
public class LockManager { public class LockManager {
private final DataLocker _objLocker = new DataLocker(); private final DataLocker _objLocker = new DataLocker();

View File

@@ -9,12 +9,13 @@ import com.usatiuk.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*; import java.util.*;
@ApplicationScoped @Singleton
public class TransactionFactoryImpl implements TransactionFactory { public class TransactionFactoryImpl implements TransactionFactory {
@Inject @Inject
SnapshotManager snapshotManager; SnapshotManager snapshotManager;

View File

@@ -9,6 +9,9 @@ public class UninitializedByteBuffer {
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName()); private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
public static ByteBuffer allocateUninitialized(int size) { public static ByteBuffer allocateUninitialized(int size) {
if (size < DhfsSupport.PAGE_SIZE)
return ByteBuffer.allocateDirect(size);
var bb = new ByteBuffer[1]; var bb = new ByteBuffer[1];
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size); long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0]; var ret = bb[0];

View File

@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
import java.util.Collection; import java.util.Collection;
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom, public record RemoteObjectDataWrapper<T extends JDataRemote>(
boolean frozen, JObjectKey key,
T data) implements JDataRefcounted { PCollection<JDataRef> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public RemoteObjectDataWrapper(T data) { public RemoteObjectDataWrapper(T data) {
this(HashTreePSet.empty(), false, data); this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
} }
@Override @Override
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) { public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
return new RemoteObjectDataWrapper<>(refs, frozen, data); return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
} }
@Override @Override
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) { public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
} }
public RemoteObjectDataWrapper<T> withData(T data) { public RemoteObjectDataWrapper<T> withData(T data) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
}
@Override
public JObjectKey key() {
return RemoteObjectMeta.ofDataKey(data.key());
} }
@Override @Override

View File

@@ -9,12 +9,13 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.mutable.MutableObject;
import org.pcollections.HashTreePSet; import org.pcollections.HashTreePSet;
import java.util.Optional; import java.util.Optional;
@ApplicationScoped @Singleton
public class RemoteTransaction { public class RemoteTransaction {
@Inject @Inject
Transaction curTx; Transaction curTx;

View File

@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
public record JMapEntry<K extends JMapKey>(JObjectKey holder, public record JMapEntry<K extends JMapKey>(JObjectKey key,
JObjectKey holder,
K selfKey, K selfKey,
JObjectKey ref) implements JData { JObjectKey ref) implements JData {
@Override public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
public JObjectKey key() { this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
return JMapHelper.makeKey(holder, selfKey);
} }
} }

View File

@@ -7,11 +7,12 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Optional; import java.util.Optional;
@ApplicationScoped @Singleton
public class JMapHelper { public class JMapHelper {
@Inject @Inject
Transaction curTx; Transaction curTx;

View File

@@ -15,57 +15,47 @@ public class DataLocker {
@Nonnull @Nonnull
public AutoCloseableNoThrow lock(Object data) { public AutoCloseableNoThrow lock(Object data) {
while (true) { while (true) {
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
try { try {
var tag = _locks.get(data); synchronized (oldTag) {
if (tag != null) { while (!oldTag.released) {
synchronized (tag) { if (oldTag.owner == Thread.currentThread()) {
while (!tag.released) { return DUMMY_LOCK;
if (tag.owner == Thread.currentThread()) { }
return DUMMY_LOCK; oldTag.wait();
}
tag.wait();
// tag.wait(4000L); // tag.wait(4000L);
// if (!tag.released) { // if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data); // System.out.println("Timeout waiting for lock: " + data);
// System.exit(1); // System.exit(1);
// throw new InterruptedException(); // throw new InterruptedException();
// } // }
}
continue;
} }
} }
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
} }
} }
@Nullable @Nullable
public AutoCloseableNoThrow tryLock(Object data) { public AutoCloseableNoThrow tryLock(Object data) {
while (true) { while (true) {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
continue;
}
}
var newTag = new LockTag(); var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag); var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) { if (oldTag == null) {
return new Lock(data, newTag); return new Lock(data, newTag);
} }
synchronized (oldTag) {
if (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
}
} }
} }
@@ -83,11 +73,11 @@ public class DataLocker {
public Lock(Object key, LockTag tag) { public Lock(Object key, LockTag tag) {
_key = key; _key = key;
_tag = tag; _tag = tag;
CLEANER.register(this, () -> { // CLEANER.register(this, () -> {
if (!tag.released) { // if (!tag.released) {
Log.error("Lock collected without release: " + key); // Log.error("Lock collected without release: " + key);
} // }
}); // });
} }
@Override @Override