mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
9 Commits
bb-keys
...
e167c21d40
| Author | SHA1 | Date | |
|---|---|---|---|
| e167c21d40 | |||
| 7dc8f486ea | |||
| da1a996e6f | |||
| bb52a3af0e | |||
| de0b868349 | |||
| d4d4e150c1 | |||
| c9b0400d50 | |||
| 94218330b1 | |||
| dbe2a72f7c |
@@ -26,11 +26,11 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
}
|
||||
|
||||
static JObjectKey fromBytes(byte[] bytes) {
|
||||
return new JObjectKeyImpl(new String(bytes, StandardCharsets.UTF_8));
|
||||
return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1));
|
||||
}
|
||||
|
||||
static JObjectKey fromByteBuffer(ByteBuffer buff) {
|
||||
return new JObjectKeyImpl(StandardCharsets.UTF_8.decode(buff).toString());
|
||||
return new JObjectKeyImpl(StandardCharsets.ISO_8859_1.decode(buff).toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -28,12 +28,12 @@ public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
return value.getBytes(StandardCharsets.UTF_8);
|
||||
return value.getBytes(StandardCharsets.ISO_8859_1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
var heapBb = StandardCharsets.UTF_8.encode(value);
|
||||
var heapBb = StandardCharsets.ISO_8859_1.encode(value);
|
||||
if (heapBb.isDirect()) return heapBb;
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
|
||||
directBb.put(heapBb);
|
||||
|
||||
@@ -1,25 +1,37 @@
|
||||
package com.usatiuk.objects.iterators;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
|
||||
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
|
||||
public IteratorEntry<K, V> reversed() {
|
||||
return new IteratorEntry<>(priority, iterator.reversed());
|
||||
}
|
||||
}
|
||||
|
||||
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
|
||||
private final String _name;
|
||||
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
|
||||
private final List<IteratorEntry<K, V>> _iterators;
|
||||
|
||||
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
|
||||
_goingForward = true;
|
||||
_name = name;
|
||||
|
||||
_iterators = Map.ofEntries(
|
||||
IntStream.range(0, iterators.size())
|
||||
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
|
||||
.toArray(Pair[]::new)
|
||||
);
|
||||
// Why streams are so slow?
|
||||
{
|
||||
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
|
||||
for (int i = 0; i < iterators.size(); i++) {
|
||||
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
|
||||
}
|
||||
_iterators = List.of(iteratorEntries);
|
||||
}
|
||||
|
||||
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
|
||||
// Starting at a greatest key less than/less or equal than:
|
||||
@@ -30,7 +42,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
K greatestLess = null;
|
||||
K smallestMore = null;
|
||||
|
||||
for (var it : _iterators.keySet()) {
|
||||
for (var ite : _iterators) {
|
||||
var it = ite.iterator();
|
||||
if (it.hasNext()) {
|
||||
var peeked = it.peekNextKey();
|
||||
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
|
||||
@@ -55,14 +68,15 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
// Empty iterators
|
||||
}
|
||||
|
||||
for (var iterator : _iterators.keySet()) {
|
||||
for (var ite : _iterators) {
|
||||
var iterator = ite.iterator();
|
||||
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
|
||||
iterator.skip();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
advanceIterator(iterator);
|
||||
}
|
||||
|
||||
@@ -88,29 +102,39 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
this(name, startType, startKey, List.of(iterators));
|
||||
}
|
||||
|
||||
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
|
||||
if (!iterator.hasNext()) {
|
||||
return;
|
||||
}
|
||||
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
|
||||
while (iteratorEntry.iterator().hasNext()) {
|
||||
K key = iteratorEntry.iterator().peekNextKey();
|
||||
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
|
||||
|
||||
K key = iterator.peekNextKey();
|
||||
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
|
||||
if (!_sortedIterators.containsKey(key)) {
|
||||
_sortedIterators.put(key, iterator);
|
||||
return;
|
||||
}
|
||||
MutableObject<IteratorEntry<K, V>> mutableBoolean = new MutableObject<>(null);
|
||||
|
||||
// Expects that reversed iterator returns itself when reversed again
|
||||
var oursPrio = _iterators.get(_goingForward ? iterator : iterator.reversed());
|
||||
var them = _sortedIterators.get(key);
|
||||
var theirsPrio = _iterators.get(_goingForward ? them : them.reversed());
|
||||
if (oursPrio < theirsPrio) {
|
||||
_sortedIterators.put(key, iterator);
|
||||
advanceIterator(them);
|
||||
} else {
|
||||
Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
|
||||
iterator.skip();
|
||||
advanceIterator(iterator);
|
||||
var newVal = _sortedIterators.merge(key, iteratorEntry, (theirsEntry, oldValOurs) -> {
|
||||
var oursPrio = oldValOurs.priority();
|
||||
var theirsPrio = theirsEntry.priority();
|
||||
|
||||
if (oursPrio < theirsPrio) {
|
||||
mutableBoolean.setValue(theirsEntry);
|
||||
return oldValOurs;
|
||||
// advance them
|
||||
// return
|
||||
} else {
|
||||
return theirsEntry;
|
||||
// skip, continue
|
||||
}
|
||||
});
|
||||
|
||||
if (newVal != iteratorEntry) {
|
||||
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
|
||||
iteratorEntry.iterator().skip();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mutableBoolean.getValue() != null) {
|
||||
advanceIterator(mutableBoolean.getValue());
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,7 +144,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
Log.tracev("{0} Reversing from {1}", _name, cur);
|
||||
_goingForward = !_goingForward;
|
||||
_sortedIterators.clear();
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
// _goingForward inverted already
|
||||
advanceIterator(!_goingForward ? iterator.reversed() : iterator);
|
||||
}
|
||||
@@ -150,7 +174,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
cur.getValue().skip();
|
||||
cur.getValue().iterator().skip();
|
||||
advanceIterator(cur.getValue());
|
||||
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
|
||||
}
|
||||
@@ -166,7 +190,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
if (cur == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
var curVal = cur.getValue().next();
|
||||
var curVal = cur.getValue().iterator().next();
|
||||
advanceIterator(cur.getValue());
|
||||
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
|
||||
return curVal;
|
||||
@@ -174,8 +198,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
|
||||
iterator.close();
|
||||
for (IteratorEntry<K, V> iterator : _iterators) {
|
||||
iterator.iterator().close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,13 +6,14 @@ import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import com.usatiuk.objects.transaction.TxRecord;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class SnapshotManager {
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackStore;
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
import com.usatiuk.objects.JObjectKeyMin;
|
||||
@@ -10,8 +12,6 @@ import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
|
||||
import com.usatiuk.objects.iterators.ReversibleKvIterator;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import io.quarkus.arc.properties.IfBuildProperty;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -30,7 +30,6 @@ import java.lang.ref.Cleaner;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Optional;
|
||||
|
||||
@@ -41,7 +40,17 @@ import static org.lmdbjava.Env.create;
|
||||
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
|
||||
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
private static final String DB_NAME = "objects";
|
||||
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
|
||||
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
|
||||
private static final ByteBuffer DB_VER_OBJ_NAME;
|
||||
|
||||
static {
|
||||
byte[] tmp = DB_VER_OBJ_NAME_STR.getBytes(StandardCharsets.ISO_8859_1);
|
||||
var bb = ByteBuffer.allocateDirect(tmp.length);
|
||||
bb.put(tmp);
|
||||
bb.flip();
|
||||
DB_VER_OBJ_NAME = bb.asReadOnlyBuffer();
|
||||
}
|
||||
|
||||
private final Path _root;
|
||||
private Env<ByteBuffer> _env;
|
||||
private Dbi<ByteBuffer> _db;
|
||||
@@ -67,13 +76,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (read.isPresent()) {
|
||||
Log.infov("Read tx id {0}", read.get());
|
||||
} else {
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var bbData = ByteBuffer.allocateDirect(8);
|
||||
bbData.putLong(0);
|
||||
bbData.flip();
|
||||
_db.put(txn, bb, bbData);
|
||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||
txn.commit();
|
||||
}
|
||||
}
|
||||
@@ -82,10 +88,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var value = _db.get(txn, bb);
|
||||
var value = _db.get(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer());
|
||||
return Optional.ofNullable(value).map(ByteBuffer::getLong);
|
||||
}
|
||||
|
||||
@@ -121,7 +124,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
||||
assert !_closed;
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -168,13 +171,10 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
assert txId > readTxId(txn).orElseThrow();
|
||||
|
||||
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
|
||||
bb.put(DB_VER_OBJ_NAME);
|
||||
bb.flip();
|
||||
var bbData = ByteBuffer.allocateDirect(8);
|
||||
bbData.putLong(txId);
|
||||
bbData.flip();
|
||||
_db.put(txn, bb, bbData);
|
||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||
} catch (Throwable t) {
|
||||
txn.close();
|
||||
throw t;
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -13,10 +14,10 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -27,6 +28,12 @@ import java.util.stream.Stream;
|
||||
@ApplicationScoped
|
||||
public class JObjectManager {
|
||||
private final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
|
||||
private record CommitHookIterationData(PreCommitTxHook hook,
|
||||
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
|
||||
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
|
||||
}
|
||||
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
@Inject
|
||||
@@ -57,25 +64,27 @@ public class JObjectManager {
|
||||
|
||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
||||
verifyReady();
|
||||
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
|
||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||
Map<JObjectKey, TransactionObject<?>> readSet;
|
||||
var toUnlock = new ArrayList<AutoCloseableNoThrow>();
|
||||
Map<JObjectKey, TransactionObject<?>> readSet = null;
|
||||
Collection<AutoCloseableNoThrow> toUnlock = null;
|
||||
|
||||
try {
|
||||
try {
|
||||
long pendingCount = 0;
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> pendingWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
Map<PreCommitTxHook, Map<JObjectKey, TxRecord.TxObjectRecord<?>>> lastWrites = Map.ofEntries(
|
||||
_preCommitTxHooks.stream().map(p -> Pair.of(p, new HashMap<>())).toArray(Pair[]::new)
|
||||
);
|
||||
List<CommitHookIterationData> hookIterationData;
|
||||
{
|
||||
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
|
||||
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
|
||||
var hook = _preCommitTxHooks.get(i);
|
||||
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
|
||||
}
|
||||
hookIterationData = List.of(hookIterationDataArray);
|
||||
}
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
pendingWrites.get(hookPut).put(n.key(), n);
|
||||
for (var hookPut : hookIterationData) {
|
||||
hookPut.pendingWrites().put(n.key(), n);
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
@@ -87,9 +96,10 @@ public class JObjectManager {
|
||||
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
|
||||
// on the next iteration, the first hook should receive the version of the object it had created
|
||||
// as the "old" version, and the new version with all the changes after it.
|
||||
do {
|
||||
for (var hook : _preCommitTxHooks) {
|
||||
var lastCurHookSeen = lastWrites.get(hook);
|
||||
while (pendingCount > 0) {
|
||||
for (var hookId : hookIterationData) {
|
||||
var hook = hookId.hook();
|
||||
var lastCurHookSeen = hookId.lastWrites();
|
||||
Function<JObjectKey, JData> getPrev =
|
||||
key -> switch (lastCurHookSeen.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
@@ -100,7 +110,7 @@ public class JObjectManager {
|
||||
}
|
||||
};
|
||||
|
||||
var curIteration = pendingWrites.get(hook);
|
||||
var curIteration = hookId.pendingWrites();
|
||||
|
||||
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
|
||||
|
||||
@@ -127,23 +137,23 @@ public class JObjectManager {
|
||||
curIteration.clear();
|
||||
|
||||
for (var n : tx.drainNewWrites()) {
|
||||
for (var hookPut : _preCommitTxHooks) {
|
||||
if (hookPut == hook) {
|
||||
for (var hookPut : hookIterationData) {
|
||||
if (hookPut == hookId) {
|
||||
lastCurHookSeen.put(n.key(), n);
|
||||
continue;
|
||||
}
|
||||
var before = pendingWrites.get(hookPut).put(n.key(), n);
|
||||
var before = hookPut.pendingWrites().put(n.key(), n);
|
||||
if (before == null)
|
||||
pendingCount++;
|
||||
}
|
||||
writes.put(n.key(), n);
|
||||
}
|
||||
}
|
||||
} while (pendingCount > 0);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
for (var read : tx.reads().entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
@@ -152,25 +162,34 @@ public class JObjectManager {
|
||||
readSet = tx.reads();
|
||||
|
||||
if (!writes.isEmpty()) {
|
||||
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
|
||||
.sorted(Comparator.comparing(JObjectKey::toString))
|
||||
.forEach(k -> {
|
||||
var lock = lockManager.lockObject(k);
|
||||
toUnlock.add(lock);
|
||||
});
|
||||
toUnlock = new ArrayList<>(readSet.size() + writes.size());
|
||||
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
|
||||
for (var read : readSet.entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
} else {
|
||||
toLock.add(read.getKey());
|
||||
}
|
||||
}
|
||||
for (var write : writes.entrySet()) {
|
||||
toLock.add(write.getKey());
|
||||
}
|
||||
Collections.sort(toLock);
|
||||
for (var key : toLock) {
|
||||
var lock = lockManager.lockObject(key);
|
||||
toUnlock.add(lock);
|
||||
}
|
||||
|
||||
commitSnapshot = snapshotManager.createSnapshot();
|
||||
}
|
||||
|
||||
for (var read : readSet.entrySet()) {
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock());
|
||||
}
|
||||
}
|
||||
|
||||
if (writes.isEmpty()) {
|
||||
} else {
|
||||
Log.trace("Committing transaction - no changes");
|
||||
|
||||
for (var read : readSet.values()) {
|
||||
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
return Pair.of(
|
||||
Stream.concat(
|
||||
tx.getOnCommit().stream(),
|
||||
@@ -189,24 +208,23 @@ public class JObjectManager {
|
||||
|
||||
if (snapshotId != commitSnapshot.id()) {
|
||||
for (var read : readSet.entrySet()) {
|
||||
dependenciesLocked.put(read.getKey(), commitSnapshot.readObject(read.getKey()));
|
||||
var dep = dependenciesLocked.get(read.getKey());
|
||||
var current = commitSnapshot.readObject(read.getKey());
|
||||
|
||||
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
|
||||
if (current.isEmpty() != read.getValue().data().isEmpty()) {
|
||||
Log.tracev("Checking read dependency {0} - not found", read.getKey());
|
||||
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
}
|
||||
|
||||
if (dep.isEmpty()) {
|
||||
if (current.isEmpty()) {
|
||||
// TODO: Every write gets a dependency due to hooks
|
||||
continue;
|
||||
// assert false;
|
||||
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
|
||||
}
|
||||
|
||||
if (dep.get().version() > snapshotId) {
|
||||
if (current.get().version() > snapshotId) {
|
||||
Log.tracev("Checking dependency {0} - newer than", read.getKey());
|
||||
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
|
||||
throw new TxCommitException("Serialization hazard: " + current.get().data().key() + " " + current.get().version() + " vs " + snapshotId);
|
||||
}
|
||||
|
||||
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
|
||||
@@ -215,21 +233,7 @@ public class JObjectManager {
|
||||
Log.tracev("Skipped dependency checks: no changes");
|
||||
}
|
||||
|
||||
boolean same = snapshotId == commitSnapshot.id();
|
||||
|
||||
var addFlushCallback = snapshotManager.commitTx(
|
||||
writes.values().stream()
|
||||
.filter(r -> {
|
||||
if (!same)
|
||||
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
|
||||
var dep = dependenciesLocked.get(data.key());
|
||||
if (dep.isPresent() && dep.get().version() > snapshotId) {
|
||||
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}).toList());
|
||||
var addFlushCallback = snapshotManager.commitTx(writes.values());
|
||||
|
||||
for (var callback : tx.getOnFlush()) {
|
||||
addFlushCallback.accept(callback);
|
||||
@@ -247,9 +251,10 @@ public class JObjectManager {
|
||||
Log.trace("Error when committing transaction", t);
|
||||
throw new TxCommitException(t.getMessage(), t);
|
||||
} finally {
|
||||
for (var unlock : toUnlock) {
|
||||
unlock.close();
|
||||
}
|
||||
if (toUnlock != null)
|
||||
for (var unlock : toUnlock) {
|
||||
unlock.close();
|
||||
}
|
||||
if (commitSnapshot != null)
|
||||
commitSnapshot.close();
|
||||
tx.close();
|
||||
|
||||
@@ -6,8 +6,9 @@ import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import jakarta.annotation.Nonnull;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class LockManager {
|
||||
private final DataLocker _objLocker = new DataLocker();
|
||||
|
||||
|
||||
@@ -9,12 +9,13 @@ import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
|
||||
@@ -9,24 +9,20 @@ public class UninitializedByteBuffer {
|
||||
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
|
||||
|
||||
public static ByteBuffer allocateUninitialized(int size) {
|
||||
try {
|
||||
if (size < DhfsSupport.PAGE_SIZE)
|
||||
return ByteBuffer.allocateDirect(size);
|
||||
if (size < DhfsSupport.PAGE_SIZE)
|
||||
return ByteBuffer.allocateDirect(size);
|
||||
|
||||
var bb = new ByteBuffer[1];
|
||||
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
|
||||
var ret = bb[0];
|
||||
CLEANER.register(ret, () -> {
|
||||
try {
|
||||
DhfsSupport.releaseByteBuffer(token);
|
||||
} catch (Throwable e) {
|
||||
LOGGER.severe("Error releasing buffer: " + e);
|
||||
System.exit(-1);
|
||||
}
|
||||
});
|
||||
return ret;
|
||||
} catch (OutOfMemoryError e) {
|
||||
return ByteBuffer.allocate(size);
|
||||
}
|
||||
var bb = new ByteBuffer[1];
|
||||
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
|
||||
var ret = bb[0];
|
||||
CLEANER.register(ret, () -> {
|
||||
try {
|
||||
DhfsSupport.releaseByteBuffer(token);
|
||||
} catch (Throwable e) {
|
||||
LOGGER.severe("Error releasing buffer: " + e);
|
||||
System.exit(-1);
|
||||
}
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,30 +6,27 @@ import org.pcollections.PCollection;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
|
||||
boolean frozen,
|
||||
T data) implements JDataRefcounted {
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(
|
||||
JObjectKey key,
|
||||
PCollection<JDataRef> refsFrom,
|
||||
boolean frozen,
|
||||
T data) implements JDataRefcounted {
|
||||
public RemoteObjectDataWrapper(T data) {
|
||||
this(HashTreePSet.empty(), false, data);
|
||||
this(RemoteObjectMeta.ofDataKey(data.key()), HashTreePSet.empty(), false, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new RemoteObjectDataWrapper<>(refs, frozen, data);
|
||||
return new RemoteObjectDataWrapper<>(key, refs, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
|
||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
||||
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
public RemoteObjectDataWrapper<T> withData(T data) {
|
||||
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return RemoteObjectMeta.ofDataKey(data.key());
|
||||
return new RemoteObjectDataWrapper<>(key, refsFrom, frozen, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -9,12 +9,13 @@ import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.mutable.MutableObject;
|
||||
import org.pcollections.HashTreePSet;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class RemoteTransaction {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
|
||||
@@ -3,11 +3,11 @@ package com.usatiuk.dhfs.jmap;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
|
||||
public record JMapEntry<K extends JMapKey>(JObjectKey key,
|
||||
JObjectKey holder,
|
||||
K selfKey,
|
||||
JObjectKey ref) implements JData {
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return JMapHelper.makeKey(holder, selfKey);
|
||||
public JMapEntry(JObjectKey holder, K selfKey, JObjectKey ref) {
|
||||
this(JMapHelper.makeKey(holder, selfKey), holder, selfKey, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,11 +7,12 @@ import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JMapHelper {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
|
||||
@@ -15,57 +15,47 @@ public class DataLocker {
|
||||
@Nonnull
|
||||
public AutoCloseableNoThrow lock(Object data) {
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
try {
|
||||
var tag = _locks.get(data);
|
||||
if (tag != null) {
|
||||
synchronized (tag) {
|
||||
while (!tag.released) {
|
||||
if (tag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
tag.wait();
|
||||
synchronized (oldTag) {
|
||||
while (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
oldTag.wait();
|
||||
// tag.wait(4000L);
|
||||
// if (!tag.released) {
|
||||
// System.out.println("Timeout waiting for lock: " + data);
|
||||
// System.exit(1);
|
||||
// throw new InterruptedException();
|
||||
// }
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AutoCloseableNoThrow tryLock(Object data) {
|
||||
while (true) {
|
||||
var tag = _locks.get(data);
|
||||
if (tag != null) {
|
||||
synchronized (tag) {
|
||||
if (!tag.released) {
|
||||
if (tag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
synchronized (oldTag) {
|
||||
if (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,11 +73,11 @@ public class DataLocker {
|
||||
public Lock(Object key, LockTag tag) {
|
||||
_key = key;
|
||||
_tag = tag;
|
||||
CLEANER.register(this, () -> {
|
||||
if (!tag.released) {
|
||||
Log.error("Lock collected without release: " + key);
|
||||
}
|
||||
});
|
||||
// CLEANER.register(this, () -> {
|
||||
// if (!tag.released) {
|
||||
// Log.error("Lock collected without release: " + key);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user