mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
use bytebuffers
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
|
||||
@@ -22,7 +25,20 @@ public record JObjectKey(String name) implements Serializable, Comparable<JObjec
|
||||
return name.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
public ByteBuffer toByteBuffer() {
|
||||
var heapBb = StandardCharsets.UTF_8.encode(name);
|
||||
if (heapBb.isDirect()) return heapBb;
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
|
||||
directBb.put(heapBb);
|
||||
directBb.flip();
|
||||
return directBb;
|
||||
}
|
||||
|
||||
public static JObjectKey fromBytes(byte[] bytes) {
|
||||
return new JObjectKey(new String(bytes, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
public static JObjectKey fromByteBuffer(ByteBuffer buff) {
|
||||
return new JObjectKey(StandardCharsets.UTF_8.decode(buff).toString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.persistence;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.CloseableKvIterator;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import io.quarkus.arc.properties.IfBuildProperty;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -18,7 +19,7 @@ import org.lmdbjava.*;
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@@ -32,8 +33,8 @@ import static org.lmdbjava.Env.create;
|
||||
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
|
||||
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
private final Path _root;
|
||||
private Env<byte[]> _env;
|
||||
private Dbi<byte[]> _db;
|
||||
private Env<ByteBuffer> _env;
|
||||
private Dbi<ByteBuffer> _db;
|
||||
private boolean _ready = false;
|
||||
|
||||
private static final String DB_NAME = "objects";
|
||||
@@ -47,7 +48,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
Log.info("Initializing with root " + _root);
|
||||
_root.toFile().mkdirs();
|
||||
}
|
||||
_env = create(ByteArrayProxy.PROXY_BA)
|
||||
_env = create()
|
||||
.setMapSize(1_000_000_000_000L)
|
||||
.setMaxDbs(1)
|
||||
.open(_root.toFile(), EnvFlags.MDB_NOTLS);
|
||||
@@ -80,20 +81,20 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
verifyReady();
|
||||
try (Txn<byte[]> txn = _env.txnRead()) {
|
||||
var key = name.toString().getBytes(StandardCharsets.UTF_8);
|
||||
var value = _db.get(txn, key);
|
||||
try (Txn<ByteBuffer> txn = _env.txnRead()) {
|
||||
var value = _db.get(txn, name.toByteBuffer());
|
||||
return Optional.ofNullable(value).map(ByteString::copyFrom);
|
||||
}
|
||||
}
|
||||
|
||||
private class LmdbKvIterator implements CloseableKvIterator<JObjectKey, ByteString> {
|
||||
private final Txn<byte[]> _txn = _env.txnRead();
|
||||
private final Cursor<byte[]> _cursor = _db.openCursor(_txn);
|
||||
private final Txn<ByteBuffer> _txn = _env.txnRead();
|
||||
private final Cursor<ByteBuffer> _cursor = _db.openCursor(_txn);
|
||||
private boolean _hasNext = false;
|
||||
|
||||
private static final Cleaner CLEANER = Cleaner.create();
|
||||
@@ -108,11 +109,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
});
|
||||
|
||||
verifyReady();
|
||||
if (!_cursor.get(key.toString().getBytes(StandardCharsets.UTF_8), GetOp.MDB_SET_RANGE)) {
|
||||
if (!_cursor.get(key.toByteBuffer(), GetOp.MDB_SET_RANGE)) {
|
||||
return;
|
||||
}
|
||||
|
||||
var got = JObjectKey.fromBytes(_cursor.key());
|
||||
var got = JObjectKey.fromByteBuffer(_cursor.key());
|
||||
_cursor.key().flip();
|
||||
var cmp = got.compareTo(key);
|
||||
|
||||
assert cmp >= 0;
|
||||
@@ -163,7 +165,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (!_hasNext) {
|
||||
throw new NoSuchElementException("No more elements");
|
||||
}
|
||||
var ret = Pair.of(JObjectKey.fromBytes(_cursor.key()), ByteString.copyFrom(_cursor.val()));
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), ByteString.copyFrom(_cursor.val()));
|
||||
_hasNext = _cursor.next();
|
||||
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
|
||||
return ret;
|
||||
@@ -174,7 +176,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (!_hasNext) {
|
||||
throw new NoSuchElementException("No more elements");
|
||||
}
|
||||
return JObjectKey.fromBytes(_cursor.key());
|
||||
var ret = JObjectKey.fromByteBuffer(_cursor.key());
|
||||
_cursor.key().flip();
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,14 +190,16 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
@Override
|
||||
public void commitTx(TxManifestRaw names) {
|
||||
verifyReady();
|
||||
try (Txn<byte[]> txn = _env.txnWrite()) {
|
||||
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
|
||||
for (var written : names.written()) {
|
||||
var key = written.getKey().toString().getBytes(StandardCharsets.UTF_8);
|
||||
_db.put(txn, key, written.getValue().toByteArray());
|
||||
// TODO:
|
||||
var bb = UninitializedByteBuffer.allocateUninitialized(written.getValue().size());
|
||||
bb.put(written.getValue().asReadOnlyByteBuffer());
|
||||
bb.flip();
|
||||
_db.put(txn, written.getKey().toByteBuffer(), bb);
|
||||
}
|
||||
for (JObjectKey key : names.deleted()) {
|
||||
var keyBytes = key.toString().getBytes(StandardCharsets.UTF_8);
|
||||
_db.delete(txn, keyBytes);
|
||||
_db.delete(txn, key.toByteBuffer());
|
||||
}
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user