mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
10 Commits
speed-dump
...
7aa968a569
| Author | SHA1 | Date | |
|---|---|---|---|
| 7aa968a569 | |||
| e348c39be1 | |||
| 1b54830651 | |||
| bc5f0b816c | |||
| 9ff914bdaa | |||
| 1cee6f62b8 | |||
| 81703a9406 | |||
| 1757034e0b | |||
| d9765a51d8 | |||
| 99ef560b95 |
@@ -83,7 +83,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
private ChunkData createChunk(ByteString bytes) {
|
||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||
remoteTx.putData(newChunk);
|
||||
remoteTx.putDataNew(newChunk);
|
||||
return newChunk;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
|
||||
import com.usatiuk.dhfsfs.service.GetattrRes;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.utils.UnsafeAccessor;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -51,8 +52,6 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
||||
int targetChunkSize;
|
||||
@Inject
|
||||
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
private long allocateHandle(JObjectKey key) {
|
||||
@@ -231,7 +230,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var fileKey = getFromHandle(fi.fh.get());
|
||||
var read = fileService.read(fileKey, offset, (int) size);
|
||||
if (read.isEmpty()) return 0;
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||
return read.size();
|
||||
} catch (Throwable e) {
|
||||
Log.error("When reading " + path, e);
|
||||
@@ -246,19 +245,15 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var fileKey = getFromHandle(fi.fh.get());
|
||||
var buffer = ByteBuffer.allocateDirect((int) size);
|
||||
|
||||
if (buffer.isDirect()) {
|
||||
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
|
||||
buf.address(),
|
||||
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
|
||||
size
|
||||
);
|
||||
} else {
|
||||
buf.get(0, buffer.array(), 0, (int) size);
|
||||
}
|
||||
UnsafeAccessor.get().getUnsafe().copyMemory(
|
||||
buf.address(),
|
||||
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
|
||||
size
|
||||
);
|
||||
|
||||
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
||||
return written.intValue();
|
||||
} catch (Throwable e) {
|
||||
} catch (Exception e) {
|
||||
Log.error("When writing " + path, e);
|
||||
return -ErrorCodes.EIO();
|
||||
}
|
||||
@@ -394,7 +389,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.readlinkBS(fileOpt.get());
|
||||
if (read.isEmpty()) return 0;
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
|
||||
return 0;
|
||||
} catch (Throwable e) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.dhfsfuse;
|
||||
|
||||
import com.google.protobuf.ByteOutput;
|
||||
import com.usatiuk.utils.UnsafeAccessor;
|
||||
import jnr.ffi.Pointer;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -9,14 +10,12 @@ import java.nio.MappedByteBuffer;
|
||||
public class JnrPtrByteOutput extends ByteOutput {
|
||||
private final Pointer _backing;
|
||||
private final long _size;
|
||||
private final JnrPtrByteOutputAccessors _accessors;
|
||||
private long _pos;
|
||||
|
||||
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
|
||||
public JnrPtrByteOutput(Pointer backing, long size) {
|
||||
_backing = backing;
|
||||
_size = size;
|
||||
_pos = 0;
|
||||
_accessors = accessors;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -47,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
|
||||
if (value instanceof MappedByteBuffer mb) {
|
||||
mb.load();
|
||||
}
|
||||
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
|
||||
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
|
||||
var out = _backing.address() + _pos;
|
||||
_accessors.getUnsafe().copyMemory(addr, out, rem);
|
||||
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
|
||||
} else {
|
||||
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
package com.usatiuk.dhfsfuse;
|
||||
|
||||
import jakarta.inject.Singleton;
|
||||
import jdk.internal.access.JavaNioAccess;
|
||||
import jdk.internal.access.SharedSecrets;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
@Singleton
|
||||
class JnrPtrByteOutputAccessors {
|
||||
JavaNioAccess _nioAccess;
|
||||
Unsafe _unsafe;
|
||||
|
||||
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
|
||||
_nioAccess = SharedSecrets.getJavaNioAccess();
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
_unsafe = (Unsafe) f.get(null);
|
||||
}
|
||||
|
||||
public JavaNioAccess getNioAccess() {
|
||||
return _nioAccess;
|
||||
}
|
||||
|
||||
public Unsafe getUnsafe() {
|
||||
return _unsafe;
|
||||
}
|
||||
}
|
||||
@@ -8,11 +8,10 @@ import jakarta.inject.Singleton;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@Singleton
|
||||
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
|
||||
public class JDataVersionedWrapperSerializer {
|
||||
@Inject
|
||||
ObjectSerializer<JData> dataSerializer;
|
||||
|
||||
@Override
|
||||
public ByteString serialize(JDataVersionedWrapper obj) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
|
||||
buffer.putLong(obj.version());
|
||||
@@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
|
||||
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public JDataVersionedWrapper deserialize(ByteString data) {
|
||||
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
|
||||
var rawData = data.substring(Long.BYTES);
|
||||
return new JDataVersionedWrapperLazy(version, rawData.size(),
|
||||
() -> dataSerializer.deserialize(rawData)
|
||||
public JDataVersionedWrapper deserialize(ByteBuffer data) {
|
||||
var version = data.getLong();
|
||||
return new JDataVersionedWrapperLazy(version, data.remaining(),
|
||||
() -> dataSerializer.deserialize(data)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ public final class JObjectKeyImpl implements JObjectKey {
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(value);
|
||||
return value.hashCode();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,11 +2,13 @@ package com.usatiuk.objects;
|
||||
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.utils.SerializationHelper;
|
||||
import io.quarkus.arc.DefaultBean;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@ApplicationScoped
|
||||
@DefaultBean
|
||||
@@ -16,9 +18,8 @@ public class JavaDataSerializer implements ObjectSerializer<JData> {
|
||||
return SerializationHelper.serialize(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JData deserialize(ByteString data) {
|
||||
try (var is = data.newInput()) {
|
||||
public JData deserialize(ByteBuffer data) {
|
||||
try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) {
|
||||
return SerializationHelper.deserialize(is);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@@ -2,8 +2,10 @@ package com.usatiuk.objects;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public interface ObjectSerializer<T> {
|
||||
ByteString serialize(T obj);
|
||||
|
||||
T deserialize(ByteString data);
|
||||
T deserialize(ByteBuffer data);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
import com.usatiuk.objects.JObjectKeyMin;
|
||||
@@ -104,27 +102,27 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Snapshot<JObjectKey, ByteString> getSnapshot() {
|
||||
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
|
||||
var txn = _env.txnRead();
|
||||
try {
|
||||
long commitId = readTxId(txn).orElseThrow();
|
||||
return new Snapshot<JObjectKey, ByteString>() {
|
||||
return new Snapshot<JObjectKey, ByteBuffer>() {
|
||||
private final Txn<ByteBuffer> _txn = txn;
|
||||
private final long _id = commitId;
|
||||
private boolean _closed = false;
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
||||
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
|
||||
assert !_closed;
|
||||
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
public Optional<ByteBuffer> readObject(JObjectKey name) {
|
||||
assert !_closed;
|
||||
var got = _db.get(_txn, name.toByteBuffer());
|
||||
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
|
||||
var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -197,7 +195,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
return _root.toFile().getUsableSpace();
|
||||
}
|
||||
|
||||
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
|
||||
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
|
||||
private final Cursor<ByteBuffer> _cursor;
|
||||
@@ -352,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Pair<JObjectKey, ByteString> nextImpl() {
|
||||
protected Pair<JObjectKey, ByteBuffer> nextImpl() {
|
||||
if (!_hasNext) {
|
||||
throw new NoSuchElementException("No more elements");
|
||||
}
|
||||
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
|
||||
var val = _cursor.val();
|
||||
var bs = UnsafeByteOperations.unsafeWrap(val);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
|
||||
if (_goingForward)
|
||||
_hasNext = _cursor.next();
|
||||
else
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.objects.iterators.MappingKvIterator;
|
||||
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import io.quarkus.arc.properties.IfBuildProperty;
|
||||
@@ -11,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.pcollections.TreePMap;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@@ -22,21 +24,21 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
||||
private long _lastCommitId = 0;
|
||||
|
||||
@Override
|
||||
public Snapshot<JObjectKey, ByteString> getSnapshot() {
|
||||
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
|
||||
synchronized (this) {
|
||||
return new Snapshot<JObjectKey, ByteString>() {
|
||||
return new Snapshot<JObjectKey, ByteBuffer>() {
|
||||
private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects;
|
||||
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new NavigableMapKvIterator<>(_objects, start, key);
|
||||
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
return Optional.ofNullable(_objects.get(name));
|
||||
public Optional<ByteBuffer> readObject(JObjectKey name) {
|
||||
return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -5,12 +5,13 @@ import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Optional;
|
||||
|
||||
// Persistent storage of objects
|
||||
// All changes are written as sequential transactions
|
||||
public interface ObjectPersistentStore {
|
||||
Snapshot<JObjectKey, ByteString> getSnapshot();
|
||||
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
||||
|
||||
Runnable prepareTx(TxManifestRaw names, long txId);
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.ObjectSerializer;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
@@ -13,19 +14,20 @@ import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
public class SerializingObjectPersistentStore {
|
||||
@Inject
|
||||
ObjectSerializer<JDataVersionedWrapper> serializer;
|
||||
JDataVersionedWrapperSerializer serializer;
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore delegateStore;
|
||||
|
||||
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
|
||||
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
|
||||
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
|
||||
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
|
||||
@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
|
||||
public <T extends JData> void put(JData obj) {
|
||||
transactionManager.current().put(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> void putNew(JData obj) {
|
||||
transactionManager.current().putNew(obj);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.enterprise.inject.spi.CDI;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
@@ -22,7 +23,7 @@ import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManager {
|
||||
private final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
@@ -30,8 +31,12 @@ public class JObjectManager {
|
||||
@Inject
|
||||
LockManager lockManager;
|
||||
private boolean _ready = false;
|
||||
|
||||
static {
|
||||
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||
}
|
||||
|
||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
||||
}
|
||||
|
||||
@@ -160,7 +165,8 @@ public class JObjectManager {
|
||||
}
|
||||
}
|
||||
for (var write : writes.entrySet()) {
|
||||
toLock.add(write.getKey());
|
||||
if (!readSet.containsKey(write.getKey()))
|
||||
toLock.add(write.getKey());
|
||||
}
|
||||
Collections.sort(toLock);
|
||||
for (var key : toLock) {
|
||||
|
||||
@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
|
||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
||||
|
||||
<T extends JData> void put(JData obj);
|
||||
<T extends JData> void putNew(JData obj);
|
||||
|
||||
void delete(JObjectKey key);
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||
private final List<Runnable> _onCommit = new ArrayList<>();
|
||||
private final List<Runnable> _onFlush = new ArrayList<>();
|
||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||
private boolean _closed = false;
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
@@ -94,6 +95,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||
if (_knownNew.contains(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return _readSet.computeIfAbsent(key, k -> {
|
||||
var read = _snapshot.readObject(k);
|
||||
return new TransactionObjectNoLock<>(read);
|
||||
@@ -181,6 +185,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNew(JData obj) {
|
||||
_knownNew.add(obj.key());
|
||||
|
||||
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||
var ret = _newWrites;
|
||||
|
||||
@@ -13,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
class Profiles {
|
||||
@@ -46,49 +47,49 @@ public class LmdbKvIteratorTest {
|
||||
|
||||
try (var snapshot = store.getSnapshot()) {
|
||||
var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
|
||||
Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
|
||||
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
|
||||
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))));
|
||||
Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})),
|
||||
Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})),
|
||||
Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
@@ -101,7 +102,7 @@ public class LmdbKvIteratorTest {
|
||||
iterator.close();
|
||||
|
||||
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertFalse(iterator.hasNext());
|
||||
iterator.close();
|
||||
|
||||
@@ -111,11 +112,11 @@ public class LmdbKvIteratorTest {
|
||||
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
|
||||
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
|
||||
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
|
||||
Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev());
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev());
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next());
|
||||
Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})));
|
||||
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})), iterator.prev());
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.prev());
|
||||
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.next());
|
||||
iterator.close();
|
||||
}
|
||||
|
||||
|
||||
@@ -54,11 +54,11 @@ public class JKleppmannTreeManager {
|
||||
);
|
||||
curTx.put(data);
|
||||
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
|
||||
curTx.put(new JKleppmannTreeNodeHolder(rootNode));
|
||||
curTx.put(new JKleppmannTreeNodeHolder(rootNode, true));
|
||||
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
|
||||
curTx.put(new JKleppmannTreeNodeHolder(trashNode));
|
||||
curTx.put(new JKleppmannTreeNodeHolder(trashNode, true));
|
||||
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
|
||||
curTx.put(new JKleppmannTreeNodeHolder(lf_node));
|
||||
curTx.put(new JKleppmannTreeNodeHolder(lf_node, true));
|
||||
}
|
||||
return new JKleppmannTree(data);
|
||||
// opObjectRegistry.registerObject(tree);
|
||||
|
||||
@@ -18,6 +18,10 @@ public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean f
|
||||
this(TreePSet.empty(), false, node);
|
||||
}
|
||||
|
||||
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node, boolean frozen) {
|
||||
this(TreePSet.empty(), frozen, node);
|
||||
}
|
||||
|
||||
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
|
||||
Objects.requireNonNull(node, "node");
|
||||
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
|
||||
|
||||
@@ -19,8 +19,8 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
|
||||
@Override
|
||||
public void onChange(JObjectKey key, JData old, JData cur) {
|
||||
boolean invalidate = switch (cur) {
|
||||
case RemoteObjectMeta remote -> !remote.changelog().equals(((RemoteObjectMeta) old).changelog());
|
||||
case JKleppmannTreePersistentData pd -> !pd.queues().equals(((JKleppmannTreePersistentData) old).queues());
|
||||
case RemoteObjectMeta remote -> remote.changelog() != ((RemoteObjectMeta) old).changelog();
|
||||
case JKleppmannTreePersistentData pd -> pd.queues() != ((JKleppmannTreePersistentData) old).queues();
|
||||
default -> false;
|
||||
};
|
||||
|
||||
|
||||
@@ -93,6 +93,11 @@ public class RemoteTransaction {
|
||||
curTx.put(newData);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putDataNew(T obj) {
|
||||
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
|
||||
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putData(T obj) {
|
||||
var curMeta = getMeta(obj.key()).orElse(null);
|
||||
|
||||
|
||||
@@ -4,93 +4,52 @@ import jakarta.annotation.Nonnull;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class DataLocker {
|
||||
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
|
||||
};
|
||||
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
private Lock getTag(Object data) {
|
||||
var newTag = new ReentrantLock();
|
||||
var newTagRef = new WeakReference<>(newTag);
|
||||
|
||||
while (true) {
|
||||
var oldTagRef = _locks.putIfAbsent(data, newTagRef);
|
||||
var oldTag = oldTagRef != null ? oldTagRef.get() : null;
|
||||
|
||||
if (oldTag == null && oldTagRef != null) {
|
||||
_locks.remove(data, oldTagRef);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (oldTag != null)
|
||||
return oldTag;
|
||||
|
||||
CLEANER.register(newTag, () -> {
|
||||
_locks.remove(data, newTagRef);
|
||||
});
|
||||
return newTag;
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
public AutoCloseableNoThrow lock(Object data) {
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
try {
|
||||
synchronized (oldTag) {
|
||||
while (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
oldTag.wait();
|
||||
// tag.wait(4000L);
|
||||
// if (!tag.released) {
|
||||
// System.out.println("Timeout waiting for lock: " + data);
|
||||
// System.exit(1);
|
||||
// throw new InterruptedException();
|
||||
// }
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
var lock = getTag(data);
|
||||
lock.lock();
|
||||
return lock::unlock;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AutoCloseableNoThrow tryLock(Object data) {
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
synchronized (oldTag) {
|
||||
if (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
var lock = getTag(data);
|
||||
if (lock.tryLock()) {
|
||||
return lock::unlock;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static class LockTag {
|
||||
final Thread owner = Thread.currentThread();
|
||||
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
|
||||
boolean released = false;
|
||||
}
|
||||
|
||||
private class Lock implements AutoCloseableNoThrow {
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
private final Object _key;
|
||||
private final LockTag _tag;
|
||||
|
||||
public Lock(Object key, LockTag tag) {
|
||||
_key = key;
|
||||
_tag = tag;
|
||||
// CLEANER.register(this, () -> {
|
||||
// if (!tag.released) {
|
||||
// Log.error("Lock collected without release: " + key);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
synchronized (_tag) {
|
||||
if (_tag.released)
|
||||
return;
|
||||
_tag.released = true;
|
||||
// Notify all because when the object is locked again,
|
||||
// it's a different lock tag
|
||||
_tag.notifyAll();
|
||||
_locks.remove(_key, _tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
import jdk.internal.access.JavaNioAccess;
|
||||
import jdk.internal.access.SharedSecrets;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
public class UnsafeAccessor {
|
||||
private static final UnsafeAccessor INSTANCE;
|
||||
|
||||
static {
|
||||
try {
|
||||
INSTANCE = new UnsafeAccessor();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static UnsafeAccessor get() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private JavaNioAccess _nioAccess;
|
||||
private Unsafe _unsafe;
|
||||
|
||||
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
|
||||
_nioAccess = SharedSecrets.getJavaNioAccess();
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
_unsafe = (Unsafe) f.get(null);
|
||||
}
|
||||
|
||||
public JavaNioAccess getNioAccess() {
|
||||
return _nioAccess;
|
||||
}
|
||||
|
||||
public Unsafe getUnsafe() {
|
||||
return _unsafe;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user