Mirror of https://github.com/usatiuk/dhfs.git
A slightly improved file storage format
@@ -21,7 +21,6 @@
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
@@ -53,6 +53,12 @@
            <version>1.18.34</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.9.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
@@ -1,14 +1,17 @@
package com.usatiuk.dhfs.objects.repository.persistence;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.supportlib.DhfsSupportNative;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.utils.ByteUtils;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -39,13 +42,16 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

// File format:
// 64-bit offset of metadata (if 8 then file has no data)
// 64-bit metadata serialized size
// 64-bit offset of "rest of" metadata (if -1 then file has no data,
// if 0 then file has data and metadata fits into META_BLOCK_SIZE)
// Until META_BLOCK_SIZE - metadata (encoded as ObjectMetadataP)
// data (encoded as JObjectDataP)
// metadata (encoded as ObjectMetadataP)
// rest of metadata

@ApplicationScoped
public class FileObjectPersistentStore implements ObjectPersistentStore {
    private final static long mmapThreshold = 65536;
    private final int META_BLOCK_SIZE = DhfsSupportNative.PAGE_SIZE;
    private final Path _root;
    private final Path _txManifest;
    private ExecutorService _flushExecutor;
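The header arithmetic in the new format is easy to get wrong, so here is a minimal sketch of how the two 64-bit header fields relate to the sizes involved. It is an illustration only, not code from the repository; META_BLOCK_SIZE is assumed to be 4096, and "no data" is modeled as dataSize == 0 (the actual write path branches on data == null).

    import java.nio.ByteBuffer;

    // Hypothetical illustration of the 16-byte header described in the comment above.
    class HeaderLayoutSketch {
        static final int META_BLOCK_SIZE = 4096; // assumed; the real value is DhfsSupportNative.PAGE_SIZE

        // Value of the second header field ("offset of rest of metadata") for an object
        // with metaSerialized bytes of metadata and dataSize bytes of data (0 = no data).
        static long restOfMetaOffset(int metaSerialized, int dataSize) {
            if (dataSize == 0)
                return -1;                         // file has no data
            if (metaSerialized + 16 <= META_BLOCK_SIZE)
                return 0;                          // metadata fits into the head block
            return META_BLOCK_SIZE + dataSize;     // metadata overflow starts right after the data
        }

        static ByteBuffer header(int metaSerialized, int dataSize) {
            var bb = ByteBuffer.allocate(16);
            bb.putLong(metaSerialized);                             // first field: metadata size
            bb.putLong(restOfMetaOffset(metaSerialized, dataSize)); // second field: rest-of-metadata offset
            return bb.flip();
        }
    }

For example, a 100-byte metadata message with 50,000 bytes of data yields 0 (metadata fits in the head block), while an 8,000-byte metadata message yields 4096 + 50000 = 54096.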
@@ -141,27 +147,26 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
        verifyReady();
        var path = getObjPath(name);
        try (var rf = new RandomAccessFile(path.toFile(), "r")) {
            var len = rf.length();
            var longBuf = new byte[8];
            rf.seek(8);
            rf.readFully(longBuf);
            int metaOff = Math.toIntExact(ByteUtils.bytesToLong(longBuf));

            var metaOff = ByteUtils.bytesToLong(longBuf);
            if (metaOff < 0)
                throw new StatusRuntimeException(Status.NOT_FOUND);

            int toRead = (int) (metaOff - 8);
            int toRead;

            if (toRead <= 0)
                throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
            if (metaOff > 0)
                toRead = metaOff - META_BLOCK_SIZE;
            else
                toRead = Math.toIntExact(rf.length()) - META_BLOCK_SIZE;

            ByteBuffer buf;
            rf.seek(META_BLOCK_SIZE);

            //FIXME: rewriting files breaks!
            if (false && len > mmapThreshold) {
                buf = rf.getChannel().map(FileChannel.MapMode.READ_ONLY, 8, toRead);
            } else {
                buf = UninitializedByteBuffer.allocateUninitialized(toRead);
                fillBuffer(buf, rf.getChannel());
                buf.flip();
            }
            ByteBuffer buf = UninitializedByteBuffer.allocateUninitialized(toRead);
            fillBuffer(buf, rf.getChannel());
            buf.flip();

            var bs = UnsafeByteOperations.unsafeWrap(buf);
            // This way, the input will be considered "immutable" which would allow avoiding copies
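readObject leans on a fillBuffer helper that is not shown in this diff. A plausible shape for it, assumed from how it is called (it reads from the channel's current position until the buffer is full), would be:

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    // Assumed shape of the fillBuffer helper used above (the real one is not in this diff):
    // keep reading until the buffer has no space left, failing on a premature EOF.
    static void fillBuffer(ByteBuffer buf, ReadableByteChannel ch) throws IOException {
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0)
                throw new EOFException("channel ended before buffer was filled");
        }
    }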
@@ -183,19 +188,40 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
        verifyReady();
        var path = getObjPath(name);
        try (var rf = new RandomAccessFile(path.toFile(), "r")) {
            var len = rf.length();
            var longBuf = new byte[8];
            int len = Math.toIntExact(rf.length());
            var buf = UninitializedByteBuffer.allocateUninitialized(META_BLOCK_SIZE);
            fillBuffer(buf, rf.getChannel());

            rf.readFully(longBuf);
            buf.flip();
            int metaSize = Math.toIntExact(buf.getLong());
            int metaOff = Math.toIntExact(buf.getLong());

            var metaOff = ByteUtils.bytesToLong(longBuf);
            ByteBuffer extraBuf;

            int toRead = (int) (len - metaOff);
            if (metaOff > 0) {
                extraBuf = UninitializedByteBuffer.allocateUninitialized(len - metaOff);
                rf.seek(metaOff);
                fillBuffer(extraBuf, rf.getChannel());
            } else if (metaOff < 0) {
                if (len > META_BLOCK_SIZE) {
                    extraBuf = UninitializedByteBuffer.allocateUninitialized(len - META_BLOCK_SIZE);
                    fillBuffer(extraBuf, rf.getChannel());
                } else {
                    extraBuf = null;
                }
            } else {
                extraBuf = null;
            }

            var arr = new byte[(int) toRead];
            rf.seek(metaOff);
            rf.readFully(arr, 0, toRead);
            return ObjectMetadataP.parseFrom(UnsafeByteOperations.unsafeWrap(arr));
            ByteString bs = UnsafeByteOperations.unsafeWrap(buf.position(16).slice());
            if (extraBuf != null) {
                extraBuf.flip();
                bs = bs.concat(UnsafeByteOperations.unsafeWrap(extraBuf));
            }

            bs = bs.substring(0, metaSize);

            return ObjectMetadataP.parseFrom(bs);
        } catch (FileNotFoundException | NoSuchFileException fx) {
            throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
        } catch (IOException e) {
@@ -217,22 +243,43 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {

    private void writeObjectImpl(Path path, ObjectMetadataP meta, JObjectDataP data, boolean sync) throws IOException {
        try (var fsb = new FileOutputStream(path.toFile(), false)) {
            var dataSize = data != null ? data.getSerializedSize() : 0;
            fsb.write(ByteUtils.longToBytes(dataSize + 8));
            int metaSize = meta.getSerializedSize() + 16;
            int dataSize = data == null ? 0 : data.getSerializedSize();

            var totalSize = dataSize + meta.getSerializedSize();
            // Avoids CodedOutputStream flushing all the time
            var bb = UninitializedByteBuffer.allocateUninitialized(totalSize);
            var bbOut = CodedOutputStream.newInstance(bb);
            var metaBb = UninitializedByteBuffer.allocateUninitialized(Math.max(META_BLOCK_SIZE, meta.getSerializedSize() + 16));
            metaBb.putLong(metaSize - 16);
            if (data == null)
                metaBb.putLong(-1);
            else if (metaSize <= META_BLOCK_SIZE)
                metaBb.putLong(0);
            else
                metaBb.putLong(META_BLOCK_SIZE + dataSize);
            {
                var metaBbOut = CodedOutputStream.newInstance(metaBb);
                meta.writeTo(metaBbOut);
                metaBbOut.flush();
                metaBb.flip();
            }

            if (data != null)
                data.writeTo(bbOut);
            meta.writeTo(bbOut);
            bbOut.flush();

            if (fsb.getChannel().write(bb.flip()) != totalSize)
            if (fsb.getChannel().write(metaBb.limit(META_BLOCK_SIZE)) != META_BLOCK_SIZE)
                throw new IOException("Could not write to file");

            if (data != null) {
                var dataBb = UninitializedByteBuffer.allocateUninitialized(dataSize);
                var dataBbOut = CodedOutputStream.newInstance(dataBb);
                data.writeTo(dataBbOut);
                dataBbOut.flush();
                dataBb.flip();
                if (fsb.getChannel().write(dataBb) != dataSize)
                    throw new IOException("Could not write to file");
            }

            if (metaSize > META_BLOCK_SIZE) {
                if (fsb.getChannel().write(metaBb.limit(metaSize).position(META_BLOCK_SIZE)) != metaSize - META_BLOCK_SIZE)
                    throw new IOException("Could not write to file");
            }

            if (sync) {
                fsb.flush();
                fsb.getFD().sync();
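To make the three header branches concrete (assuming META_BLOCK_SIZE = 4096): a 100-byte metadata message with 50,000 bytes of data writes the header fields 100, 0 and lays the file out as [0,16) header, [16,4096) metadata plus padding, [4096,54096) data. An 8,000-byte metadata message writes 8000, 54096 (that is, 4096 + 50000), keeps the first 4096 - 16 = 4080 metadata bytes in the head block, and appends the remaining 3,920 bytes after the data. With data == null the second field is -1, and only a possible metadata tail follows the head block.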
@@ -243,25 +290,53 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
    private void writeObjectMetaImpl(Path path, ObjectMetadataP meta, boolean sync) throws IOException {
        try (var rf = new RandomAccessFile(path.toFile(), "rw");
             var ch = rf.getChannel()) {
            var longBuf = UninitializedByteBuffer.allocateUninitialized(8);
            fillBuffer(longBuf, ch);
            int len = Math.toIntExact(rf.length());
            var buf = UninitializedByteBuffer.allocateUninitialized(META_BLOCK_SIZE);
            fillBuffer(buf, rf.getChannel());

            longBuf.flip();
            var metaOff = longBuf.getLong();
            buf.flip();
            buf.position(8);
            int metaOff = Math.toIntExact(buf.getLong());

            ch.truncate(metaOff + meta.getSerializedSize());
            ch.position(metaOff);
            int metaSize = meta.getSerializedSize() + 16;
            int dataSize;

            if (metaOff > 0) {
                dataSize = metaOff - META_BLOCK_SIZE;
            } else if (metaOff < 0) {
                dataSize = 0;
            } else {
                dataSize = len - META_BLOCK_SIZE;
            }

            ch.truncate(metaSize + dataSize);
            ch.position(0);

            var totalSize = meta.getSerializedSize();
            // Avoids CodedOutputStream flushing all the time
            var bb = UninitializedByteBuffer.allocateUninitialized(totalSize);
            var bbOut = CodedOutputStream.newInstance(bb);
            var metaBb = UninitializedByteBuffer.allocateUninitialized(Math.max(META_BLOCK_SIZE, meta.getSerializedSize() + 16));
            metaBb.putLong(metaSize - 16);
            if (dataSize == 0)
                metaBb.putLong(-1);
            else if (metaSize <= META_BLOCK_SIZE)
                metaBb.putLong(0);
            else
                metaBb.putLong(META_BLOCK_SIZE + dataSize);
            {
                var metaBbOut = CodedOutputStream.newInstance(metaBb);
                meta.writeTo(metaBbOut);
                metaBbOut.flush();
                metaBb.flip();
            }

            meta.writeTo(bbOut);
            bbOut.flush();
            if (ch.write(bb.flip()) != totalSize)
            if (ch.write(metaBb.limit(META_BLOCK_SIZE)) != META_BLOCK_SIZE)
                throw new IOException("Could not write to file");

            if (metaSize > META_BLOCK_SIZE) {
                ch.position(META_BLOCK_SIZE + dataSize);
                if (ch.write(metaBb.limit(metaSize).position(META_BLOCK_SIZE)) != metaSize - META_BLOCK_SIZE)
                    throw new IOException("Could not write to file");
            }

            if (sync)
                rf.getFD().sync();
        }
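Note how the metadata rewrite recovers the size of the preserved data region purely from the old header before truncating. A hedged sketch of that inverse mapping, mirroring restOfMetaOffset() from the earlier sketch (helper name hypothetical):

    // Inverse of restOfMetaOffset() from the earlier sketch (hypothetical helper):
    // recover the data region's size from the old header field and the file length.
    static int dataSizeFromHeader(long restOff, int fileLen, int metaBlockSize) {
        if (restOff > 0) return Math.toIntExact(restOff) - metaBlockSize; // data ends where the metadata tail starts
        if (restOff < 0) return 0;                                        // no data
        return fileLen - metaBlockSize;                                   // data runs to the end of the file
    }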
@@ -0,0 +1,86 @@
package com.usatiuk.dhfs.persistence;

import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.repository.persistence.FileObjectPersistentStore;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;


class Profiles {
    public static class FileObjectPersistentStoreTestProfile implements QuarkusTestProfile {
        @Override
        public Map<String, String> getConfigOverrides() {
            var ret = new HashMap<String, String>();
            ret.put("quarkus.log.category.\"com.usatiuk.dhfs\".level", "TRACE");
            ret.put("dhfs.fuse.enabled", "false");
            ret.put("dhfs.objects.ref_verification", "true");
            return ret;
        }
    }
}

@QuarkusTest
@TestProfile(Profiles.FileObjectPersistentStoreTestProfile.class)
public class FileObjectPersistentStoreTest {
    @Inject
    FileObjectPersistentStore fileObjectPersistentStore;

    @Test
    public void writeReadFullObjectSmallMeta() {
        String name = "writeReadFullObjectSmallMeta";

        var bytes = new byte[100000];
        ThreadLocalRandom.current().nextBytes(bytes);

        ObjectMetadataP meta = ObjectMetadataP.newBuilder().setName("verycoolname123456789").build();
        JObjectDataP data = JObjectDataP.newBuilder().setChunkData(ChunkDataP.newBuilder().setData(ByteString.copyFrom(bytes)).build()).build();

        fileObjectPersistentStore.writeObjectDirect(name, meta, data);
        var readMeta = fileObjectPersistentStore.readObjectMeta(name);
        var readData = fileObjectPersistentStore.readObject(name);
        Assertions.assertEquals(meta, readMeta);
        Assertions.assertEquals(data, readData);

        var bigString = RandomStringUtils.random(100000);

        var newMeta = ObjectMetadataP.newBuilder().setName(String.valueOf(bigString)).build();
        fileObjectPersistentStore.writeObjectMetaDirect(name, newMeta);
        readMeta = fileObjectPersistentStore.readObjectMeta(name);
        readData = fileObjectPersistentStore.readObject(name);
        Assertions.assertEquals(newMeta, readMeta);
        Assertions.assertEquals(data, readData);

        fileObjectPersistentStore.writeObjectDirect(name, newMeta, null);
        readMeta = fileObjectPersistentStore.readObjectMeta(name);
        Assertions.assertEquals(newMeta, readMeta);
        Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

        fileObjectPersistentStore.writeObjectMetaDirect(name, meta);
        readMeta = fileObjectPersistentStore.readObjectMeta(name);
        Assertions.assertEquals(meta, readMeta);
        Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

        fileObjectPersistentStore.writeObjectDirect(name, newMeta, null);
        readMeta = fileObjectPersistentStore.readObjectMeta(name);
        Assertions.assertEquals(newMeta, readMeta);
        Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

        fileObjectPersistentStore.writeObjectDirect(name, newMeta, data);
        readMeta = fileObjectPersistentStore.readObjectMeta(name);
        readData = fileObjectPersistentStore.readObject(name);
        Assertions.assertEquals(newMeta, readMeta);
        Assertions.assertEquals(data, readData);
    }
}
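The test deliberately alternates between a small metadata message and one whose name is a 100,000-character random string, so each rewrite over the same object crosses the META_BLOCK_SIZE boundary in both directions while the data region must survive; this appears to be the rewriting scenario the removed "//FIXME: rewriting files breaks!" comment in readObject referred to.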
@@ -4,7 +4,7 @@ package com.usatiuk.dhfs.supportlib;
import java.nio.ByteBuffer;


class DhfsSupportNative {
public class DhfsSupportNative {
    static public final int PAGE_SIZE;

    static {
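DhfsSupportNative is made public here because FileObjectPersistentStore now reads DhfsSupportNative.PAGE_SIZE (as META_BLOCK_SIZE) from outside the supportlib package.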
@@ -2,9 +2,11 @@ package com.usatiuk.dhfs.supportlib;

import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.util.logging.Logger;

public class UninitializedByteBuffer {
    private static final Cleaner CLEANER = Cleaner.create();
    private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());

    public static ByteBuffer allocateUninitialized(int size) {
        if (size < DhfsSupportNative.PAGE_SIZE)
@@ -13,7 +15,14 @@ public class UninitializedByteBuffer {
        var bb = new ByteBuffer[1];
        long token = DhfsSupportNative.allocateUninitializedByteBuffer(bb, size);
        var ret = bb[0];
        CLEANER.register(ret, () -> DhfsSupportNative.releaseByteBuffer(token));
        CLEANER.register(ret, () -> {
            try {
                DhfsSupportNative.releaseByteBuffer(token);
            } catch (Throwable e) {
                LOGGER.severe("Error releasing buffer: " + e.toString());
                System.exit(-1);
            }
        });
        return ret;
    }
}
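The registered cleanup action now catches and logs failures from the native release call instead of letting them escape. One detail worth noting about this java.lang.ref.Cleaner pattern: the lambda captures only the primitive token, never the buffer itself, because capturing the tracked object would keep it strongly reachable and the cleaning action would never run. A minimal standalone sketch of the pattern (names and the token value are illustrative):

    import java.lang.ref.Cleaner;

    class CleanerSketch {
        static final Cleaner CLEANER = Cleaner.create();

        static Object allocate() {
            Object resource = new Object();
            long token = 42; // stand-in for a native handle
            // Correct: the action captures only 'token'. Capturing 'resource' here
            // would keep it reachable and the action would never fire.
            CLEANER.register(resource, () -> System.out.println("release " + token));
            return resource;
        }
    }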