mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
6 Commits
81703a9406
...
7aa968a569
| Author | SHA1 | Date | |
|---|---|---|---|
| 7aa968a569 | |||
| e348c39be1 | |||
| 1b54830651 | |||
| bc5f0b816c | |||
| 9ff914bdaa | |||
| 1cee6f62b8 |
@@ -83,7 +83,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
private ChunkData createChunk(ByteString bytes) {
|
private ChunkData createChunk(ByteString bytes) {
|
||||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||||
remoteTx.putData(newChunk);
|
remoteTx.putDataNew(newChunk);
|
||||||
return newChunk;
|
return newChunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
|
|||||||
import com.usatiuk.dhfsfs.service.GetattrRes;
|
import com.usatiuk.dhfsfs.service.GetattrRes;
|
||||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
import com.usatiuk.utils.UnsafeAccessor;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -51,8 +52,6 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
||||||
int targetChunkSize;
|
int targetChunkSize;
|
||||||
@Inject
|
@Inject
|
||||||
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
|
|
||||||
@Inject
|
|
||||||
DhfsFileService fileService;
|
DhfsFileService fileService;
|
||||||
|
|
||||||
private long allocateHandle(JObjectKey key) {
|
private long allocateHandle(JObjectKey key) {
|
||||||
@@ -231,7 +230,7 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
var fileKey = getFromHandle(fi.fh.get());
|
var fileKey = getFromHandle(fi.fh.get());
|
||||||
var read = fileService.read(fileKey, offset, (int) size);
|
var read = fileService.read(fileKey, offset, (int) size);
|
||||||
if (read.isEmpty()) return 0;
|
if (read.isEmpty()) return 0;
|
||||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||||
return read.size();
|
return read.size();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
Log.error("When reading " + path, e);
|
Log.error("When reading " + path, e);
|
||||||
@@ -246,19 +245,15 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
var fileKey = getFromHandle(fi.fh.get());
|
var fileKey = getFromHandle(fi.fh.get());
|
||||||
var buffer = ByteBuffer.allocateDirect((int) size);
|
var buffer = ByteBuffer.allocateDirect((int) size);
|
||||||
|
|
||||||
if (buffer.isDirect()) {
|
UnsafeAccessor.get().getUnsafe().copyMemory(
|
||||||
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
|
buf.address(),
|
||||||
buf.address(),
|
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
|
||||||
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
|
size
|
||||||
size
|
);
|
||||||
);
|
|
||||||
} else {
|
|
||||||
buf.get(0, buffer.array(), 0, (int) size);
|
|
||||||
}
|
|
||||||
|
|
||||||
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
||||||
return written.intValue();
|
return written.intValue();
|
||||||
} catch (Throwable e) {
|
} catch (Exception e) {
|
||||||
Log.error("When writing " + path, e);
|
Log.error("When writing " + path, e);
|
||||||
return -ErrorCodes.EIO();
|
return -ErrorCodes.EIO();
|
||||||
}
|
}
|
||||||
@@ -394,7 +389,7 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
var file = fileOpt.get();
|
var file = fileOpt.get();
|
||||||
var read = fileService.readlinkBS(fileOpt.get());
|
var read = fileService.readlinkBS(fileOpt.get());
|
||||||
if (read.isEmpty()) return 0;
|
if (read.isEmpty()) return 0;
|
||||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||||
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
|
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
|
||||||
return 0;
|
return 0;
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.usatiuk.dhfsfuse;
|
package com.usatiuk.dhfsfuse;
|
||||||
|
|
||||||
import com.google.protobuf.ByteOutput;
|
import com.google.protobuf.ByteOutput;
|
||||||
|
import com.usatiuk.utils.UnsafeAccessor;
|
||||||
import jnr.ffi.Pointer;
|
import jnr.ffi.Pointer;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@@ -9,14 +10,12 @@ import java.nio.MappedByteBuffer;
|
|||||||
public class JnrPtrByteOutput extends ByteOutput {
|
public class JnrPtrByteOutput extends ByteOutput {
|
||||||
private final Pointer _backing;
|
private final Pointer _backing;
|
||||||
private final long _size;
|
private final long _size;
|
||||||
private final JnrPtrByteOutputAccessors _accessors;
|
|
||||||
private long _pos;
|
private long _pos;
|
||||||
|
|
||||||
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
|
public JnrPtrByteOutput(Pointer backing, long size) {
|
||||||
_backing = backing;
|
_backing = backing;
|
||||||
_size = size;
|
_size = size;
|
||||||
_pos = 0;
|
_pos = 0;
|
||||||
_accessors = accessors;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -47,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
|
|||||||
if (value instanceof MappedByteBuffer mb) {
|
if (value instanceof MappedByteBuffer mb) {
|
||||||
mb.load();
|
mb.load();
|
||||||
}
|
}
|
||||||
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
|
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
|
||||||
var out = _backing.address() + _pos;
|
var out = _backing.address() + _pos;
|
||||||
_accessors.getUnsafe().copyMemory(addr, out, rem);
|
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
|
||||||
} else {
|
} else {
|
||||||
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
|
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,29 +0,0 @@
|
|||||||
package com.usatiuk.dhfsfuse;
|
|
||||||
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
import jdk.internal.access.JavaNioAccess;
|
|
||||||
import jdk.internal.access.SharedSecrets;
|
|
||||||
import sun.misc.Unsafe;
|
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
class JnrPtrByteOutputAccessors {
|
|
||||||
JavaNioAccess _nioAccess;
|
|
||||||
Unsafe _unsafe;
|
|
||||||
|
|
||||||
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
|
|
||||||
_nioAccess = SharedSecrets.getJavaNioAccess();
|
|
||||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
|
||||||
f.setAccessible(true);
|
|
||||||
_unsafe = (Unsafe) f.get(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public JavaNioAccess getNioAccess() {
|
|
||||||
return _nioAccess;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Unsafe getUnsafe() {
|
|
||||||
return _unsafe;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
|
|||||||
public <T extends JData> void put(JData obj) {
|
public <T extends JData> void put(JData obj) {
|
||||||
transactionManager.current().put(obj);
|
transactionManager.current().put(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends JData> void putNew(JData obj) {
|
||||||
|
transactionManager.current().putNew(obj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -165,7 +165,8 @@ public class JObjectManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (var write : writes.entrySet()) {
|
for (var write : writes.entrySet()) {
|
||||||
toLock.add(write.getKey());
|
if (!readSet.containsKey(write.getKey()))
|
||||||
|
toLock.add(write.getKey());
|
||||||
}
|
}
|
||||||
Collections.sort(toLock);
|
Collections.sort(toLock);
|
||||||
for (var key : toLock) {
|
for (var key : toLock) {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
|
|||||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
||||||
|
|
||||||
<T extends JData> void put(JData obj);
|
<T extends JData> void put(JData obj);
|
||||||
|
<T extends JData> void putNew(JData obj);
|
||||||
|
|
||||||
void delete(JObjectKey key);
|
void delete(JObjectKey key);
|
||||||
|
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||||
private final List<Runnable> _onCommit = new ArrayList<>();
|
private final List<Runnable> _onCommit = new ArrayList<>();
|
||||||
private final List<Runnable> _onFlush = new ArrayList<>();
|
private final List<Runnable> _onFlush = new ArrayList<>();
|
||||||
|
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||||
private boolean _closed = false;
|
private boolean _closed = false;
|
||||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||||
@@ -94,6 +95,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||||
|
if (_knownNew.contains(key)) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
return _readSet.computeIfAbsent(key, k -> {
|
return _readSet.computeIfAbsent(key, k -> {
|
||||||
var read = _snapshot.readObject(k);
|
var read = _snapshot.readObject(k);
|
||||||
return new TransactionObjectNoLock<>(read);
|
return new TransactionObjectNoLock<>(read);
|
||||||
@@ -181,6 +185,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putNew(JData obj) {
|
||||||
|
_knownNew.add(obj.key());
|
||||||
|
|
||||||
|
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||||
|
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||||
var ret = _newWrites;
|
var ret = _newWrites;
|
||||||
|
|||||||
@@ -93,6 +93,11 @@ public class RemoteTransaction {
|
|||||||
curTx.put(newData);
|
curTx.put(newData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T extends JDataRemote> void putDataNew(T obj) {
|
||||||
|
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
|
||||||
|
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
|
||||||
|
}
|
||||||
|
|
||||||
public <T extends JDataRemote> void putData(T obj) {
|
public <T extends JDataRemote> void putData(T obj) {
|
||||||
var curMeta = getMeta(obj.key()).orElse(null);
|
var curMeta = getMeta(obj.key()).orElse(null);
|
||||||
|
|
||||||
|
|||||||
@@ -4,93 +4,52 @@ import jakarta.annotation.Nonnull;
|
|||||||
import jakarta.annotation.Nullable;
|
import jakarta.annotation.Nullable;
|
||||||
|
|
||||||
import java.lang.ref.Cleaner;
|
import java.lang.ref.Cleaner;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
public class DataLocker {
|
public class DataLocker {
|
||||||
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
|
private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
|
||||||
};
|
private static final Cleaner CLEANER = Cleaner.create();
|
||||||
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
|
|
||||||
|
private Lock getTag(Object data) {
|
||||||
|
var newTag = new ReentrantLock();
|
||||||
|
var newTagRef = new WeakReference<>(newTag);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
var oldTagRef = _locks.putIfAbsent(data, newTagRef);
|
||||||
|
var oldTag = oldTagRef != null ? oldTagRef.get() : null;
|
||||||
|
|
||||||
|
if (oldTag == null && oldTagRef != null) {
|
||||||
|
_locks.remove(data, oldTagRef);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (oldTag != null)
|
||||||
|
return oldTag;
|
||||||
|
|
||||||
|
CLEANER.register(newTag, () -> {
|
||||||
|
_locks.remove(data, newTagRef);
|
||||||
|
});
|
||||||
|
return newTag;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
public AutoCloseableNoThrow lock(Object data) {
|
public AutoCloseableNoThrow lock(Object data) {
|
||||||
while (true) {
|
var lock = getTag(data);
|
||||||
var newTag = new LockTag();
|
lock.lock();
|
||||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
return lock::unlock;
|
||||||
if (oldTag == null) {
|
|
||||||
return new Lock(data, newTag);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
synchronized (oldTag) {
|
|
||||||
while (!oldTag.released) {
|
|
||||||
if (oldTag.owner == Thread.currentThread()) {
|
|
||||||
return DUMMY_LOCK;
|
|
||||||
}
|
|
||||||
oldTag.wait();
|
|
||||||
// tag.wait(4000L);
|
|
||||||
// if (!tag.released) {
|
|
||||||
// System.out.println("Timeout waiting for lock: " + data);
|
|
||||||
// System.exit(1);
|
|
||||||
// throw new InterruptedException();
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public AutoCloseableNoThrow tryLock(Object data) {
|
public AutoCloseableNoThrow tryLock(Object data) {
|
||||||
while (true) {
|
var lock = getTag(data);
|
||||||
var newTag = new LockTag();
|
if (lock.tryLock()) {
|
||||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
return lock::unlock;
|
||||||
if (oldTag == null) {
|
} else {
|
||||||
return new Lock(data, newTag);
|
return null;
|
||||||
}
|
|
||||||
synchronized (oldTag) {
|
|
||||||
if (!oldTag.released) {
|
|
||||||
if (oldTag.owner == Thread.currentThread()) {
|
|
||||||
return DUMMY_LOCK;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LockTag {
|
|
||||||
final Thread owner = Thread.currentThread();
|
|
||||||
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
|
|
||||||
boolean released = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Lock implements AutoCloseableNoThrow {
|
|
||||||
private static final Cleaner CLEANER = Cleaner.create();
|
|
||||||
private final Object _key;
|
|
||||||
private final LockTag _tag;
|
|
||||||
|
|
||||||
public Lock(Object key, LockTag tag) {
|
|
||||||
_key = key;
|
|
||||||
_tag = tag;
|
|
||||||
// CLEANER.register(this, () -> {
|
|
||||||
// if (!tag.released) {
|
|
||||||
// Log.error("Lock collected without release: " + key);
|
|
||||||
// }
|
|
||||||
// });
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
synchronized (_tag) {
|
|
||||||
if (_tag.released)
|
|
||||||
return;
|
|
||||||
_tag.released = true;
|
|
||||||
// Notify all because when the object is locked again,
|
|
||||||
// it's a different lock tag
|
|
||||||
_tag.notifyAll();
|
|
||||||
_locks.remove(_key, _tag);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,41 @@
|
|||||||
|
package com.usatiuk.utils;
|
||||||
|
|
||||||
|
import jdk.internal.access.JavaNioAccess;
|
||||||
|
import jdk.internal.access.SharedSecrets;
|
||||||
|
import sun.misc.Unsafe;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
|
||||||
|
public class UnsafeAccessor {
|
||||||
|
private static final UnsafeAccessor INSTANCE;
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
INSTANCE = new UnsafeAccessor();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static UnsafeAccessor get() {
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private JavaNioAccess _nioAccess;
|
||||||
|
private Unsafe _unsafe;
|
||||||
|
|
||||||
|
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
|
||||||
|
_nioAccess = SharedSecrets.getJavaNioAccess();
|
||||||
|
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||||
|
f.setAccessible(true);
|
||||||
|
_unsafe = (Unsafe) f.get(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public JavaNioAccess getNioAccess() {
|
||||||
|
return _nioAccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Unsafe getUnsafe() {
|
||||||
|
return _unsafe;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user