2 Commits

Author SHA1 Message Date
126bf08615 extra trickery to avoid one extra copy 2024-07-30 00:35:56 +02:00
c88d8cfbb5 use mmaped files 2024-07-29 23:23:32 +02:00
9 changed files with 87 additions and 11 deletions

View File

@@ -30,6 +30,7 @@ services:
- $HOME/dhfs/dhfs2_f:/dhfs_root/fuse:rshared
- ./target/quarkus-app:/app
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0

View File

@@ -8,7 +8,10 @@
<properties>
<compiler-plugin.version>3.12.1</compiler-plugin.version>
<maven.compiler.release>21</maven.compiler.release>
<!--FIXME-->
<maven.compiler.release></maven.compiler.release>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
@@ -167,6 +170,8 @@
<configuration>
<compilerArgs>
<arg>-parameters</arg>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.access=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -178,7 +183,10 @@
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
<argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
<argLine>
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
</argLine>
<skipTests>${skip.unit}</skipTests>
</configuration>
</plugin>

View File

@@ -49,6 +49,9 @@ public class DhfsFuse extends FuseStubFS {
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@Inject
JnrPtrByteOutputAccessors jnrPtrByteOutputAccessors;
@Inject
DhfsFileService fileService;
@@ -166,7 +169,7 @@ public class DhfsFuse extends FuseStubFS {
var file = fileOpt.get();
var read = fileService.read(fileOpt.get(), offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(buf, size));
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
} catch (Exception e) {
Log.error("When reading " + path, e);
@@ -318,7 +321,7 @@ public class DhfsFuse extends FuseStubFS {
var file = fileOpt.get();
var read = fileService.readlinkBS(fileOpt.get());
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(buf, size));
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
return 0;
} catch (Exception e) {

View File

@@ -9,10 +9,13 @@ public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing;
private final long _size;
private long _pos;
public JnrPtrByteOutput(Pointer backing, long size) {
private final JnrPtrByteOutputAccessors _accessors;
public JnrPtrByteOutput(JnrPtrByteOutputAccessors accessors, Pointer backing, long size) {
_backing = backing;
_size = size;
_pos = 0;
_accessors = accessors;
}
@Override
@@ -36,11 +39,22 @@ public class JnrPtrByteOutput extends ByteOutput {
@Override
public void write(ByteBuffer value) {
throw new UnsupportedOperationException();
var rem = value.remaining();
if (rem + _pos > _size) throw new IndexOutOfBoundsException();
if (value.isDirect()) {
long addr = _accessors.getNioAccess().getBufferAddress(value) + value.position();
var out = _backing.address() + _pos;
_accessors.getUnsafe().copyMemory(addr, out, rem);
} else {
throw new UnsupportedOperationException();
}
_pos += rem;
}
@Override
public void writeLazy(ByteBuffer value) {
throw new UnsupportedOperationException();
write(value);
}
}

View File

@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.fuse;
import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess;
import jdk.internal.access.SharedSecrets;
import lombok.Getter;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
@Singleton
class JnrPtrByteOutputAccessors {
@Getter
JavaNioAccess _nioAccess;
@Getter
Unsafe _unsafe;
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
}

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
@@ -17,6 +18,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -33,6 +35,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
private final Path metaPath;
private final Path dataPath;
private final static long mmapThreshold = 65536;
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
this.root = root;
this.metaPath = Paths.get(root, "meta");
@@ -108,9 +112,20 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
try (var fsb = new FileInputStream(path.toFile())) {
var file = fsb.readAllBytes();
return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(file));
try (var rf = new RandomAccessFile(path.toFile(), "r")) {
var len = rf.length();
if (len > mmapThreshold) {
var bs = UnsafeByteOperations.unsafeWrap(rf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len));
// This way, the input will be considered "immutable" which would allow avoiding copies
// when parsing byte arrays
var ch = bs.newCodedInput();
ch.enableAliasing(true);
return (T) defaultInstance.getParserForType().parseFrom(ch);
} else {
var arr = new byte[(int) len];
rf.readFully(arr, 0, (int) len);
return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(arr));
}
} catch (FileNotFoundException | NoSuchFileException fx) {
throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
} catch (IOException e) {
@@ -133,6 +148,15 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
private void writeObjectImpl(Path path, Message data) {
try {
var len = data.getSerializedSize();
if (len > mmapThreshold)
try (var rf = new RandomAccessFile(path.toFile(), "rw")) {
rf.setLength(len);
var mapped = rf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, len);
data.writeTo(CodedOutputStream.newInstance(mapped));
return;
}
try (var fsb = new FileOutputStream(path.toFile(), false);
var buf = new BufferedOutputStream(fsb, Math.min(65536, data.getSerializedSize()))) {
data.writeTo(buf);

View File

@@ -13,7 +13,7 @@ dhfs.fuse.root=${HOME}/dhfs_default/fuse
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=524288
dhfs.files.target_chunk_size=2097152
# Writes strictly smaller than this will try to merge with blocks nearby
dhfs.files.write_merge_threshold=0.8
# If a merge would result in a block of greater size than this, stop merging

View File

@@ -45,6 +45,7 @@ public class DhfsFuseIT {
.run("apt update && apt install -y libfuse2 curl")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.ref_verification=true",

View File

@@ -49,6 +49,7 @@ public class DhfsFusex3IT {
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",