diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JData.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JData.java index d1af85f9..bed0dfea 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JData.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JData.java @@ -2,9 +2,20 @@ package com.usatiuk.objects; import java.io.Serializable; +/** + * JData is a marker interface for all objects that can be stored in the object store. + */ public interface JData extends Serializable { + /** + * Returns the key of the object. + * @return the key of the object + */ JObjectKey key(); + /** + * Returns the estimated size of the object in bytes. + * @return the estimated size of the object in bytes + */ default int estimateSize() { return 100; } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapper.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapper.java index 04328ef9..fc0117ab 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapper.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapper.java @@ -2,15 +2,34 @@ package com.usatiuk.objects; import com.usatiuk.objects.iterators.Data; +/** + * JDataVersionedWrapper is a wrapper for JData that contains its version number + * (the id of the transaction that had changed it last) + */ public sealed interface JDataVersionedWrapper extends Data permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl { @Override default JDataVersionedWrapper value() { return this; } + /** + * Returns the wrapped object. + * + * @return the wrapped object + */ JData data(); + /** + * Returns the version number of the object. + * + * @return the version number of the object + */ long version(); + /** + * Returns the estimated size of the object in bytes. + * + * @return the estimated size of the object in bytes + */ int estimateSize(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperImpl.java index 3a2bce6d..ed029737 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperImpl.java @@ -4,6 +4,9 @@ import jakarta.annotation.Nonnull; import java.io.Serializable; +/** + * Simple wrapper for an already-existing JData object with a version. + */ public record JDataVersionedWrapperImpl(@Nonnull JData data, long version) implements Serializable, JDataVersionedWrapper { @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperLazy.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperLazy.java index b1ef09e6..5d05d7b5 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperLazy.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperLazy.java @@ -2,18 +2,35 @@ package com.usatiuk.objects; import java.util.function.Supplier; +/** + * Lazy JDataVersionedWrapper implementation. + * The object is deserialized only when data() is called for the first time. + * Also allows to set a callback to be called when the data is loaded (e.g. to cache it). + */ public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper { private final long _version; private final int _estimatedSize; private JData _data; private Supplier _producer; + /** + * Creates a new JDataVersionedWrapperLazy object. + * + * @param version the version number of the object + * @param estimatedSize the estimated size of the object in bytes + * @param producer a supplier that produces the wrapped object + */ public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier producer) { _version = version; _estimatedSize = estimatedSize; _producer = producer; } + /** + * Set a callback to be called when the data is loaded. + * + * @param cacheCallback the callback to be called + */ public void setCacheCallback(Runnable cacheCallback) { if (_data != null) { throw new IllegalStateException("Cache callback can be set only before data is loaded"); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java index b5a9238a..70e8dfad 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JDataVersionedWrapperSerializer.java @@ -7,11 +7,21 @@ import jakarta.inject.Singleton; import java.nio.ByteBuffer; +/** + * Serializer for JDataVersionedWrapper objects. + * The objects are stored in a simple format: first is 8-byte long, then the serialized object. + */ @Singleton public class JDataVersionedWrapperSerializer { @Inject ObjectSerializer dataSerializer; + /** + * Serializes a JDataVersionedWrapper object to a ByteString. + * + * @param obj the object to serialize + * @return the serialized object as a ByteString + */ public ByteString serialize(JDataVersionedWrapper obj) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); buffer.putLong(obj.version()); @@ -19,6 +29,13 @@ public class JDataVersionedWrapperSerializer { return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data())); } + /** + * Deserializes a JDataVersionedWrapper object from a ByteBuffer. + * Returns a lazy wrapper (JDataVersionedWrapperLazy). + * + * @param data the ByteBuffer containing the serialized object + * @return the deserialized object + */ public JDataVersionedWrapper deserialize(ByteBuffer data) { var version = data.getLong(); return new JDataVersionedWrapperLazy(version, data.remaining(), diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKey.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKey.java index 93bd080c..de98b8d6 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKey.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKey.java @@ -5,30 +5,68 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.UUID; +/** + * JObjectKey is an interface for object keys to be used in the object store. + */ public sealed interface JObjectKey extends Serializable, Comparable permits JObjectKeyImpl, JObjectKeyMax, JObjectKeyMin { JObjectKeyMin MIN = new JObjectKeyMin(); JObjectKeyMax MAX = new JObjectKeyMax(); + /** + * Creates a new JObjectKey from a string value. + * + * @param value the string value of the key + * @return a new JObjectKey + */ static JObjectKey of(String value) { return new JObjectKeyImpl(value); } + /** + * Creates a new JObjectKey with a random UUID. + * + * @return a new JObjectKey with a random UUID + */ static JObjectKey random() { return new JObjectKeyImpl(UUID.randomUUID().toString()); } + /** + * Returns a JObjectKey that compares less than all other keys. + * Calling value on this key will result in an exception. + * + * @return a JObjectKey that compares less than all other keys + */ static JObjectKey first() { return MIN; } + /** + * Returns a JObjectKey that compares greater than all other keys. + * Calling value on this key will result in an exception. + * + * @return a JObjectKey that compares greater than all other keys + */ static JObjectKey last() { return MAX; } + /** + * Creates a new JObjectKey from a byte array. + * + * @param bytes the byte array representing the key + * @return a new JObjectKey + */ static JObjectKey fromBytes(byte[] bytes) { return new JObjectKeyImpl(new String(bytes, StandardCharsets.ISO_8859_1)); } + /** + * Creates a new JObjectKey from a ByteBuffer. + * + * @param buff the ByteBuffer representing the key + * @return a new JObjectKey + */ static JObjectKey fromByteBuffer(ByteBuffer buff) { byte[] bytes = new byte[buff.remaining()]; buff.get(bytes); @@ -41,7 +79,17 @@ public sealed interface JObjectKey extends Serializable, Comparable @Override String toString(); + /** + * Returns the byte buffer representation of the key. + * + * @return the byte buffer representation of the key + */ ByteBuffer toByteBuffer(); + /** + * Returns the string value of the key. + * + * @return the string value of the key + */ String value(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java index eca0af53..bf17af0e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java @@ -7,6 +7,9 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Objects; +/** + * A "real" implementation of JObjectKey, containing an underlying string, and a cached lazily created byte buffer. + */ public final class JObjectKeyImpl implements JObjectKey { @Serial private static final long serialVersionUID = 0L; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMax.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMax.java index 982e08b2..04948aeb 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMax.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMax.java @@ -2,6 +2,9 @@ package com.usatiuk.objects; import java.nio.ByteBuffer; +/** + * JObjectKey implementation that compares greater than all other keys. + */ public record JObjectKeyMax() implements JObjectKey { @Override public int compareTo(JObjectKey o) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMin.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMin.java index 4a8bbc69..fa80d456 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMin.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyMin.java @@ -2,6 +2,9 @@ package com.usatiuk.objects; import java.nio.ByteBuffer; +/** + * JObjectKey implementation that compares less than all other keys. + */ public record JObjectKeyMin() implements JObjectKey { @Override public int compareTo(JObjectKey o) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java index 14581634..25d7884e 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java @@ -10,6 +10,9 @@ import jakarta.enterprise.context.ApplicationScoped; import java.io.IOException; import java.nio.ByteBuffer; +/** + * Simple Java object serializer. + */ @ApplicationScoped @DefaultBean public class JavaDataSerializer implements ObjectSerializer { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java index d53d482f..eac66d02 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/ObjectSerializer.java @@ -4,8 +4,25 @@ import com.google.protobuf.ByteString; import java.nio.ByteBuffer; +/** + * Interface for serializing and deserializing objects. + * + * @param the type of object to serialize/deserialize + */ public interface ObjectSerializer { + /** + * Serialize an object to a ByteString. + * + * @param obj the object to serialize + * @return the serialized object as a ByteString + */ ByteString serialize(T obj); + /** + * Deserialize an object from a ByteBuffer. + * + * @param data the ByteBuffer containing the serialized object + * @return the deserialized object + */ T deserialize(ByteBuffer data); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index 4df9488b..8a654002 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * CachingObjectPersistentStore is a caching layer for the SerializingObjectPersistentStore * It stores the already deserialized objects in memory. - * + * Not (yet) thread safe for writes. */ @ApplicationScoped public class CachingObjectPersistentStore { @@ -67,6 +67,12 @@ public class CachingObjectPersistentStore { } } + /** + * Commit the transaction to the underlying store and update the cache. + * Once this function returns, the transaction is committed and the cache is updated. + * @param objs the transaction manifest object + * @param txId the transaction ID + */ public void commitTx(TxManifestObj objs, long txId) { Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); @@ -84,6 +90,12 @@ public class CachingObjectPersistentStore { Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); } + /** + * Get a snapshot of underlying store and the cache. + * Objects are read from the cache if possible, if not, they are read from the underlying store, + * then possibly lazily cached when their data is accessed. + * @return a snapshot of the cached store + */ public Snapshot getSnapshot() { while (true) { var cache = _cache.get(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java index ad072339..6b8e31c2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java @@ -26,15 +26,19 @@ import java.nio.file.Path; import java.util.List; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.stream.Stream; import static org.lmdbjava.DbiFlags.MDB_CREATE; import static org.lmdbjava.Env.create; +/** + * Persistent object storage using LMDB + */ @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb") public class LmdbObjectPersistentStore implements ObjectPersistentStore { private static final String DB_NAME = "objects"; + + // LMDB object name for the transaction id private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ"; private static final ByteBuffer DB_VER_OBJ_NAME; @@ -100,6 +104,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { if (!_ready) throw new IllegalStateException("Wrong service order!"); } + /** + * Get a snapshot of the database. + * Note that the ByteBuffers are invalid after the snapshot is closed. + * + * @return a snapshot of the database + */ @Override public Snapshot getSnapshot() { var txn = _env.txnRead(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java index 7b2169e4..8d951d49 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java @@ -15,6 +15,10 @@ import java.util.Optional; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; +/** + * In-memory implementation of the ObjectPersistentStore interface. + * For testing purposes. + */ @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") public class MemoryObjectPersistentStore implements ObjectPersistentStore { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java index 33ee51bf..1557efe0 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/ObjectPersistentStore.java @@ -8,14 +8,33 @@ import javax.annotation.Nonnull; import java.nio.ByteBuffer; import java.util.Optional; -// Persistent storage of objects -// All changes are written as sequential transactions +/** + * Interface for a persistent store of objects. + * Does not have to be thread-safe! (yet), it is expected that all commits are done by the same thread. + */ public interface ObjectPersistentStore { + /** + * Get a snapshot of the persistent store. + * @return a snapshot of the persistent store + */ Snapshot getSnapshot(); + /** + * Commit a transaction to the persistent store. + * @param names the transaction manifest + * @param txId the transaction ID + */ void commitTx(TxManifestRaw names, long txId); + /** + * Get the size of the persistent store. + * @return the size of the persistent store + */ long getTotalSpace(); + /** + * Get the free space of the persistent store. + * @return the free space of the persistent store + */ long getFreeSpace(); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java index edda87ea..5eb5b0e4 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java @@ -16,6 +16,11 @@ import java.util.List; import java.util.Optional; import java.util.stream.Stream; +/** + * Serializing wrapper for the ObjectPersistentStore. + * It serializes the objects before storing them in the persistent store. + * It deserializes the objects after reading them from the persistent store. + */ @ApplicationScoped public class SerializingObjectPersistentStore { @Inject @@ -24,6 +29,13 @@ public class SerializingObjectPersistentStore { @Inject ObjectPersistentStore delegateStore; + /** + * Get a snapshot of the persistent store, with deserialized objects. + * + * The objects are deserialized lazily, only when their data is accessed. + * + * @return a snapshot of the persistent store + */ public Snapshot getSnapshot() { return new Snapshot() { private final Snapshot _backing = delegateStore.getSnapshot(); @@ -54,6 +66,12 @@ public class SerializingObjectPersistentStore { } + + /** + * Serialize the objects, in parallel + * @param objs the objects to serialize + * @return the serialized objects + */ private TxManifestRaw prepareManifest(TxManifestObj objs) { return new TxManifestRaw( objs.written().parallelStream() @@ -62,6 +80,11 @@ public class SerializingObjectPersistentStore { , objs.deleted()); } + /** + * Commit a transaction to the persistent store. + * @param objects the transaction manifest + * @param txId the transaction ID + */ void commitTx(TxManifestObj objects, long txId) { delegateStore.commitTx(prepareManifest(objects), txId); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestObj.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestObj.java index debb2aa8..7517cd82 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestObj.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestObj.java @@ -6,7 +6,6 @@ import org.apache.commons.lang3.tuple.Pair; import java.io.Serializable; import java.util.Collection; -// FIXME: Serializable public record TxManifestObj(Collection> written, Collection deleted) implements Serializable { } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestRaw.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestRaw.java index cda13e57..8f487f23 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestRaw.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/TxManifestRaw.java @@ -7,7 +7,6 @@ import org.apache.commons.lang3.tuple.Pair; import java.io.Serializable; import java.util.Collection; -// FIXME: Serializable public record TxManifestRaw(Collection> written, Collection deleted) implements Serializable { } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java index cf8bbb4d..e5ff5302 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java @@ -33,6 +33,10 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +/** + * Asynchronous write cache of objects. + * Objects are put into a write queue by commitTx, and written to the storage by a separate thread. + */ @ApplicationScoped public class WritebackObjectPersistentStore { @Inject @@ -260,16 +264,23 @@ public class WritebackObjectPersistentStore { } } - public void asyncFence(long bundleId, Runnable fn) { + /** + * Run a given callback after the transaction with id txId is committed. + * If the transaction is already committed, the callback is run immediately. + * + * @param txId transaction id to wait for + * @param fn callback to run + */ + public void asyncFence(long txId, Runnable fn) { verifyReady(); - if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); - if (_lastFlushedId.get() >= bundleId) { + if (txId < 0) throw new IllegalArgumentException("txId should be >0!"); + if (_lastFlushedId.get() >= txId) { fn.run(); return; } _pendingBundleLock.lock(); try { - if (_lastFlushedId.get() >= bundleId) { + if (_lastFlushedId.get() >= txId) { fn.run(); return; } @@ -284,12 +295,23 @@ public class WritebackObjectPersistentStore { } } + /** + * Commit a transaction to the persistent store. + * + * @param writes the transaction manifest + * @return a function that allows to add a callback to be run after the transaction is committed + */ public Consumer commitTx(Collection> writes) { long bundleId = commitBundle(writes); return r -> asyncFence(bundleId, r); } + /** + * Get a snapshot of the persistent store, including the pending writes. + * + * @return a snapshot of the store + */ public Snapshot getSnapshot() { Snapshot cache = null; PendingWriteData pw = null;