Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-29 04:57:48 +01:00
Compare commits
2 Commits: b9852b68d6 ... 126bf08615
| Author | SHA1 | Date |
|---|---|---|
|  | 126bf08615 |  |
|  | c88d8cfbb5 |  |
@@ -30,6 +30,7 @@ services:
       - $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
       - ./target/quarkus-app:/app
     command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
+      --add-exports java.base/jdk.internal.access=ALL-UNNAMED
       -Ddhfs.objects.persistence.files.root=/dhfs_root/p
       -Ddhfs.objects.root=/dhfs_root/d
       -Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
@@ -8,7 +8,10 @@
 
     <properties>
         <compiler-plugin.version>3.12.1</compiler-plugin.version>
-        <maven.compiler.release>21</maven.compiler.release>
+        <!--FIXME-->
+        <maven.compiler.release></maven.compiler.release>
+        <maven.compiler.source>21</maven.compiler.source>
+        <maven.compiler.target>21</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
@@ -167,6 +170,8 @@
                 <configuration>
                     <compilerArgs>
                         <arg>-parameters</arg>
+                        <arg>--add-exports</arg>
+                        <arg>java.base/jdk.internal.access=ALL-UNNAMED</arg>
                     </compilerArgs>
                 </configuration>
             </plugin>
@@ -178,7 +183,10 @@
                         <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                         <maven.home>${maven.home}</maven.home>
                     </systemPropertyVariables>
-                    <argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
+                    <argLine>
+                        --add-exports java.base/sun.nio.ch=ALL-UNNAMED
+                        --add-exports java.base/jdk.internal.access=ALL-UNNAMED
+                    </argLine>
                     <skipTests>${skip.unit}</skipTests>
                 </configuration>
             </plugin>
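Context for the build changes above: `jdk.internal.access` is not exported from `java.base`, so the new accessor code only compiles and runs when `--add-exports java.base/jdk.internal.access=ALL-UNNAMED` is passed both to `javac` (the compilerArgs addition) and to the JVM (the docker-compose command, the surefire argLine, and the integration-test container cmd further down). A minimal sketch of what that flag unlocks, using a hypothetical `AddExportsCheck` class that is not part of this patch:

```java
// Hypothetical check class (not in the repository). Compile and run with:
//   --add-exports java.base/jdk.internal.access=ALL-UNNAMED
// Without the flag, javac rejects the jdk.internal.access imports as not visible
// to the unnamed module.
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;

import java.nio.ByteBuffer;

public class AddExportsCheck {
    public static void main(String[] args) {
        JavaNioAccess nio = SharedSecrets.getJavaNioAccess();
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        // getBufferAddress is the call the FUSE write path relies on for zero-copy output
        System.out.println("direct buffer base address: " + nio.getBufferAddress(direct));
    }
}
```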
@@ -49,6 +49,9 @@ public class DhfsFuse extends FuseStubFS {
     @ConfigProperty(name = "dhfs.files.target_chunk_size")
     int targetChunkSize;
 
+    @Inject
+    JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
+
     @Inject
     DhfsFileService fileService;
 
@@ -166,7 +169,7 @@ public class DhfsFuse extends FuseStubFS {
             var file = fileOpt.get();
             var read = fileService.read(fileOpt.get(), offset, (int) size);
             if (read.isEmpty()) return 0;
-            UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(buf, size));
+            UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
             return read.get().size();
         } catch (Exception e) {
             Log.error("When reading " + path, e);
@@ -318,7 +321,7 @@ public class DhfsFuse extends FuseStubFS {
             var file = fileOpt.get();
             var read = fileService.readlinkBS(fileOpt.get());
             if (read.isEmpty()) return 0;
-            UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
+            UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
             buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
             return 0;
         } catch (Exception e) {
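The two call-site changes above thread the new accessors singleton into `JnrPtrByteOutput`, which protobuf drives through `UnsafeByteOperations.unsafeWriteTo`: the `ByteString` pushes its backing chunks into a `ByteOutput` instead of being flattened into an intermediate `byte[]`. A minimal sketch of that contract with a hypothetical `CountingByteOutput` (not from the repository):

```java
// Hypothetical CountingByteOutput (not in the repository): shows which callbacks
// protobuf invokes when a ByteString is streamed into a ByteOutput, the same
// base class JnrPtrByteOutput extends for the FUSE buffer.
import com.google.protobuf.ByteOutput;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;

import java.io.IOException;
import java.nio.ByteBuffer;

class CountingByteOutput extends ByteOutput {
    long written;

    @Override public void write(byte value) { written++; }
    @Override public void write(byte[] value, int offset, int length) { written += length; }
    @Override public void writeLazy(byte[] value, int offset, int length) { written += length; }
    @Override public void write(ByteBuffer value) { written += value.remaining(); }
    @Override public void writeLazy(ByteBuffer value) { written += value.remaining(); }

    public static void main(String[] args) throws IOException {
        var out = new CountingByteOutput();
        // unsafeWriteTo hands the ByteString's backing storage to the ByteOutput
        // without copying it into a fresh array first
        UnsafeByteOperations.unsafeWriteTo(ByteString.copyFromUtf8("hello"), out);
        System.out.println(out.written); // 5
    }
}
```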
@@ -9,10 +9,13 @@ public class JnrPtrByteOutput extends ByteOutput {
     private final Pointer _backing;
     private final long _size;
     private long _pos;
-    public JnrPtrByteOutput(Pointer backing, long size) {
+    private final JnrPtrByteOutputAccessors _accessors;
+
+    public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
         _backing = backing;
         _size = size;
         _pos = 0;
+        _accessors = accessors;
     }
 
     @Override
@@ -36,11 +39,22 @@ public class JnrPtrByteOutput extends ByteOutput {
 
     @Override
     public void write(ByteBuffer value) {
-        throw new UnsupportedOperationException();
+        var rem = value.remaining();
+        if (rem + _pos > _size) throw new IndexOutOfBoundsException();
+
+        if (value.isDirect()) {
+            long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
+            var out = _backing.address() + _pos;
+            _accessors.getUnsafe().copyMemory(addr, out, rem);
+        } else {
+            throw new UnsupportedOperationException();
+        }
+
+        _pos += rem;
     }
 
     @Override
     public void writeLazy(ByteBuffer value) {
-        throw new UnsupportedOperationException();
+        write(value);
     }
 }
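The new `write(ByteBuffer)` override handles only direct buffers: it resolves the buffer's base address via `JavaNioAccess.getBufferAddress` and copies straight into the JNR `Pointer`'s native memory with `Unsafe.copyMemory`; heap buffers still throw `UnsupportedOperationException`. A self-contained sketch of the same copy, assuming jnr-ffi on the classpath and `--add-exports java.base/jdk.internal.access=ALL-UNNAMED` at compile and run time (the `DirectCopyDemo` class is illustrative, not part of the patch):

```java
// Illustrative DirectCopyDemo (not in the repository).
import jdk.internal.access.SharedSecrets;
import jnr.ffi.Memory;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;
import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

public class DirectCopyDemo {
    public static void main(String[] args) throws Exception {
        // Same Unsafe lookup the new JnrPtrByteOutputAccessors singleton performs
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        ByteBuffer src = ByteBuffer.allocateDirect(4);
        src.put(new byte[]{1, 2, 3, 4}).flip();

        Pointer dst = Memory.allocateDirect(Runtime.getSystemRuntime(), 4);

        // Resolve the direct buffer's base address, then copy raw memory into the
        // pointer, mirroring the isDirect() branch of write(ByteBuffer)
        long srcAddr = SharedSecrets.getJavaNioAccess().getBufferAddress(src) + src.position();
        unsafe.copyMemory(srcAddr, dst.address(), src.remaining());

        System.out.println(dst.getByte(3)); // prints 4
    }
}
```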
@@ -0,0 +1,24 @@
+package com.usatiuk.dhfs.fuse;
+
+import jakarta.inject.Singleton;
+import jdk.internal.access.JavaNioAccess;
+import jdk.internal.access.SharedSecrets;
+import lombok.Getter;
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+@Singleton
+class JnrPtrByteOutputAccessors {
+    @Getter
+    JavaNioAccess _nioAccess;
+    @Getter
+    Unsafe _unsafe;
+
+    JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
+        _nioAccess = SharedSecrets.getJavaNioAccess();
+        Field f = Unsafe.class.getDeclaredField("theUnsafe");
+        f.setAccessible(true);
+        _unsafe = (Unsafe) f.get(null);
+    }
+}
@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.objects.repository.persistence;
 
+import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
@@ -17,6 +18,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
 
 import javax.annotation.Nonnull;
 import java.io.*;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
@@ -33,6 +35,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
     private final Path metaPath;
     private final Path dataPath;
 
+    private final static long mmapThreshold = 65536;
+
     public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
         this.root = root;
         this.metaPath = Paths.get(root, "meta");
@@ -108,9 +112,20 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
     }
 
     private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
-        try (var fsb = new FileInputStream(path.toFile())) {
-            var file = fsb.readAllBytes();
-            return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(file));
+        try (var rf = new RandomAccessFile(path.toFile(), "r")) {
+            var len = rf.length();
+            if (len > mmapThreshold) {
+                var bs = UnsafeByteOperations.unsafeWrap(rf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len));
+                // This way, the input will be considered "immutable" which would allow avoiding copies
+                // when parsing byte arrays
+                var ch = bs.newCodedInput();
+                ch.enableAliasing(true);
+                return (T) defaultInstance.getParserForType().parseFrom(ch);
+            } else {
+                var arr = new byte[(int) len];
+                rf.readFully(arr, 0, (int) len);
+                return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(arr));
+            }
         } catch (FileNotFoundException | NoSuchFileException fx) {
             throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
         } catch (IOException e) {
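For objects above `mmapThreshold`, the new read path maps the file, wraps the mapping with `UnsafeByteOperations.unsafeWrap`, and parses with aliasing enabled so parsed `bytes` fields can reference the mapped region instead of being copied onto the heap. A runnable sketch of the same pattern, using protobuf's bundled `StringValue` as a stand-in for `JObjectDataP` (hypothetical `MmapReadDemo`, not part of the patch):

```java
// Illustrative MmapReadDemo (not in the repository); StringValue stands in for the
// generated JObjectDataP message used by the store.
import com.google.protobuf.StringValue;
import com.google.protobuf.UnsafeByteOperations;

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapReadDemo {
    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("obj", ".bin");
        Files.write(path, StringValue.of("x".repeat(100_000)).toByteArray());

        try (var rf = new RandomAccessFile(path.toFile(), "r")) {
            var mapped = rf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, rf.length());
            var input = UnsafeByteOperations.unsafeWrap(mapped).newCodedInput();
            input.enableAliasing(true); // allow parsed fields to alias the mapped buffer
            var msg = StringValue.parseFrom(input);
            System.out.println(msg.getValue().length()); // 100000
        }
    }
}
```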
@@ -133,6 +148,15 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
 
     private void writeObjectImpl(Path path, Message data) {
         try {
+            var len = data.getSerializedSize();
+            if (len > mmapThreshold)
+                try (var rf = new RandomAccessFile(path.toFile(), "rw")) {
+                    rf.setLength(len);
+                    var mapped = rf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, len);
+                    data.writeTo(CodedOutputStream.newInstance(mapped));
+                    return;
+                }
+
             try (var fsb = new FileOutputStream(path.toFile(), false);
                  var buf = new BufferedOutputStream(fsb, Math.min(65536, data.getSerializedSize()))) {
                 data.writeTo(buf);
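The write path mirrors this for large objects: pre-size the file to the serialized length, map it `READ_WRITE`, and let protobuf serialize directly into the mapping through `CodedOutputStream.newInstance(ByteBuffer)`. A companion sketch under the same assumptions (hypothetical `MmapWriteDemo`, same `StringValue` stand-in):

```java
// Illustrative MmapWriteDemo (not in the repository).
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.StringValue;

import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

public class MmapWriteDemo {
    public static void main(String[] args) throws Exception {
        var msg = StringValue.of("y".repeat(100_000));
        var path = Files.createTempFile("obj", ".bin");

        try (var rf = new RandomAccessFile(path.toFile(), "rw")) {
            int len = msg.getSerializedSize();
            rf.setLength(len); // the file length must match the serialized size up front
            var mapped = rf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, len);
            var out = CodedOutputStream.newInstance(mapped);
            msg.writeTo(out);
            out.flush(); // finalizes the mapped buffer's position
        }
        System.out.println(Files.size(path)); // equals getSerializedSize()
    }
}
```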
@@ -13,7 +13,7 @@ dhfs.fuse.root=${HOME}/dhfs_default/fuse
 dhfs.fuse.debug=false
 dhfs.fuse.enabled=true
 dhfs.files.allow_recursive_delete=false
-dhfs.files.target_chunk_size=524288
+dhfs.files.target_chunk_size=2097152
 # Writes strictly smaller than this will try to merge with blocks nearby
 dhfs.files.write_merge_threshold=0.8
 # If a merge would result in a block of greater size than this, stop merging
@@ -45,6 +45,7 @@ public class DhfsFuseIT {
                 .run("apt update && apt install -y libfuse2 curl")
                 .copy("/app", "/app")
                 .cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
+                        "--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
                         "-Ddhfs.objects.peerdiscovery.interval=100",
                         "-Ddhfs.objects.invalidation.delay=100",
                         "-Ddhfs.objects.ref_verification=true",
@@ -49,6 +49,7 @@ public class DhfsFusex3IT {
                 .run("apt update && apt install -y libfuse2 curl gcc")
                 .copy("/app", "/app")
                 .cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
+                        "--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
                         "-Ddhfs.objects.peerdiscovery.interval=100",
                         "-Ddhfs.objects.invalidation.delay=100",
                         "-Ddhfs.objects.deletion.delay=0",