mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
1 Commits
faster-2
...
nicer-iter
| Author | SHA1 | Date | |
|---|---|---|---|
| ddf87d2125 |
@@ -1,16 +1,18 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
|
||||
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main" />
|
||||
<module name="dhfs-app" />
|
||||
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />
|
||||
<option name="ENABLED" value="true" />
|
||||
</pattern>
|
||||
</extension>
|
||||
<method v="2">
|
||||
<option name="Make" enabled="true" />
|
||||
</method>
|
||||
</configuration>
|
||||
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication"
|
||||
nameIsGenerated="true">
|
||||
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
|
||||
<module name="dhfs-app"/>
|
||||
<option name="VM_PARAMETERS"
|
||||
value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0"/>
|
||||
<extension name="coverage">
|
||||
<pattern>
|
||||
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
|
||||
<option name="ENABLED" value="true"/>
|
||||
</pattern>
|
||||
</extension>
|
||||
<method v="2">
|
||||
<option name="Make" enabled="true"/>
|
||||
</method>
|
||||
</configuration>
|
||||
</component>
|
||||
@@ -14,9 +14,8 @@ dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
|
||||
dhfs.fuse.debug=false
|
||||
dhfs.fuse.enabled=true
|
||||
dhfs.files.allow_recursive_delete=false
|
||||
dhfs.files.target_chunk_size=524288
|
||||
dhfs.files.max_chunk_size=524288
|
||||
dhfs.files.target_chunk_alignment=17
|
||||
dhfs.files.target_chunk_size=2097152
|
||||
dhfs.files.target_chunk_alignment=19
|
||||
dhfs.objects.deletion.delay=1000
|
||||
dhfs.objects.deletion.can-delete-retry-delay=10000
|
||||
dhfs.objects.ref_verification=true
|
||||
@@ -29,7 +28,7 @@ dhfs.objects.opsender.batch-size=100
|
||||
dhfs.objects.lock_timeout_secs=2
|
||||
dhfs.local-discovery=true
|
||||
dhfs.peerdiscovery.timeout=10000
|
||||
quarkus.log.category."com.usatiuk".min-level=INFO
|
||||
quarkus.log.category."com.usatiuk".level=INFO
|
||||
quarkus.log.category."com.usatiuk".min-level=TRACE
|
||||
quarkus.log.category."com.usatiuk".level=TRACE
|
||||
quarkus.http.insecure-requests=enabled
|
||||
quarkus.http.ssl.client-auth=required
|
||||
|
||||
@@ -55,9 +55,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
||||
int targetChunkSize;
|
||||
|
||||
@ConfigProperty(name = "dhfs.files.max_chunk_size")
|
||||
int maxChunkSize;
|
||||
|
||||
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
|
||||
boolean useHashForChunks;
|
||||
|
||||
@@ -86,7 +83,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
private ChunkData createChunk(ByteString bytes) {
|
||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||
remoteTx.putDataNew(newChunk);
|
||||
remoteTx.putData(newChunk);
|
||||
return newChunk;
|
||||
}
|
||||
|
||||
@@ -363,10 +360,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
|
||||
if (file == null) {
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
|
||||
Log.error("File not found when trying to write: " + fileUuid);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
Map<Long, JObjectKey> removedChunks = new HashMap<>();
|
||||
if (writeLogging) {
|
||||
Log.info("Writing to file: " + file.key() + " size=" + size(fileUuid) + " "
|
||||
+ offset + " " + data.size());
|
||||
}
|
||||
|
||||
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
|
||||
|
||||
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
|
||||
long writeEnd = offset + data.size();
|
||||
@@ -404,7 +407,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
|
||||
Map<Long, JObjectKey> newChunks = new HashMap<>();
|
||||
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
|
||||
|
||||
if (existingEnd < offset) {
|
||||
if (!pendingPrefix.isEmpty()) {
|
||||
@@ -421,13 +424,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
int combinedSize = pendingWrites.size();
|
||||
|
||||
{
|
||||
int targetChunkSize = 1 << targetChunkAlignment;
|
||||
int cur = 0;
|
||||
while (cur < combinedSize) {
|
||||
int end;
|
||||
|
||||
if (combinedSize - cur < maxChunkSize)
|
||||
end = combinedSize;
|
||||
else if (targetChunkAlignment < 0)
|
||||
if (targetChunkAlignment < 0)
|
||||
end = combinedSize;
|
||||
else
|
||||
end = Math.min(cur + targetChunkSize, combinedSize);
|
||||
@@ -548,7 +550,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
}
|
||||
|
||||
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
|
||||
private void fillZeros(long fillStart, long length, NavigableMap<Long, JObjectKey> newChunks) {
|
||||
long combinedSize = (length - fillStart);
|
||||
|
||||
long start = fillStart;
|
||||
|
||||
@@ -7,8 +7,6 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
|
||||
import com.usatiuk.dhfsfs.service.GetattrRes;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.utils.UninitializedByteBuffer;
|
||||
import com.usatiuk.utils.UnsafeAccessor;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -29,6 +27,7 @@ import ru.serce.jnrfuse.struct.FuseFileInfo;
|
||||
import ru.serce.jnrfuse.struct.Statvfs;
|
||||
import ru.serce.jnrfuse.struct.Timespec;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
@@ -52,6 +51,8 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
||||
int targetChunkSize;
|
||||
@Inject
|
||||
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
private long allocateHandle(JObjectKey key) {
|
||||
@@ -230,7 +231,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var fileKey = getFromHandle(fi.fh.get());
|
||||
var read = fileService.read(fileKey, offset, (int) size);
|
||||
if (read.isEmpty()) return 0;
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
||||
return read.size();
|
||||
} catch (Throwable e) {
|
||||
Log.error("When reading " + path, e);
|
||||
@@ -243,17 +244,21 @@ public class DhfsFuse extends FuseStubFS {
|
||||
if (offset < 0) return -ErrorCodes.EINVAL();
|
||||
try {
|
||||
var fileKey = getFromHandle(fi.fh.get());
|
||||
var buffer = UninitializedByteBuffer.allocate((int) size);
|
||||
var buffer = ByteBuffer.allocateDirect((int) size);
|
||||
|
||||
UnsafeAccessor.get().getUnsafe().copyMemory(
|
||||
buf.address(),
|
||||
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
|
||||
size
|
||||
);
|
||||
if (buffer.isDirect()) {
|
||||
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
|
||||
buf.address(),
|
||||
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
|
||||
size
|
||||
);
|
||||
} else {
|
||||
buf.get(0, buffer.array(), 0, (int) size);
|
||||
}
|
||||
|
||||
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
||||
return written.intValue();
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
Log.error("When writing " + path, e);
|
||||
return -ErrorCodes.EIO();
|
||||
}
|
||||
@@ -389,7 +394,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.readlinkBS(fileOpt.get());
|
||||
if (read.isEmpty()) return 0;
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
|
||||
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
|
||||
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
|
||||
return 0;
|
||||
} catch (Throwable e) {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.usatiuk.dhfsfuse;
|
||||
|
||||
import com.google.protobuf.ByteOutput;
|
||||
import com.usatiuk.utils.UnsafeAccessor;
|
||||
import jnr.ffi.Pointer;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -10,12 +9,14 @@ import java.nio.MappedByteBuffer;
|
||||
public class JnrPtrByteOutput extends ByteOutput {
|
||||
private final Pointer _backing;
|
||||
private final long _size;
|
||||
private final JnrPtrByteOutputAccessors _accessors;
|
||||
private long _pos;
|
||||
|
||||
public JnrPtrByteOutput(Pointer backing, long size) {
|
||||
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
|
||||
_backing = backing;
|
||||
_size = size;
|
||||
_pos = 0;
|
||||
_accessors = accessors;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -46,9 +47,9 @@ public class JnrPtrByteOutput extends ByteOutput {
|
||||
if (value instanceof MappedByteBuffer mb) {
|
||||
mb.load();
|
||||
}
|
||||
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
|
||||
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
|
||||
var out = _backing.address() + _pos;
|
||||
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
|
||||
_accessors.getUnsafe().copyMemory(addr, out, rem);
|
||||
} else {
|
||||
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.usatiuk.dhfsfuse;
|
||||
|
||||
import jakarta.inject.Singleton;
|
||||
import jdk.internal.access.JavaNioAccess;
|
||||
import jdk.internal.access.SharedSecrets;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
@Singleton
|
||||
class JnrPtrByteOutputAccessors {
|
||||
JavaNioAccess _nioAccess;
|
||||
Unsafe _unsafe;
|
||||
|
||||
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
|
||||
_nioAccess = SharedSecrets.getJavaNioAccess();
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
_unsafe = (Unsafe) f.get(null);
|
||||
}
|
||||
|
||||
public JavaNioAccess getNioAccess() {
|
||||
return _nioAccess;
|
||||
}
|
||||
|
||||
public Unsafe getUnsafe() {
|
||||
return _unsafe;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,5 @@
|
||||
package com.usatiuk.objects;
|
||||
|
||||
import com.usatiuk.utils.UninitializedByteBuffer;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -48,7 +46,7 @@ public final class JObjectKeyImpl implements JObjectKey {
|
||||
synchronized (this) {
|
||||
if (_bb != null) return _bb;
|
||||
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
|
||||
var directBb = UninitializedByteBuffer.allocate(bytes.length);
|
||||
var directBb = ByteBuffer.allocateDirect(bytes.length);
|
||||
directBb.put(bytes);
|
||||
directBb.flip();
|
||||
_bb = directBb;
|
||||
|
||||
@@ -144,7 +144,7 @@ public class WritebackObjectPersistentStore {
|
||||
|
||||
Log.tracev("Bundle {0} committed", bundle.id());
|
||||
|
||||
synchronized (_pendingWrites) {
|
||||
while (true) {
|
||||
var curPw = _pendingWrites.get();
|
||||
var curPwMap = curPw.pendingWrites();
|
||||
for (var e : bundle._entries.values()) {
|
||||
@@ -157,7 +157,8 @@ public class WritebackObjectPersistentStore {
|
||||
bundle.id(),
|
||||
curPw.lastCommittedId()
|
||||
);
|
||||
_pendingWrites.compareAndSet(curPw, newCurPw);
|
||||
if (_pendingWrites.compareAndSet(curPw, newCurPw))
|
||||
break;
|
||||
}
|
||||
|
||||
List<List<Runnable>> callbacks = new ArrayList<>();
|
||||
@@ -254,7 +255,7 @@ public class WritebackObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (_pendingWrites) {
|
||||
while (true) {
|
||||
var curPw = _pendingWrites.get();
|
||||
var curPwMap = curPw.pendingWrites();
|
||||
for (var e : ((TxBundle) bundle)._entries.values()) {
|
||||
@@ -275,17 +276,18 @@ public class WritebackObjectPersistentStore {
|
||||
bundle.id()
|
||||
);
|
||||
|
||||
_pendingWrites.compareAndSet(curPw, newCurPw);
|
||||
}
|
||||
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
|
||||
continue;
|
||||
|
||||
((TxBundle) bundle).setReady();
|
||||
if (_pendingBundles.peek() == bundle)
|
||||
_pendingBundles.notify();
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
currentSize += ((TxBundle) bundle).size();
|
||||
}
|
||||
((TxBundle) bundle).setReady();
|
||||
if (_pendingBundles.peek() == bundle)
|
||||
_pendingBundles.notify();
|
||||
synchronized (_flushWaitSynchronizer) {
|
||||
currentSize += ((TxBundle) bundle).size();
|
||||
}
|
||||
|
||||
return bundle.id();
|
||||
return bundle.id();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -390,7 +392,7 @@ public class WritebackObjectPersistentStore {
|
||||
}
|
||||
|
||||
private static class TxBundle {
|
||||
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
|
||||
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
|
||||
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
|
||||
private long _txId;
|
||||
private volatile boolean _ready = false;
|
||||
|
||||
@@ -43,9 +43,4 @@ public class CurrentTransaction implements Transaction {
|
||||
public <T extends JData> void put(JData obj) {
|
||||
transactionManager.current().put(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> void putNew(JData obj) {
|
||||
transactionManager.current().putNew(obj);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -165,8 +165,7 @@ public class JObjectManager {
|
||||
}
|
||||
}
|
||||
for (var write : writes.entrySet()) {
|
||||
if (!readSet.containsKey(write.getKey()))
|
||||
toLock.add(write.getKey());
|
||||
toLock.add(write.getKey());
|
||||
}
|
||||
Collections.sort(toLock);
|
||||
for (var key : toLock) {
|
||||
|
||||
@@ -14,7 +14,6 @@ public interface Transaction extends TransactionHandle {
|
||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
||||
|
||||
<T extends JData> void put(JData obj);
|
||||
<T extends JData> void putNew(JData obj);
|
||||
|
||||
void delete(JObjectKey key);
|
||||
|
||||
|
||||
@@ -60,7 +60,6 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||
private final List<Runnable> _onCommit = new ArrayList<>();
|
||||
private final List<Runnable> _onFlush = new ArrayList<>();
|
||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||
private boolean _closed = false;
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
@@ -96,9 +95,6 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||
if (_knownNew.contains(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return _readSet.computeIfAbsent(key, k -> {
|
||||
var read = _snapshot.readObject(k);
|
||||
return new TransactionObjectNoLock<>(read);
|
||||
@@ -191,14 +187,6 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNew(JData obj) {
|
||||
_knownNew.add(obj.key());
|
||||
|
||||
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||
var ret = _newWrites;
|
||||
|
||||
@@ -102,7 +102,6 @@
|
||||
<arg>-parameters</arg>
|
||||
<arg>--add-exports</arg>
|
||||
<arg>java.base/jdk.internal.access=ALL-UNNAMED</arg>
|
||||
<arg>--enable-preview</arg>
|
||||
</compilerArgs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
@@ -120,7 +119,6 @@
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
|
||||
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
|
||||
--add-opens=java.base/java.nio=ALL-UNNAMED
|
||||
--enable-preview
|
||||
</argLine>
|
||||
<skipTests>${skip.unit}</skipTests>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
|
||||
@@ -93,11 +93,6 @@ public class RemoteTransaction {
|
||||
curTx.put(newData);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putDataNew(T obj) {
|
||||
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
|
||||
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putData(T obj) {
|
||||
var curMeta = getMeta(obj.key()).orElse(null);
|
||||
|
||||
|
||||
@@ -4,52 +4,93 @@ import jakarta.annotation.Nonnull;
|
||||
import jakarta.annotation.Nullable;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class DataLocker {
|
||||
private final ConcurrentHashMap<Object, WeakReference<ReentrantLock>> _locks = new ConcurrentHashMap<>();
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
private Lock getTag(Object data) {
|
||||
var newTag = new ReentrantLock();
|
||||
var newTagRef = new WeakReference<>(newTag);
|
||||
|
||||
while (true) {
|
||||
var oldTagRef = _locks.putIfAbsent(data, newTagRef);
|
||||
var oldTag = oldTagRef != null ? oldTagRef.get() : null;
|
||||
|
||||
if (oldTag == null && oldTagRef != null) {
|
||||
_locks.remove(data, oldTagRef);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (oldTag != null)
|
||||
return oldTag;
|
||||
|
||||
CLEANER.register(newTag, () -> {
|
||||
_locks.remove(data, newTagRef);
|
||||
});
|
||||
return newTag;
|
||||
}
|
||||
}
|
||||
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
|
||||
};
|
||||
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
|
||||
|
||||
@Nonnull
|
||||
public AutoCloseableNoThrow lock(Object data) {
|
||||
var lock = getTag(data);
|
||||
lock.lock();
|
||||
return lock::unlock;
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
try {
|
||||
synchronized (oldTag) {
|
||||
while (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
oldTag.wait();
|
||||
// tag.wait(4000L);
|
||||
// if (!tag.released) {
|
||||
// System.out.println("Timeout waiting for lock: " + data);
|
||||
// System.exit(1);
|
||||
// throw new InterruptedException();
|
||||
// }
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public AutoCloseableNoThrow tryLock(Object data) {
|
||||
var lock = getTag(data);
|
||||
if (lock.tryLock()) {
|
||||
return lock::unlock;
|
||||
} else {
|
||||
return null;
|
||||
while (true) {
|
||||
var newTag = new LockTag();
|
||||
var oldTag = _locks.putIfAbsent(data, newTag);
|
||||
if (oldTag == null) {
|
||||
return new Lock(data, newTag);
|
||||
}
|
||||
synchronized (oldTag) {
|
||||
if (!oldTag.released) {
|
||||
if (oldTag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class LockTag {
|
||||
final Thread owner = Thread.currentThread();
|
||||
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
|
||||
boolean released = false;
|
||||
}
|
||||
|
||||
private class Lock implements AutoCloseableNoThrow {
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
private final Object _key;
|
||||
private final LockTag _tag;
|
||||
|
||||
public Lock(Object key, LockTag tag) {
|
||||
_key = key;
|
||||
_tag = tag;
|
||||
// CLEANER.register(this, () -> {
|
||||
// if (!tag.released) {
|
||||
// Log.error("Lock collected without release: " + key);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
synchronized (_tag) {
|
||||
if (_tag.released)
|
||||
return;
|
||||
_tag.released = true;
|
||||
// Notify all because when the object is locked again,
|
||||
// it's a different lock tag
|
||||
_tag.notifyAll();
|
||||
_locks.remove(_key, _tag);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
import java.lang.foreign.*;
|
||||
import java.lang.invoke.MethodHandle;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class UninitializedByteBuffer {
|
||||
|
||||
private static final Linker LINKER = Linker.nativeLinker();
|
||||
private static final MethodHandle malloc = LINKER.downcallHandle(
|
||||
LINKER.defaultLookup().find("malloc").orElseThrow(),
|
||||
FunctionDescriptor.of(ValueLayout.ADDRESS, ValueLayout.JAVA_LONG)
|
||||
);
|
||||
private static final MethodHandle free = LINKER.downcallHandle(
|
||||
LINKER.defaultLookup().find("free").orElseThrow(),
|
||||
FunctionDescriptor.ofVoid(ValueLayout.ADDRESS)
|
||||
);
|
||||
|
||||
public static ByteBuffer allocate(int capacity) {
|
||||
UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
|
||||
// Invoke malloc(), which returns a pointer
|
||||
MemorySegment segment = null;
|
||||
try {
|
||||
segment = (MemorySegment) malloc.invokeExact((long) capacity);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
|
||||
Consumer<MemorySegment> cleanup = s -> {
|
||||
try {
|
||||
free.invokeExact(s);
|
||||
UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
var reint = segment.reinterpret(capacity, Arena.ofAuto(), cleanup);
|
||||
return reint.asByteBuffer();
|
||||
}
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
import jdk.internal.access.JavaNioAccess;
|
||||
import jdk.internal.access.SharedSecrets;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
public class UnsafeAccessor {
|
||||
private static final UnsafeAccessor INSTANCE;
|
||||
|
||||
static {
|
||||
try {
|
||||
INSTANCE = new UnsafeAccessor();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static UnsafeAccessor get() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private JavaNioAccess _nioAccess;
|
||||
private Unsafe _unsafe;
|
||||
|
||||
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
|
||||
_nioAccess = SharedSecrets.getJavaNioAccess();
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
_unsafe = (Unsafe) f.get(null);
|
||||
}
|
||||
|
||||
public JavaNioAccess getNioAccess() {
|
||||
return _nioAccess;
|
||||
}
|
||||
|
||||
public Unsafe getUnsafe() {
|
||||
return _unsafe;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user