6 Commits

Author SHA1 Message Date
7aa968a569 Dhfs-fuse: fix import 2025-04-29 00:45:34 +02:00
e348c39be1 Utils: add UnsafeAccessor to JnrPtrByteOutput
oops
2025-04-28 23:50:31 +02:00
1b54830651 Objects: don't lock some objects twice for no reason 2025-04-28 23:49:45 +02:00
bc5f0b816c Objects: add putNew
to avoid searching for nonexistent objects
2025-04-28 23:47:53 +02:00
9ff914bdaa Utils: move UnsafeAccessor to utils 2025-04-28 23:36:42 +02:00
1cee6f62b8 Utils: less dumb DataLocker 2025-04-28 23:34:30 +02:00
11 changed files with 116 additions and 127 deletions

View File

@@ -83,7 +83,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) { private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putData(newChunk); remoteTx.putDataNew(newChunk);
return newChunk; return newChunk;
} }

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfsfs.service.GetattrRes; import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException; import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.utils.UnsafeAccessor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
@@ -51,8 +52,6 @@ public class DhfsFuse extends FuseStubFS {
@ConfigProperty(name = "dhfs.files.target_chunk_size") @ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize; int targetChunkSize;
@Inject @Inject
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
@Inject
DhfsFileService fileService; DhfsFileService fileService;
private long allocateHandle(JObjectKey key) { private long allocateHandle(JObjectKey key) {
@@ -231,7 +230,7 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get()); var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size); var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0; if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size)); UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
return read.size(); return read.size();
} catch (Throwable e) { } catch (Throwable e) {
Log.error("When reading " + path, e); Log.error("When reading " + path, e);
@@ -246,19 +245,15 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get()); var fileKey = getFromHandle(fi.fh.get());
var buffer = ByteBuffer.allocateDirect((int) size); var buffer = ByteBuffer.allocateDirect((int) size);
if (buffer.isDirect()) { UnsafeAccessor.get().getUnsafe().copyMemory(
jnrPtrByteOutputAccessors.getUnsafe().copyMemory( buf.address(),
buf.address(), UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer), size
size );
);
} else {
buf.get(0, buffer.array(), 0, (int) size);
}
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer)); var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue(); return written.intValue();
} catch (Throwable e) { } catch (Exception e) {
Log.error("When writing " + path, e); Log.error("When writing " + path, e);
return -ErrorCodes.EIO(); return -ErrorCodes.EIO();
} }
@@ -394,7 +389,7 @@ public class DhfsFuse extends FuseStubFS {
var file = fileOpt.get(); var file = fileOpt.get();
var read = fileService.readlinkBS(fileOpt.get()); var read = fileService.readlinkBS(fileOpt.get());
if (read.isEmpty()) return 0; if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size)); UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
buf.putByte(Math.min(size - 1, read.size()), (byte) 0); buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
return 0; return 0;
} catch (Throwable e) { } catch (Throwable e) {

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfsfuse; package com.usatiuk.dhfsfuse;
import com.google.protobuf.ByteOutput; import com.google.protobuf.ByteOutput;
import com.usatiuk.utils.UnsafeAccessor;
import jnr.ffi.Pointer; import jnr.ffi.Pointer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@@ -9,14 +10,12 @@ import java.nio.MappedByteBuffer;
public class JnrPtrByteOutput extends ByteOutput { public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing; private final Pointer _backing;
private final long _size; private final long _size;
private final JnrPtrByteOutputAccessors _accessors;
private long _pos; private long _pos;
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) { public JnrPtrByteOutput(Pointer backing, long size) {
_backing = backing; _backing = backing;
_size = size; _size = size;
_pos = 0; _pos = 0;
_accessors = accessors;
} }
@Override @Override
@@ -47,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
if (value instanceof MappedByteBuffer mb) { if (value instanceof MappedByteBuffer mb) {
mb.load(); mb.load();
} }
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position(); long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
var out = _backing.address() + _pos; var out = _backing.address() + _pos;
_accessors.getUnsafe().copyMemory(addr, out, rem); UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
} else { } else {
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem); _backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
} }

View File

@@ -1,29 +0,0 @@
package com.usatiuk.dhfsfuse;
import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
@Singleton
class JnrPtrByteOutputAccessors {
JavaNioAccess _nioAccess;
Unsafe _unsafe;
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
}

View File

