Objects: javadocs

This commit is contained in:
2025-05-13 20:53:44 +02:00
parent 66dabdef25
commit dbad8a2b22
19 changed files with 242 additions and 10 deletions

View File

@@ -2,9 +2,20 @@ package com.usatiuk.objects;
import java.io.Serializable; import java.io.Serializable;
/**
* JData is a marker interface for all objects that can be stored in the object store.
*/
public interface JData extends Serializable { public interface JData extends Serializable {
/**
* Returns the key of the object.
* @return the key of the object
*/
JObjectKey key(); JObjectKey key();
/**
* Returns the estimated size of the object in bytes.
* @return the estimated size of the object in bytes
*/
default int estimateSize() { default int estimateSize() {
return 100; return 100;
} }

View File

@@ -2,15 +2,34 @@ package com.usatiuk.objects;
import com.usatiuk.objects.iterators.Data; import com.usatiuk.objects.iterators.Data;
/**
* JDataVersionedWrapper is a wrapper for JData that contains its version number
* (the id of the transaction that had changed it last)
*/
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl { public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
@Override @Override
default JDataVersionedWrapper value() { default JDataVersionedWrapper value() {
return this; return this;
} }
/**
* Returns the wrapped object.
*
* @return the wrapped object
*/
JData data(); JData data();
/**
* Returns the version number of the object.
*
* @return the version number of the object
*/
long version(); long version();
/**
* Returns the estimated size of the object in bytes.
*
* @return the estimated size of the object in bytes
*/
int estimateSize(); int estimateSize();
} }

View File

@@ -4,6 +4,9 @@ import jakarta.annotation.Nonnull;
import java.io.Serializable; import java.io.Serializable;
/**
* Simple wrapper for an already-existing JData object with a version.
*/
public record JDataVersionedWrapperImpl(@Nonnull JData data, public record JDataVersionedWrapperImpl(@Nonnull JData data,
long version) implements Serializable, JDataVersionedWrapper { long version) implements Serializable, JDataVersionedWrapper {
@Override @Override

View File

@@ -2,18 +2,35 @@ package com.usatiuk.objects;
import java.util.function.Supplier; import java.util.function.Supplier;
/**
* Lazy JDataVersionedWrapper implementation.
* The object is deserialized only when data() is called for the first time.
* Also allows to set a callback to be called when the data is loaded (e.g. to cache it).
*/
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper { public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private final long _version; private final long _version;
private final int _estimatedSize; private final int _estimatedSize;
private JData _data; private JData _data;
private Supplier<JData> _producer; private Supplier<JData> _producer;
/**
* Creates a new JDataVersionedWrapperLazy object.
*
* @param version the version number of the object
* @param estimatedSize the estimated size of the object in bytes
* @param producer a supplier that produces the wrapped object
*/
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) { public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
_version = version; _version = version;
_estimatedSize = estimatedSize; _estimatedSize = estimatedSize;
_producer = producer; _producer = producer;
} }
/**
* Set a callback to be called when the data is loaded.
*
* @param cacheCallback the callback to be called
*/
public void setCacheCallback(Runnable cacheCallback) { public void setCacheCallback(Runnable cacheCallback) {
if (_data != null) { if (_data != null) {
throw new IllegalStateException("Cache callback can be set only before data is loaded"); throw new IllegalStateException("Cache callback can be set only before data is loaded");

View File

@@ -7,11 +7,21 @@ import jakarta.inject.Singleton;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* Serializer for JDataVersionedWrapper objects.
* The objects are stored in a simple format: first is 8-byte long, then the serialized object.
*/
@Singleton @Singleton
public class JDataVersionedWrapperSerializer { public class JDataVersionedWrapperSerializer {
@Inject @Inject
ObjectSerializer<JData> dataSerializer; ObjectSerializer<JData> dataSerializer;
/**
* Serializes a JDataVersionedWrapper object to a ByteString.
*
* @param obj the object to serialize
* @return the serialized object as a ByteString
*/
public ByteString serialize(JDataVersionedWrapper obj) { public ByteString serialize(JDataVersionedWrapper obj) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(obj.version()); buffer.putLong(obj.version());
@@ -19,6 +29,13 @@ public class JDataVersionedWrapperSerializer {
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data())); return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
} }
/**
* Deserializes a JDataVersionedWrapper object from a ByteBuffer.
* Returns a lazy wrapper (JDataVersionedWrapperLazy).
*
* @param data the ByteBuffer containing the serialized object
* @return the deserialized object
*/
public JDataVersionedWrapper deserialize(ByteBuffer data) { public JDataVersionedWrapper deserialize(ByteBuffer data) {
var version = data.getLong(); var version = data.getLong();
return new JDataVersionedWrapperLazy(version, data.remaining(), return new JDataVersionedWrapperLazy(version, data.remaining(),

View File

@@ -5,30 +5,68 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.UUID; import java.util.UUID;
/**
* JObjectKey is an interface for object keys to be used in the object store.
*/
public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey> permits JObjectKeyImpl, JObjectKeyMax, JObjectKeyMin { public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey> permits JObjectKeyImpl, JObjectKeyMax, JObjectKeyMin {
JObjectKeyMin MIN = new JObjectKeyMin(); JObjectKeyMin MIN = new JObjectKeyMin();
JObjectKeyMax MAX = new JObjectKeyMax(); JObjectKeyMax MAX = new JObjectKeyMax();
/**
* Creates a new JObjectKey from a string value.
*
* @param value the string value of the key
* @return a new JObjectKey
*/
static JObjectKey of(String value) { static JObjectKey of(String value) {
return new JObjectKeyImpl(value); return new JObjectKeyImpl(value);
} }
/**
* Creates a new JObjectKey with a random UUID.
*
* @return a new JObjectKey with a random UUID
*/
static JObjectKey random() { static JObjectKey random() {
return new JObjectKeyImpl(UUID.randomUUID().toString()); return new JObjectKeyImpl(UUID.randomUUID().toString());
} }
/**
* Returns a JObjectKey that compares less than all other keys.
* Calling value on this key will result in an exception.
*
* @return a JObjectKey that compares less than all other keys
*/
static JObjectKey first() { static JObjectKey first() {
return MIN; return MIN;
} }
/**
* Returns a JObjectKey that compares greater than all other keys.
* Calling value on this key will result in an exception.
*
* @return a JObjectKey that compares greater than all other keys
*/
static JObjectKey last() { static JObjectKey last() {
return MAX; return MAX;
} }
/**
* Creates a new JObjectKey from a byte array.
*
* @param bytes the byte array representing the key
* @return a new JObjectKey
*/
static JObjectKey fromBytes(byte[] bytes) { static JObjectKey fromBytes(byte[] bytes) {
return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1)); return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1));
} }
/**
* Creates a new JObjectKey from a ByteBuffer.
*
* @param buff the ByteBuffer representing the key
* @return a new JObjectKey
*/
static JObjectKey fromByteBuffer(ByteBuffer buff) { static JObjectKey fromByteBuffer(ByteBuffer buff) {
byte[] bytes = new byte[buff.remaining()]; byte[] bytes = new byte[buff.remaining()];
buff.get(bytes); buff.get(bytes);
@@ -41,7 +79,17 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
@Override @Override
String toString(); String toString();
/**
* Returns the byte buffer representation of the key.
*
* @return the byte buffer representation of the key
*/
ByteBuffer toByteBuffer(); ByteBuffer toByteBuffer();
/**
* Returns the string value of the key.
*
* @return the string value of the key
*/
String value(); String value();
} }

View File

@@ -7,6 +7,9 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.Objects;
/**
* A "real" implementation of JObjectKey, containing an underlying string, and a cached lazily created byte buffer.
*/
public final class JObjectKeyImpl implements JObjectKey { public final class JObjectKeyImpl implements JObjectKey {
@Serial @Serial
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;

View File

@@ -2,6 +2,9 @@ package com.usatiuk.objects;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* JObjectKey implementation that compares greater than all other keys.
*/
public record JObjectKeyMax() implements JObjectKey { public record JObjectKeyMax() implements JObjectKey {
@Override @Override
public int compareTo(JObjectKey o) { public int compareTo(JObjectKey o) {

View File

@@ -2,6 +2,9 @@ package com.usatiuk.objects;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* JObjectKey implementation that compares less than all other keys.
*/
public record JObjectKeyMin() implements JObjectKey { public record JObjectKeyMin() implements JObjectKey {
@Override @Override
public int compareTo(JObjectKey o) { public int compareTo(JObjectKey o) {

View File

@@ -10,6 +10,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* Simple Java object serializer.
*/
@ApplicationScoped @ApplicationScoped
@DefaultBean @DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> { public class JavaDataSerializer implements ObjectSerializer<JData> {

View File

@@ -4,8 +4,25 @@ import com.google.protobuf.ByteString;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* Interface for serializing and deserializing objects.
*
* @param <T> the type of object to serialize/deserialize
*/
public interface ObjectSerializer<T> { public interface ObjectSerializer<T> {
/**
* Serialize an object to a ByteString.
*
* @param obj the object to serialize
* @return the serialized object as a ByteString
*/
ByteString serialize(T obj); ByteString serialize(T obj);
/**
* Deserialize an object from a ByteBuffer.
*
* @param data the ByteBuffer containing the serialized object
* @return the deserialized object
*/
T deserialize(ByteBuffer data); T deserialize(ByteBuffer data);
} }

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
/** /**
* CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore * CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore
* It stores the already deserialized objects in memory. * It stores the already deserialized objects in memory.
* * Not (yet) thread safe for writes.
*/ */
@ApplicationScoped @ApplicationScoped
public class CachingObjectPersistentStore { public class CachingObjectPersistentStore {
@@ -67,6 +67,12 @@ public class CachingObjectPersistentStore {
} }
} }
/**
* Commit the transaction to the underlying store and update the cache.
* Once this function returns, the transaction is committed and the cache is updated.
* @param objs the transaction manifest object
* @param txId the transaction ID
*/
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) { public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objs, long txId) {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
@@ -84,6 +90,12 @@ public class CachingObjectPersistentStore {
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
} }
/**
* Get a snapshot of underlying store and the cache.
* Objects are read from the cache if possible, if not, they are read from the underlying store,
* then possibly lazily cached when their data is accessed.
* @return a snapshot of the cached store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() { public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
while (true) { while (true) {
var cache = _cache.get(); var cache = _cache.get();

View File

@@ -26,15 +26,19 @@ import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
import static org.lmdbjava.DbiFlags.MDB_CREATE; import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create; import static org.lmdbjava.Env.create;
/**
* Persistent object storage using LMDB
*/
@ApplicationScoped @ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb") @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
public class LmdbObjectPersistentStore implements ObjectPersistentStore { public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_NAME = "objects"; private static final String DB_NAME = "objects";
// LMDB object name for the transaction id
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ"; private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
private static final ByteBuffer DB_VER_OBJ_NAME; private static final ByteBuffer DB_VER_OBJ_NAME;
@@ -100,6 +104,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (!_ready) throw new IllegalStateException("Wrong service order!"); if (!_ready) throw new IllegalStateException("Wrong service order!");
} }
/**
* Get a snapshot of the database.
* Note that the ByteBuffers are invalid after the snapshot is closed.
*
* @return a snapshot of the database
*/
@Override @Override
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() { public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
var txn = _env.txnRead(); var txn = _env.txnRead();

View File

@@ -15,6 +15,10 @@ import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream; import java.util.stream.Stream;
/**
* In-memory implementation of the ObjectPersistentStore interface.
* For testing purposes.
*/
@ApplicationScoped @ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore { public class MemoryObjectPersistentStore implements ObjectPersistentStore {

View File

@@ -8,14 +8,33 @@ import javax.annotation.Nonnull;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Optional; import java.util.Optional;
// Persistent storage of objects /**
// All changes are written as sequential transactions * Interface for a persistent store of objects.
* Does not have to be thread-safe! (yet), it is expected that all commits are done by the same thread.
*/
public interface ObjectPersistentStore { public interface ObjectPersistentStore {
/**
* Get a snapshot of the persistent store.
* @return a snapshot of the persistent store
*/
Snapshot<JObjectKey, ByteBuffer> getSnapshot(); Snapshot<JObjectKey, ByteBuffer> getSnapshot();
/**
* Commit a transaction to the persistent store.
* @param names the transaction manifest
* @param txId the transaction ID
*/
void commitTx(TxManifestRaw names, long txId); void commitTx(TxManifestRaw names, long txId);
/**
* Get the size of the persistent store.
* @return the size of the persistent store
*/
long getTotalSpace(); long getTotalSpace();
/**
* Get the free space of the persistent store.
* @return the free space of the persistent store
*/
long getFreeSpace(); long getFreeSpace();
} }

View File

@@ -16,6 +16,11 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
/**
* Serializing wrapper for the ObjectPersistentStore.
* It serializes the objects before storing them in the persistent store.
* It deserializes the objects after reading them from the persistent store.
*/
@ApplicationScoped @ApplicationScoped
public class SerializingObjectPersistentStore { public class SerializingObjectPersistentStore {
@Inject @Inject
@@ -24,6 +29,13 @@ public class SerializingObjectPersistentStore {
@Inject @Inject
ObjectPersistentStore delegateStore; ObjectPersistentStore delegateStore;
/**
* Get a snapshot of the persistent store, with deserialized objects.
*
* The objects are deserialized lazily, only when their data is accessed.
*
* @return a snapshot of the persistent store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() { public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() { return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot(); private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@@ -54,6 +66,12 @@ public class SerializingObjectPersistentStore {
} }
/**
* Serialize the objects, in parallel
* @param objs the objects to serialize
* @return the serialized objects
*/
private TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> objs) { private TxManifestRaw prepareManifest(TxManifestObj<? extends JDataVersionedWrapper> objs) {
return new TxManifestRaw( return new TxManifestRaw(
objs.written().parallelStream() objs.written().parallelStream()
@@ -62,6 +80,11 @@ public class SerializingObjectPersistentStore {
, objs.deleted()); , objs.deleted());
} }
/**
* Commit a transaction to the persistent store.
* @param objects the transaction manifest
* @param txId the transaction ID
*/
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) { void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
delegateStore.commitTx(prepareManifest(objects), txId); delegateStore.commitTx(prepareManifest(objects), txId);
} }

View File

@@ -6,7 +6,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
// FIXME: Serializable
public record TxManifestObj<T>(Collection<Pair<JObjectKey, T>> written, public record TxManifestObj<T>(Collection<Pair<JObjectKey, T>> written,
Collection<JObjectKey> deleted) implements Serializable { Collection<JObjectKey> deleted) implements Serializable {
} }

View File

@@ -7,7 +7,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
// FIXME: Serializable
public record TxManifestRaw(Collection<Pair<JObjectKey, ByteString>> written, public record TxManifestRaw(Collection<Pair<JObjectKey, ByteString>> written,
Collection<JObjectKey> deleted) implements Serializable { Collection<JObjectKey> deleted) implements Serializable {
} }

View File

@@ -33,6 +33,10 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer; import java.util.function.Consumer;
/**
* Asynchronous write cache of objects.
* Objects are put into a write queue by commitTx, and written to the storage by a separate thread.
*/
@ApplicationScoped @ApplicationScoped
public class WritebackObjectPersistentStore { public class WritebackObjectPersistentStore {
@Inject @Inject
@@ -260,16 +264,23 @@ public class WritebackObjectPersistentStore {
} }
} }
public void asyncFence(long bundleId, Runnable fn) { /**
* Run a given callback after the transaction with id txId is committed.
* If the transaction is already committed, the callback is run immediately.
*
* @param txId transaction id to wait for
* @param fn callback to run
*/
public void asyncFence(long txId, Runnable fn) {
verifyReady(); verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); if (txId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastFlushedId.get() >= bundleId) { if (_lastFlushedId.get() >= txId) {
fn.run(); fn.run();
return; return;
} }
_pendingBundleLock.lock(); _pendingBundleLock.lock();
try { try {
if (_lastFlushedId.get() >= bundleId) { if (_lastFlushedId.get() >= txId) {
fn.run(); fn.run();
return; return;
} }
@@ -284,12 +295,23 @@ public class WritebackObjectPersistentStore {
} }
} }
/**
* Commit a transaction to the persistent store.
*
* @param writes the transaction manifest
* @return a function that allows to add a callback to be run after the transaction is committed
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) { public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
long bundleId = commitBundle(writes); long bundleId = commitBundle(writes);
return r -> asyncFence(bundleId, r); return r -> asyncFence(bundleId, r);
} }
/**
* Get a snapshot of the persistent store, including the pending writes.
*
* @return a snapshot of the store
*/
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() { public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null; Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
PendingWriteData pw = null; PendingWriteData pw = null;