use uninitialized byte buffers

This commit is contained in:
2024-08-23 22:51:34 +02:00
parent dca507ec4b
commit 848ab14f8e
11 changed files with 170 additions and 40 deletions

View File

@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.supportlib.DhfsSupport;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -28,7 +28,6 @@ import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
@@ -55,7 +54,6 @@ public class DhfsFuse extends FuseStubFS {
DhfsFileService fileService;
void init(@Observes @Priority(100000) StartupEvent event) {
DhfsSupport.hello();
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
Log.info("Mounting with root " + root);
@@ -203,7 +201,7 @@ public class DhfsFuse extends FuseStubFS {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var buffer = ByteBuffer.allocateDirect((int) size);
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
buf.address(),

View File

@@ -10,6 +10,8 @@ import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.StorageInterface;
import com.usatiuk.kleppmanntree.TreeNode;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.apache.commons.lang3.NotImplementedException;
import java.util.*;

View File

@@ -13,6 +13,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -102,6 +103,7 @@ public class JObjectTxManager {
var bundle = txWriteback.createBundle();
var latch = new CountDownLatch(state._writeObjects.size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
state._writeObjects.forEach((key, value) -> {
try {
@@ -110,11 +112,17 @@ public class JObjectTxManager {
latch.countDown();
} else if (key.getMeta().isHaveLocalCopy() && key.getData() != null) {
_serializerThreads.execute(() -> {
bundle.commit(key,
protoSerializerService.serialize(key.getMeta()),
protoSerializerService.serializeToJObjectDataP(key.getData())
);
latch.countDown();
try {
bundle.commit(key,
protoSerializerService.serialize(key.getMeta()),
protoSerializerService.serializeToJObjectDataP(key.getData())
);
} catch (Throwable t) {
Log.error("Error serializing " + key.getMeta().getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
} else if (key.getMeta().isHaveLocalCopy() && key.getData() == null) {
bundle.commitMetaChange(key,
@@ -139,6 +147,10 @@ public class JObjectTxManager {
throw new RuntimeException(e);
}
if (!errors.isEmpty()) {
throw new RuntimeException("Errors when committing!");
}
state._writeObjects.forEach((key, value) -> key.rwUnlock());
txWriteback.commitBundle(bundle);

View File

@@ -18,7 +18,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -93,28 +94,43 @@ public class TxWritebackImpl implements TxWriteback {
currentSize += bundle.calculateTotalSize();
}
ArrayList<Callable<Void>> tasks = new ArrayList<>();
var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var c : bundle._committed.values()) {
tasks.add(() -> {
Log.trace("Writing new " + c.newMeta.getName());
objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
return null;
_commitExecutor.execute(() -> {
try {
Log.trace("Writing new " + c.newMeta.getName());
objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
for (var c : bundle._meta.values()) {
tasks.add(() -> {
Log.trace("Writing (meta) " + c.newMeta.getName());
objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
return null;
_commitExecutor.execute(() -> {
try {
Log.trace("Writing (meta) " + c.newMeta.getName());
objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
if (Log.isDebugEnabled())
for (var d : bundle._deleted.keySet())
Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests
_commitExecutor.invokeAll(tasks);
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors in writeback!");
}
objectPersistentStore.commitTx(
new TxManifest(
Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()),

View File

@@ -5,6 +5,7 @@ import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.logging.Log;
@@ -28,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -146,7 +148,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
if (false && len > mmapThreshold) {
buf = rf.getChannel().map(FileChannel.MapMode.READ_ONLY, 8, toRead);
} else {
buf = ByteBuffer.allocateDirect(toRead);
buf = UninitializedByteBuffer.allocateUninitialized(toRead);
fillBuffer(buf, rf.getChannel());
buf.flip();
}
@@ -218,7 +220,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
var totalSize = dataSize + meta.getSerializedSize();
// Avoids CodedOutputStream flushing all the time
var bb = ByteBuffer.allocateDirect(totalSize);
var bb = UninitializedByteBuffer.allocateUninitialized(totalSize);
var bbOut = CodedOutputStream.newInstance(bb);
if (data != null)
@@ -239,7 +241,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
private void writeObjectMetaImpl(Path path, ObjectMetadataP meta, boolean sync) throws IOException {
try (var rf = new RandomAccessFile(path.toFile(), "rw");
var ch = rf.getChannel()) {
var longBuf = ByteBuffer.allocateDirect(8);
var longBuf = UninitializedByteBuffer.allocateUninitialized(8);
fillBuffer(longBuf, ch);
longBuf.flip();
@@ -250,7 +252,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
var totalSize = meta.getSerializedSize();
// Avoids CodedOutputStream flushing all the time
var bb = ByteBuffer.allocateDirect(totalSize);
var bb = UninitializedByteBuffer.allocateUninitialized(totalSize);
var bbOut = CodedOutputStream.newInstance(bb);
meta.writeTo(bbOut);
@@ -330,26 +332,39 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
putTransactionManifest(manifest);
var latch = new CountDownLatch(manifest.getWritten().size() + manifest.getDeleted().size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var n : manifest.getWritten()) {
_flushExecutor.execute(() -> {
try {
Files.move(getTmpObjPath(n), getObjPath(n), ATOMIC_MOVE, REPLACE_EXISTING);
} catch (Throwable t) {
Log.error("Error writing " + n, t);
errors.add(t);
} finally {
latch.countDown();
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
for (var d : manifest.getDeleted()) {
_flushExecutor.execute(() -> {
deleteImpl(getObjPath(d));
latch.countDown();
try {
deleteImpl(getObjPath(d));
} catch (Throwable t) {
Log.error("Error deleting " + d, t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors when commiting tx!");
}
// No real need to truncate here
// try (var channel = _txFile.getChannel()) {
// channel.truncate(0);

View File

@@ -2,19 +2,14 @@ package com.usatiuk.dhfs.supportlib;
import java.nio.file.Path;
public class DhfsSupport {
class DhfsNativeLibFinder {
static private final String LibName = "libdhfs_support";
static private Path getLibPath() {
static Path getLibPath() {
var override = System.getProperty("com.usatiuk.dhfs.supportlib.native-path-override");
if (override != null)
return Path.of(override);
return Path.of(System.getProperty("com.usatiuk.dhfs.supportlib.native-path"))
.resolve(SysUtils.getLibPlatform() + "-" + SysUtils.getLibArch()).resolve(LibName + "." + SysUtils.getLibExtension());
}
static {
System.load(getLibPath().toAbsolutePath().toString());
}
public static void hello() {
DhfsSupportNative.hello();
}
}

View File

@@ -1,8 +1,18 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;
class DhfsSupportNative {
static {
System.load(DhfsNativeLibFinder.getLibPath().toAbsolutePath().toString());
}
public static native void hello();
static native long allocateUninitializedByteBuffer(ByteBuffer[] bb, int size);
static native void dropByteBuffer(long token);
}

View File

@@ -0,0 +1,16 @@
package com.usatiuk.dhfs.supportlib;
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
public class UninitializedByteBuffer {
private static final Cleaner CLEANER = Cleaner.create();
public static ByteBuffer allocateUninitialized(int size) {
var bb = new ByteBuffer[1];
long token = DhfsSupportNative.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0];
CLEANER.register(ret, () -> DhfsSupportNative.dropByteBuffer(token));
return ret;
}
}

View File

@@ -84,5 +84,10 @@
<option name="/Default/CodeStyle/CodeFormatting/CppFormatting/INDENT_SIZE/@EntryValue" value="4" type="int" />
<option name="/Default/CodeStyle/CodeFormatting/CppFormatting/CONTINUOUS_LINE_INDENT/@EntryValue" value="Double" type="string" />
<option name="/Default/CodeStyle/CodeFormatting/CppFormatting/TAB_WIDTH/@EntryValue" value="8" type="long" />
<option name="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003Ajni_002Eh_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FJava_003FJavaVirtualMachines_003Fzulu_002D21_002Ejdk_003FContents_003FHome_003Finclude_003Fjni_002Eh/@EntryIndexedValue" value="ExplicitlyExcluded" type="string" />
<option name="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003A_005Fmalloc_002Eh_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003FApplications_003FXcode_002Eapp_003FContents_003FDeveloper_003FPlatforms_003FMacOSX_002Eplatform_003FDeveloper_003FSDKs_003FMacOSX14_002E5_002Esdk_003Fusr_003Finclude_003Fmalloc_003F_005Fmalloc_002Eh/@EntryIndexedValue" value="ExplicitlyExcluded" type="string" />
<option name="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003A_005Fmalloc_002Eh_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003FApplications_003FXcode_002Eapp_003FContents_003FDeveloper_003FPlatforms_003FMacOSX_002Eplatform_003FDeveloper_003FSDKs_003FMacOSX14_002E5_002Esdk_003Fusr_003Finclude_003Fmalloc_003F_005Fmalloc_002Eh/@EntryIndexRemoved" />
<option name="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003Ajni_005Fmd_002Eh_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FJava_003FJavaVirtualMachines_003Fzulu_002D21_002Ejdk_003FContents_003FHome_003Finclude_003Fdarwin_003Fjni_005Fmd_002Eh/@EntryIndexedValue" value="ExplicitlyExcluded" type="string" />
<option name="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003Ajni_005Fmd_002Eh_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FJava_003FJavaVirtualMachines_003Fzulu_002D21_002Ejdk_003FContents_003FHome_003Finclude_003Fdarwin_003Fjni_005Fmd_002Eh/@EntryIndexRemoved" />
</component>
</project>

View File

@@ -1,9 +1,61 @@
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cassert>
#include <unistd.h>
#include "com_usatiuk_dhfs_supportlib_DhfsSupportNative.h"
long get_page_size() {
static const long PAGE_SIZE = sysconf(_SC_PAGESIZE);
return PAGE_SIZE;
}
constexpr uintptr_t align_up(uintptr_t what, size_t alignment) {
assert(__builtin_popcount(alignment) == 1);
const uintptr_t mask = alignment - 1;
if (what & mask) {
return (what & ~mask) + alignment;
}
return what;
}
extern "C" {
void Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_hello(JNIEnv* env, jclass klass) {
JNIEXPORT void JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_hello(JNIEnv* env, jclass klass) {
printf("Hello, World!\n");
}
JNIEXPORT jlong JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_allocateUninitializedByteBuffer
(JNIEnv* env, jclass klass, jobjectArray bb, jint size) {
void* buf;
if (size < get_page_size())
buf = malloc(size);
else
buf = std::aligned_alloc(get_page_size(), align_up(size, get_page_size()));
if (buf == nullptr) {
env->ThrowNew(env->FindClass("java/lang/OutOfMemoryError"), "Buffer memory allocation failed");
return 0;
}
env->SetObjectArrayElement(bb, 0, env->NewDirectByteBuffer(buf, size));
jlong token = static_cast<jlong>((uintptr_t) buf);
assert(token == (uintptr_t)buf);
return token;
}
JNIEXPORT void JNICALL Java_com_usatiuk_dhfs_supportlib_DhfsSupportNative_dropByteBuffer
(JNIEnv* env, jclass klass, jlong token) {
uintptr_t addr = static_cast<uintptr_t>(token);
assert(addr == token);
if (addr == 0) {
env->ThrowNew(env->FindClass("java/lang/IllegalArgumentException"), "Trying to free null pointer");
return;
}
free((void*) addr);
}
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.supportlib;
import java.nio.file.Path;
class DhfsNativeLibFinder {
static Path getLibPath() {
return null;
}
}