@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
public <T extends JData> void put(JData obj) { public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj); transactionManager.current().put(obj);
} }
@Override
public <T extends JData> void putNew(JData obj) {
transactionManager.current().putNew(obj);
}
} }

View File

@@ -165,7 +165,8 @@ public class JObjectManager {
} }
} }
for (var write : writes.entrySet()) { for (var write : writes.entrySet()) {
toLock.add(write.getKey()); if (!readSet.containsKey(write.getKey()))
toLock.add(write.getKey());
} }
Collections.sort(toLock); Collections.sort(toLock);
for (var key : toLock) { for (var key : toLock) {

View File

@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy); <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void put(JData obj); <T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key); void delete(JObjectKey key);

View File

@@ -59,6 +59,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>(); private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new ArrayList<>(); private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>(); private final List<Runnable> _onFlush = new ArrayList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot; private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false; private boolean _closed = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>(); private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
@@ -94,6 +95,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override @Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) { public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, k -> { return _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k); var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read); return new TransactionObjectNoLock<>(read);
@@ -181,6 +185,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
} }
@Override
public void putNew(JData obj) {
_knownNew.add(obj.key());
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}
@Override @Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() { public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
var ret = _newWrites; var ret = _newWrites;

View File

@@ -93,6 +93,11 @@ public class RemoteTransaction {
curTx.put(newData); curTx.put(newData);
} }
public <T extends JDataRemote> void putDataNew(T obj) {
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
}
public <T extends JDataRemote> void putData(T obj) { public <T extends JDataRemote> void putData(T obj) {
var curMeta = getMeta(obj.key()).orElse(null); var curMeta = getMeta(obj.key()).orElse(null);

View File

@@ -4,93 +4,52 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import java.lang.ref.Cleaner; import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DataLocker { public class DataLocker {
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> { private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
}; private static final Cleaner CLEANER = Cleaner.create();
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
private Lock getTag(Object data) {
var newTag = new ReentrantLock();
var newTagRef = new WeakReference<>(newTag);
while (true) {
var oldTagRef = _locks.putIfAbsent(data, newTagRef);
var oldTag = oldTagRef != null ? oldTagRef.get() : null;
if (oldTag == null && oldTagRef != null) {
_locks.remove(data, oldTagRef);
continue;
}
if (oldTag != null)
return oldTag;
CLEANER.register(newTag, () -> {
_locks.remove(data, newTagRef);
});
return newTag;
}
}
@Nonnull @Nonnull
public AutoCloseableNoThrow lock(Object data) { public AutoCloseableNoThrow lock(Object data) {
while (true) { var lock = getTag(data);
var newTag = new LockTag(); lock.lock();
var oldTag = _locks.putIfAbsent(data, newTag); return lock::unlock;
if (oldTag == null) {
return new Lock(data, newTag);
}
try {
synchronized (oldTag) {
while (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
oldTag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
}
} catch (InterruptedException ignored) {
}
}
} }
@Nullable @Nullable
public AutoCloseableNoThrow tryLock(Object data) { public AutoCloseableNoThrow tryLock(Object data) {
while (true) { var lock = getTag(data);
var newTag = new LockTag(); if (lock.tryLock()) {
var oldTag = _locks.putIfAbsent(data, newTag); return lock::unlock;
if (oldTag == null) { } else {
return new Lock(data, newTag); return null;
}
synchronized (oldTag) {
if (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
}
} }
} }
private static class LockTag {
final Thread owner = Thread.currentThread();
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
boolean released = false;
}
private class Lock implements AutoCloseableNoThrow {
private static final Cleaner CLEANER = Cleaner.create();
private final Object _key;
private final LockTag _tag;
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
// CLEANER.register(this, () -> {
// if (!tag.released) {
// Log.error("Lock collected without release: " + key);
// }
// });
}
@Override
public void close() {
synchronized (_tag) {
if (_tag.released)
return;
_tag.released = true;
// Notify all because when the object is locked again,
// it's a different lock tag
_tag.notifyAll();
_locks.remove(_key, _tag);
}
}
}
} }

View File

@@ -0,0 +1,41 @@
package com.usatiuk.utils;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafeAccessor {
private static final UnsafeAccessor INSTANCE;
static {
try {
INSTANCE = new UnsafeAccessor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static UnsafeAccessor get() {
return INSTANCE;
}
private JavaNioAccess _nioAccess;
private Unsafe _unsafe;
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
}