6 Commits

Author SHA1 Message Date
7aa968a569 Dhfs-fuse: fix import 2025-04-29 00:45:34 +02:00
e348c39be1 Utils: add UnsafeAccessor to JnrPtrByteOutput
oops
2025-04-28 23:50:31 +02:00
1b54830651 Objects: don't lock some objects twice for no reason 2025-04-28 23:49:45 +02:00
bc5f0b816c Objects: add putNew
to avoid searching for nonexistent objects
2025-04-28 23:47:53 +02:00
9ff914bdaa Utils: move UnsafeAccessor to utils 2025-04-28 23:36:42 +02:00
1cee6f62b8 Utils: less dumb DataLocker 2025-04-28 23:34:30 +02:00
11 changed files with 116 additions and 127 deletions

View File

@@ -83,7 +83,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putData(newChunk);
remoteTx.putDataNew(newChunk);
return newChunk;
}

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.utils.UnsafeAccessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -51,8 +52,6 @@ public class DhfsFuse extends FuseStubFS {
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@Inject
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
@Inject
DhfsFileService fileService;
private long allocateHandle(JObjectKey key) {
@@ -231,7 +230,7 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
return read.size();
} catch (Throwable e) {
Log.error("When reading " + path, e);
@@ -246,19 +245,15 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get());
var buffer = ByteBuffer.allocateDirect((int) size);
if (buffer.isDirect()) {
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
buf.address(),
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
size
);
} else {
buf.get(0, buffer.array(), 0, (int) size);
}
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
size
);
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Throwable e) {
} catch (Exception e) {
Log.error("When writing " + path, e);
return -ErrorCodes.EIO();
}
@@ -394,7 +389,7 @@ public class DhfsFuse extends FuseStubFS {
var file = fileOpt.get();
var read = fileService.readlinkBS(fileOpt.get());
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
return 0;
} catch (Throwable e) {

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfsfuse;
import com.google.protobuf.ByteOutput;
import com.usatiuk.utils.UnsafeAccessor;
import jnr.ffi.Pointer;
import java.nio.ByteBuffer;
@@ -9,14 +10,12 @@ import java.nio.MappedByteBuffer;
public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing;
private final long _size;
private final JnrPtrByteOutputAccessors _accessors;
private long _pos;
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
public JnrPtrByteOutput(Pointer backing, long size) {
_backing = backing;
_size = size;
_pos = 0;
_accessors = accessors;
}
@Override
@@ -47,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
if (value instanceof MappedByteBuffer mb) {
mb.load();
}
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
var out = _backing.address() + _pos;
_accessors.getUnsafe().copyMemory(addr, out, rem);
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
} else {
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
}

View File

@@ -1,29 +0,0 @@
package com.usatiuk.dhfsfuse;
import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
@Singleton
class JnrPtrByteOutputAccessors {
JavaNioAccess _nioAccess;
Unsafe _unsafe;
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
}

View File

@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj);
}
@Override
public <T extends JData> void putNew(JData obj) {
transactionManager.current().putNew(obj);
}
}

View File

@@ -165,7 +165,8 @@ public class JObjectManager {
}
}
for (var write : writes.entrySet()) {
toLock.add(write.getKey());
if (!readSet.containsKey(write.getKey()))
toLock.add(write.getKey());
}
Collections.sort(toLock);
for (var key : toLock) {

View File

@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key);

View File

@@ -59,6 +59,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
@@ -94,6 +95,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read);
@@ -181,6 +185,14 @@ public class TransactionFactoryImpl implements TransactionFactory {
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}
@Override
public void putNew(JData obj) {
_knownNew.add(obj.key());
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
var ret = _newWrites;

View File

@@ -93,6 +93,11 @@ public class RemoteTransaction {
curTx.put(newData);
}
public <T extends JDataRemote> void putDataNew(T obj) {
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
}
public <T extends JDataRemote> void putData(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);

View File

@@ -4,93 +4,52 @@ import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class DataLocker {
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
};
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
private static final Cleaner CLEANER = Cleaner.create();
private Lock getTag(Object data) {
var newTag = new ReentrantLock();
var newTagRef = new WeakReference<>(newTag);
while (true) {
var oldTagRef = _locks.putIfAbsent(data, newTagRef);
var oldTag = oldTagRef != null ? oldTagRef.get() : null;
if (oldTag == null && oldTagRef != null) {
_locks.remove(data, oldTagRef);
continue;
}
if (oldTag != null)
return oldTag;
CLEANER.register(newTag, () -> {
_locks.remove(data, newTagRef);
});
return newTag;
}
}
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
while (true) {
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
try {
synchronized (oldTag) {
while (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
oldTag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
}
} catch (InterruptedException ignored) {
}
}
var lock = getTag(data);
lock.lock();
return lock::unlock;
}
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
while (true) {
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
synchronized (oldTag) {
if (!oldTag.released) {
if (oldTag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
}
var lock = getTag(data);
if (lock.tryLock()) {
return lock::unlock;
} else {
return null;
}
}
private static class LockTag {
final Thread owner = Thread.currentThread();
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
boolean released = false;
}
private class Lock implements AutoCloseableNoThrow {
private static final Cleaner CLEANER = Cleaner.create();
private final Object _key;
private final LockTag _tag;
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
// CLEANER.register(this, () -> {
// if (!tag.released) {
// Log.error("Lock collected without release: " + key);
// }
// });
}
@Override
public void close() {
synchronized (_tag) {
if (_tag.released)
return;
_tag.released = true;
// Notify all because when the object is locked again,
// it's a different lock tag
_tag.notifyAll();
_locks.remove(_key, _tag);
}
}
}
}

View File

@@ -0,0 +1,41 @@
package com.usatiuk.utils;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafeAccessor {
private static final UnsafeAccessor INSTANCE;
static {
try {
INSTANCE = new UnsafeAccessor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static UnsafeAccessor get() {
return INSTANCE;
}
private JavaNioAccess _nioAccess;
private Unsafe _unsafe;
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
}