diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java index fa7b4156..b5a9238a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java @@ -8,11 +8,10 @@ import jakarta.inject.Singleton; import java.nio.ByteBuffer; @Singleton -public class JDataVersionedWrapperSerializer implements ObjectSerializer { +public class JDataVersionedWrapperSerializer { @Inject ObjectSerializer dataSerializer; - @Override public ByteString serialize(JDataVersionedWrapper obj) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); buffer.putLong(obj.version()); @@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer dataSerializer.deserialize(rawData) + public JDataVersionedWrapper deserialize(ByteBuffer data) { + var version = data.getLong(); + return new JDataVersionedWrapperLazy(version, data.remaining(), + () -> dataSerializer.deserialize(data) ); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java index f436637a..584c35b5 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java @@ -69,7 +69,7 @@ public final class JObjectKeyImpl implements JObjectKey { @Override public int hashCode() { - return Objects.hash(value); + return value.hashCode(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java index b519521f..14581634 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java @@ -2,11 +2,13 @@ package com.usatiuk.objects; import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import com.usatiuk.utils.SerializationHelper; import io.quarkus.arc.DefaultBean; import jakarta.enterprise.context.ApplicationScoped; import java.io.IOException; +import java.nio.ByteBuffer; @ApplicationScoped @DefaultBean @@ -16,9 +18,8 @@ public class JavaDataSerializer implements ObjectSerializer { return SerializationHelper.serialize(obj); } - @Override - public JData deserialize(ByteString data) { - try (var is = data.newInput()) { + public JData deserialize(ByteBuffer data) { + try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) { return SerializationHelper.deserialize(is); } catch (IOException e) { throw new RuntimeException(e); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java index 788f5a52..d53d482f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java @@ -2,8 +2,10 @@ package com.usatiuk.objects; import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; + public interface ObjectSerializer { ByteString serialize(T obj); - T deserialize(ByteString data); + T deserialize(ByteBuffer data); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java index 86eff923..c922360b 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java @@ -1,7 +1,5 @@ package com.usatiuk.objects.stores; -import com.google.protobuf.ByteString; -import com.google.protobuf.UnsafeByteOperations; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKeyMax; import com.usatiuk.objects.JObjectKeyMin; @@ -104,27 +102,27 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } @Override - public Snapshot getSnapshot() { + public Snapshot getSnapshot() { var txn = _env.txnRead(); try { long commitId = readTxId(txn).orElseThrow(); - return new Snapshot() { + return new Snapshot() { private final Txn _txn = txn; private final long _id = commitId; private boolean _closed = false; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { assert !_closed; return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)); } @Nonnull @Override - public Optional readObject(JObjectKey name) { + public Optional readObject(JObjectKey name) { assert !_closed; var got = _db.get(_txn, name.toByteBuffer()); - var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap); + var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer); return ret; } @@ -197,7 +195,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { return _root.toFile().getUsableSpace(); } - private class LmdbKvIterator extends ReversibleKvIterator { + private class LmdbKvIterator extends ReversibleKvIterator { private static final Cleaner CLEANER = Cleaner.create(); private final Txn _txn; // Managed by the snapshot private final Cursor _cursor; @@ -352,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { } @Override - protected Pair nextImpl() { + protected Pair nextImpl() { if (!_hasNext) { throw new NoSuchElementException("No more elements"); } // TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway var val = _cursor.val(); - var bs = UnsafeByteOperations.unsafeWrap(val); - var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs); + var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer()); if (_goingForward) _hasNext = _cursor.next(); else diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java index b15944f5..79765b4f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java @@ -4,6 +4,7 @@ import com.google.protobuf.ByteString; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.IteratorStart; +import com.usatiuk.objects.iterators.MappingKvIterator; import com.usatiuk.objects.iterators.NavigableMapKvIterator; import com.usatiuk.objects.snapshot.Snapshot; import io.quarkus.arc.properties.IfBuildProperty; @@ -11,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped; import org.pcollections.TreePMap; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -22,21 +24,21 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { private long _lastCommitId = 0; @Override - public Snapshot getSnapshot() { + public Snapshot getSnapshot() { synchronized (this) { - return new Snapshot() { + return new Snapshot() { private final TreePMap _objects = MemoryObjectPersistentStore.this._objects; private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId; @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new NavigableMapKvIterator<>(_objects, start, key); + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer); } @Nonnull @Override - public Optional readObject(JObjectKey name) { - return Optional.ofNullable(_objects.get(name)); + public Optional readObject(JObjectKey name) { + return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java index 0600af3c..d1918de2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java @@ -5,12 +5,13 @@ import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.snapshot.Snapshot; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; // Persistent storage of objects // All changes are written as sequential transactions public interface ObjectPersistentStore { - Snapshot getSnapshot(); + Snapshot getSnapshot(); Runnable prepareTx(TxManifestRaw names, long txId); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java index 963c8e20..4af76bde 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java @@ -2,6 +2,7 @@ package com.usatiuk.objects.stores; import com.google.protobuf.ByteString; import com.usatiuk.objects.JDataVersionedWrapper; +import com.usatiuk.objects.JDataVersionedWrapperSerializer; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.ObjectSerializer; import com.usatiuk.objects.iterators.CloseableKvIterator; @@ -13,19 +14,20 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; +import java.nio.ByteBuffer; import java.util.Optional; @ApplicationScoped public class SerializingObjectPersistentStore { @Inject - ObjectSerializer serializer; + JDataVersionedWrapperSerializer serializer; @Inject ObjectPersistentStore delegateStore; public Snapshot getSnapshot() { return new Snapshot() { - private final Snapshot _backing = delegateStore.getSnapshot(); + private final Snapshot _backing = delegateStore.getSnapshot(); @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/objects/stores/LmdbKvIteratorTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/objects/stores/LmdbKvIteratorTest.java index 5e9cc04d..c6a113dc 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/objects/stores/LmdbKvIteratorTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/objects/stores/LmdbKvIteratorTest.java @@ -13,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.RepeatedTest; +import java.nio.ByteBuffer; import java.util.List; class Profiles { @@ -46,49 +47,49 @@ public class LmdbKvIteratorTest { try (var snapshot = store.getSnapshot()) { var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of("")); - Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), - Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), - Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})))); + Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), + Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), + Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); @@ -101,7 +102,7 @@ public class LmdbKvIteratorTest { iterator.close(); iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); Assertions.assertFalse(iterator.hasNext()); iterator.close(); @@ -111,11 +112,11 @@ public class LmdbKvIteratorTest { Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey()); Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey()); Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey()); - Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2}))); - Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))); - Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev()); - Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev()); - Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next()); + Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2}))); + Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))); + Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})), iterator.prev()); + Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.prev()); + Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.next()); iterator.close(); }