diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml index 66fe78ff..8f4c33ff 100644 --- a/dhfs-parent/objects/pom.xml +++ b/dhfs-parent/objects/pom.xml @@ -64,6 +64,11 @@ quarkus-junit5-mockito test + + org.lmdbjava + lmdbjava + 0.9.1 + diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java new file mode 100644 index 00000000..82227750 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java @@ -0,0 +1,10 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.Iterator; + +public interface CloseableKvIterator, V> extends Iterator>, AutoCloseableNoThrow { + K peekNextKey(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java index b71ac8b9..facba141 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java @@ -4,8 +4,8 @@ import jakarta.annotation.Nonnull; import java.io.Serializable; -public record JDataVersionedWrapper(@Nonnull T data, long version) implements Serializable { - public JDataVersionedWrapper withVersion(long version) { - return new JDataVersionedWrapper<>(data, version); +public record JDataVersionedWrapper(@Nonnull JData data, long version) implements Serializable { + public JDataVersionedWrapper withVersion(long version) { + return new JDataVersionedWrapper(data, version); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java index 67e368ac..26e5b347 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import java.io.Serializable; +import java.nio.charset.StandardCharsets; public record JObjectKey(String name) implements Serializable, Comparable { public static JObjectKey of(String name) { @@ -16,4 +17,12 @@ public record JObjectKey(String name) implements Serializable, Comparable JDataVersionedWrapper get(Class type, JObjectKey key) { + private JDataVersionedWrapper get(Class type, JObjectKey key) { verifyReady(); while (true) { { @@ -63,26 +63,24 @@ public class JObjectManager { if (ref == null) { _objects.remove(key, got); } else if (type.isInstance(ref.data())) { - return (JDataVersionedWrapper) ref; + return (JDataVersionedWrapper) ref; } else { throw new IllegalArgumentException("Object type mismatch: " + ref.data().getClass() + " vs " + type); } } } - //noinspection unused try (var readLock = _objLocker.lock(key)) { if (_objects.containsKey(key)) continue; - var read = writebackObjectPersistentStore.readObject(key).orElse(null); if (read == null) return null; if (type.isInstance(read.data())) { - var wrapper = new JDataWrapper<>((JDataVersionedWrapper) read); + var wrapper = new JDataWrapper<>((JDataVersionedWrapper) read); var old = _objects.put(key, wrapper); assert old == null; - return (JDataVersionedWrapper) read; + return (JDataVersionedWrapper) read; } else { throw new IllegalArgumentException("Object type mismatch: " + 
read.getClass() + " vs " + type); } @@ -229,7 +227,7 @@ public class JObjectManager { switch (action.getValue()) { case TxRecord.TxObjectRecordWrite write -> { Log.trace("Writing " + action.getKey()); - var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId()); + var wrapped = new JDataVersionedWrapper(write.data(), tx.getId()); _objects.put(action.getKey(), new JDataWrapper<>(wrapped)); } case TxRecord.TxObjectRecordDeleted deleted -> { @@ -285,19 +283,19 @@ public class JObjectManager { } private record TransactionObjectNoLock - (Optional> data) + (Optional data) implements TransactionObject { } private record TransactionObjectLocked - (Optional> data, AutoCloseableNoThrow lock) + (Optional data, AutoCloseableNoThrow lock) implements TransactionObject { } - private class JDataWrapper extends WeakReference> { + private class JDataWrapper extends WeakReference { private static final Cleaner CLEANER = Cleaner.create(); - public JDataWrapper(JDataVersionedWrapper referent) { + public JDataWrapper(JDataVersionedWrapper referent) { super(referent); var key = referent.data().key(); CLEANER.register(referent, () -> { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java new file mode 100644 index 00000000..93305583 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java @@ -0,0 +1,73 @@ +package com.usatiuk.dhfs.objects; + +import org.apache.commons.lang3.tuple.Pair; + +import java.util.List; +import java.util.NoSuchElementException; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Stream; + +public class MergingKvIterator, V> implements CloseableKvIterator { + private final List> _iterators; + private final SortedMap> _sortedIterators = new TreeMap<>(); + + public MergingKvIterator(List> iterators) { + _iterators = iterators; + + for (CloseableKvIterator iterator : iterators) { + if (!iterator.hasNext()) { + continue; + } + K key = iterator.peekNextKey(); + if (key != null) { + _sortedIterators.put(key, iterator); + } + } + } + + @SafeVarargs + public MergingKvIterator(CloseableKvIterator... iterators) { + this(List.of(iterators)); + } + + @SafeVarargs + public MergingKvIterator(MergingKvIterator parent, CloseableKvIterator... 
iterators) { + this(Stream.concat(parent._iterators.stream(), Stream.of(iterators)).toList()); + } + + @Override + public K peekNextKey() { + var cur = _sortedIterators.pollFirstEntry(); + if (cur == null) { + throw new NoSuchElementException(); + } + return cur.getKey(); + } + + @Override + public void close() { + for (CloseableKvIterator iterator : _iterators) { + iterator.close(); + } + } + + @Override + public boolean hasNext() { + return !_sortedIterators.isEmpty(); + } + + @Override + public Pair next() { + var cur = _sortedIterators.pollFirstEntry(); + if (cur == null) { + throw new NoSuchElementException(); + } + var curVal = cur.getValue().next(); + if (cur.getValue().hasNext()) { + var nextKey = cur.getValue().peekNextKey(); + _sortedIterators.put(nextKey, cur.getValue()); + } + return curVal; + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java new file mode 100644 index 00000000..ac224347 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java @@ -0,0 +1,63 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.*; + +public class NavigableMapKvIterator, V> implements CloseableKvIterator { + private final Iterator> _iterator; + private Map.Entry _next; + + public NavigableMapKvIterator(NavigableMap map, IteratorStart start, K key) { + SortedMap _view; + switch (start) { + case GE -> _view = map.tailMap(key, true); + case GT -> _view = map.tailMap(key, false); + case LE -> { + var tail = map.tailMap(key, true); + if (tail.firstKey().equals(key)) _view = tail; + else _view = map.tailMap(map.lowerKey(key), true); + } + case LT -> _view = map.tailMap(map.lowerKey(key), true); + default -> throw new IllegalArgumentException("Unknown start type"); + } + _iterator = _view.entrySet().iterator(); + fillNext(); + } + + private void fillNext() { + while (_iterator.hasNext() && _next == null) { + _next = _iterator.next(); + } + } + + @Override + public K peekNextKey() { + if (_next == null) { + throw new NoSuchElementException(); + } + return _next.getKey(); + } + + @Override + public void close() { + } + + @Override + public boolean hasNext() { + return _next != null; + } + + @Override + public Pair next() { + if (_next == null) { + throw new NoSuchElementException("No more elements"); + } + var ret = _next; + _next = null; + fillNext(); + return Pair.of(ret); + } + +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java new file mode 100644 index 00000000..4f2651c4 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java @@ -0,0 +1,56 @@ +package com.usatiuk.dhfs.objects; + +import org.apache.commons.lang3.tuple.Pair; + +import java.util.NoSuchElementException; +import java.util.function.Function; + +public class PredicateKvIterator, V, V_T> implements CloseableKvIterator { + private final CloseableKvIterator _backing; + private final Function _transformer; + private Pair _next; + + public PredicateKvIterator(CloseableKvIterator backing, Function transformer) { + _backing = backing; + _transformer = transformer; + fillNext(); + } + + private void fillNext() { + while (_backing.hasNext() && _next == null) { + var next 
= _backing.next(); + var transformed = _transformer.apply(next.getValue()); + if (transformed == null) + continue; + _next = Pair.of(next.getKey(), transformed); + } + } + + @Override + public K peekNextKey() { + if (_next == null) + throw new NoSuchElementException(); + return _next.getKey(); + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public boolean hasNext() { + return _next != null; + } + + @Override + public Pair next() { + if (_next == null) + throw new NoSuchElementException("No more elements"); + var ret = _next; + _next = null; + fillNext(); + return ret; + } + +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java index a31dc61d..8068e262 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxBundle.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects; public interface TxBundle { long getId(); - void commit(JDataVersionedWrapper obj); + void commit(JDataVersionedWrapper obj); void delete(JObjectKey obj); } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java index 27ac10e4..8723d4ee 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWriteback.java @@ -1,5 +1,7 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; + import java.util.Collection; import java.util.Optional; @@ -13,6 +15,7 @@ public interface TxWriteback { void fence(long bundleId); Optional getPendingWrite(JObjectKey key); + Collection getPendingWrites(); // Executes callback after bundle with bundleId id has been persisted @@ -23,9 +26,15 @@ public interface TxWriteback { long bundleId(); } - record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry { + record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry { } record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry { } + + CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); + + default CloseableKvIterator getIterator(JObjectKey key) { + return getIterator(IteratorStart.GE, key); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java index 5f40d65d..4c6676c8 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.persistence.TxManifestObj; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; @@ -14,7 +15,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,7 +25,7 @@ import 
java.util.concurrent.atomic.AtomicLong; public class TxWritebackImpl implements TxWriteback { private final LinkedList _pendingBundles = new LinkedList<>(); - private final ConcurrentHashMap _pendingWrites = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap _pendingWrites = new ConcurrentSkipListMap<>(); private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); private final Object _flushWaitSynchronizer = new Object(); @@ -37,7 +38,6 @@ public class TxWritebackImpl implements TxWriteback { long sizeLimit; private long currentSize = 0; private ExecutorService _writebackExecutor; - private ExecutorService _commitExecutor; private ExecutorService _statusExecutor; private volatile boolean _ready = false; @@ -51,21 +51,13 @@ public class TxWritebackImpl implements TxWriteback { _writebackExecutor.submit(this::writeback); } - { - BasicThreadFactory factory = new BasicThreadFactory.Builder() - .namingPattern("writeback-commit-%d") - .build(); - - _commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory); - } _statusExecutor = Executors.newSingleThreadExecutor(); _statusExecutor.submit(() -> { try { while (true) { Thread.sleep(1000); if (currentSize > 0) - Log.info("Tx commit status: size=" - + currentSize / 1024 / 1024 + "MB"); + Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB"); } } catch (InterruptedException ignored) { } @@ -111,12 +103,12 @@ public class TxWritebackImpl implements TxWriteback { } } - var toWrite = new ArrayList>>(); + var toWrite = new ArrayList>(); var toDelete = new ArrayList(); for (var e : bundle._entries.values()) { switch (e) { - case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> { + case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> { Log.trace("Writing new " + key); toWrite.add(Pair.of(key, data)); } @@ -336,7 +328,7 @@ public class TxWritebackImpl implements TxWriteback { } @Override - public void commit(JDataVersionedWrapper obj) { + public void commit(JDataVersionedWrapper obj) { synchronized (_entries) { _entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize())); } @@ -371,7 +363,7 @@ public class TxWritebackImpl implements TxWriteback { int size(); } - private record CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) + private record CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) implements BundleEntry { } @@ -383,4 +375,18 @@ public class TxWritebackImpl implements TxWriteback { } } } + + // Returns an iterator with a view of all commited objects + // Does not have to guarantee consistent view, snapshots are handled by upper layers + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new PredicateKvIterator<>( + new NavigableMapKvIterator<>(_pendingWrites, start, key), + e -> { + if (e instanceof PendingWrite pw) { + return pw.data(); + } + return null; + }); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index 48f7265a..4fd115c0 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import 
com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.transaction.TxRecord; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; @@ -34,7 +35,7 @@ public class WritebackObjectPersistentStore { } @Nonnull - Optional> readObject(JObjectKey name) { + Optional readObject(JObjectKey name) { var pending = txWriteback.getPendingWrite(name).orElse(null); return switch (pending) { case TxWriteback.PendingWrite write -> Optional.of(write.data()); @@ -51,7 +52,7 @@ public class WritebackObjectPersistentStore { switch (action) { case TxRecord.TxObjectRecordWrite write -> { Log.trace("Flushing object " + write.key()); - bundle.commit(new JDataVersionedWrapper<>(write.data(), id)); + bundle.commit(new JDataVersionedWrapper(write.data(), id)); } case TxRecord.TxObjectRecordDeleted deleted -> { Log.trace("Deleting object " + deleted.key()); @@ -74,4 +75,14 @@ public class WritebackObjectPersistentStore { return r -> txWriteback.asyncFence(bundleId, r); } + + // Returns an iterator with a view of all commited objects + // Does not have to guarantee consistent view, snapshots are handled by upper layers + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new MergingKvIterator<>(delegate.getIterator(start, key), txWriteback.getIterator(start, key)); + } + + public CloseableKvIterator getIterator(JObjectKey key) { + return getIterator(IteratorStart.GE, key); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java index 127cdde7..60afe37a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java @@ -1,7 +1,6 @@ package com.usatiuk.dhfs.objects.persistence; -import com.usatiuk.dhfs.objects.JDataVersionedWrapper; -import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.utils.DataLocker; import io.quarkus.logging.Log; import io.quarkus.runtime.Startup; @@ -14,6 +13,7 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.LinkedHashMap; import java.util.Optional; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Stream; @@ -21,6 +21,7 @@ import java.util.stream.Stream; @ApplicationScoped public class CachingObjectPersistentStore { private final LinkedHashMap _cache = new LinkedHashMap<>(8, 0.75f, true); + private final ConcurrentSkipListMap _sortedCache = new ConcurrentSkipListMap<>(); private final DataLocker _locker = new DataLocker(); @Inject SerializingObjectPersistentStore delegate; @@ -57,17 +58,20 @@ public class CachingObjectPersistentStore { return delegate.findAllObjects(); } - private void put(JObjectKey key, Optional> obj) { + private void put(JObjectKey key, Optional obj) { synchronized (_cache) { int size = obj.map(o -> o.data().estimateSize()).orElse(0); _curSize += size; - var old = _cache.putLast(key, new CacheEntry(obj, size)); + var entry = new CacheEntry(obj, size); + var old = _cache.putLast(key, entry); + _sortedCache.put(key, entry); if (old != null) _curSize -= old.size(); while (_curSize >= 
sizeLimit) { var del = _cache.pollFirstEntry(); + _sortedCache.remove(del.getKey(), del.getValue()); _curSize -= del.getValue().size(); _evict++; } @@ -75,7 +79,7 @@ public class CachingObjectPersistentStore { } @Nonnull - public Optional> readObject(JObjectKey name) { + public Optional readObject(JObjectKey name) { try (var lock = _locker.lock(name)) { synchronized (_cache) { var got = _cache.get(name); @@ -90,7 +94,7 @@ public class CachingObjectPersistentStore { } } - public void commitTx(TxManifestObj> names) { + public void commitTx(TxManifestObj names) { // During commit, readObject shouldn't be called for these items, // it should be handled by the upstream store synchronized (_cache) { @@ -98,11 +102,27 @@ public class CachingObjectPersistentStore { names.deleted().stream()).toList()) { _curSize -= Optional.ofNullable(_cache.get(key)).map(CacheEntry::size).orElse(0L); _cache.remove(key); + _sortedCache.remove(key); } } delegate.commitTx(names); } - private record CacheEntry(Optional> object, long size) { + // Returns an iterator with a view of all commited objects + // Does not have to guarantee consistent view, snapshots are handled by upper layers + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new MergingKvIterator<>( + new PredicateKvIterator<>( + new NavigableMapKvIterator<>(_sortedCache, start, key), + e -> e.object().orElse(null) + ), + delegate.getIterator(start, key)); + } + + public CloseableKvIterator getIterator(JObjectKey key) { + return getIterator(IteratorStart.GE, key); + } + + private record CacheEntry(Optional object, long size) { } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java deleted file mode 100644 index b668534c..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java +++ /dev/null @@ -1,308 +0,0 @@ -package com.usatiuk.dhfs.objects.persistence; - -import com.google.protobuf.ByteString; -import com.google.protobuf.UnsafeByteOperations; -import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer; -import com.usatiuk.dhfs.utils.ByteUtils; -import com.usatiuk.dhfs.utils.SerializationHelper; -import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace; -import io.grpc.Status; -import io.quarkus.arc.properties.IfBuildProperty; -import io.quarkus.logging.Log; -import io.quarkus.runtime.ShutdownEvent; -import io.quarkus.runtime.StartupEvent; -import jakarta.annotation.Priority; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import net.openhft.hashing.LongHashFunction; -import org.eclipse.microprofile.config.inject.ConfigProperty; - -import javax.annotation.Nonnull; -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.util.*; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Stream; - -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; - -// File format: -// 64-bit metadata serialized size -// 64-bit offset of "rest of" metadata (if 
-1 then file has no data, -// if 0 then file has data and metadata fits into META_BLOCK_SIZE) -// Until META_BLOCK_SIZE - metadata (encoded as ObjectMetadataP) -// data (encoded as JObjectDataP) -// rest of metadata - -@ApplicationScoped -@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "files") -public class FileObjectPersistentStore implements ObjectPersistentStore { - private final Path _root; - private final Path _txManifest; - private ExecutorService _flushExecutor; - private RandomAccessFile _txFile; - private boolean _ready = false; - - public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) { - this._root = Path.of(root).resolve("objects"); - _txManifest = Path.of(root).resolve("cur-tx-manifest"); - } - - void init(@Observes @Priority(100) StartupEvent event) throws IOException { - if (!_root.toFile().exists()) { - Log.info("Initializing with root " + _root); - _root.toFile().mkdirs(); - for (int i = 0; i < 256; i++) { - _root.resolve(String.valueOf(i)).toFile().mkdirs(); - } - } - if (!Files.exists(_txManifest)) { - Files.createFile(_txManifest); - } - _txFile = new RandomAccessFile(_txManifest.toFile(), "rw"); - _flushExecutor = Executors.newVirtualThreadPerTaskExecutor(); - - tryReplay(); - Log.info("Transaction replay done"); - _ready = true; - } - - void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException { - _ready = false; - Log.debug("Deleting manifest file"); - _txFile.close(); - Files.delete(_txManifest); - Log.debug("Manifest file deleted"); - } - - private void verifyReady() { - if (!_ready) throw new IllegalStateException("Wrong service order!"); - } - - private void tryReplay() { - var read = readTxManifest(); - if (read != null) - commitTxImpl(read, false); - } - - private Path getObjPath(@Nonnull JObjectKey obj) { - int h = Objects.hash(obj); - int p1 = h & 0b00000000_00000000_11111111_00000000; - return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj.toString()); - } - - private Path getTmpObjPath(@Nonnull JObjectKey obj) { - int h = Objects.hash(obj); - int p1 = h & 0b00000000_00000000_11111111_00000000; - return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj + ".tmp"); - } - - private void findAllObjectsImpl(Collection out, Path path) { - var read = path.toFile().listFiles(); - if (read == null) return; - - for (var s : read) { - if (s.isDirectory()) { - findAllObjectsImpl(out, s.toPath()); - } else { - if (s.getName().endsWith(".tmp")) continue; // FIXME: - out.add(new JObjectKey(s.getName())); // FIXME: - } - } - } - - @Nonnull - @Override - public Collection findAllObjects() { - verifyReady(); - ArrayList out = new ArrayList<>(); - findAllObjectsImpl(out, _root); - return Collections.unmodifiableCollection(out); - } - - @Nonnull - @Override - public Optional readObject(JObjectKey name) { - verifyReady(); - var path = getObjPath(name); - try (var rf = new RandomAccessFile(path.toFile(), "r")) { - ByteBuffer buf = UninitializedByteBuffer.allocateUninitialized(Math.toIntExact(rf.getChannel().size())); - fillBuffer(buf, rf.getChannel()); - buf.flip(); - - var bs = UnsafeByteOperations.unsafeWrap(buf); - // This way, the input will be considered "immutable" which would allow avoiding copies - // when parsing byte arrays -// var ch = bs.newCodedInput(); -// ch.enableAliasing(true); - return Optional.of(bs); - } catch (EOFException | FileNotFoundException | NoSuchFileException fx) { - return Optional.empty(); - } catch (IOException e) { - Log.error("Error reading file " 
+ path, e); - throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL); - } - } - - private void fillBuffer(ByteBuffer dst, FileChannel src) throws IOException { - int rem = dst.remaining(); - int readTotal = 0; - int readCur = 0; - while (readTotal < rem && (readCur = src.read(dst)) != -1) { - readTotal += readCur; - } - if (rem != readTotal) - throw new EOFException(); - } - - private void writeObjectImpl(Path path, ByteString data, boolean sync) throws IOException { - try (var fsb = new FileOutputStream(path.toFile(), false)) { - data.writeTo(fsb); - - if (sync) { - fsb.flush(); - fsb.getFD().sync(); - } - } - } - - private TxManifestRaw readTxManifest() { - try { - var channel = _txFile.getChannel(); - - if (channel.size() == 0) - return null; - - channel.position(0); - - var buf = ByteBuffer.allocate(Math.toIntExact(channel.size())); - - fillBuffer(buf, channel); - buf.flip(); - - long checksum = buf.getLong(); - var data = buf.slice(); - var hash = LongHashFunction.xx3().hashBytes(data); - - if (hash != checksum) - throw new StatusRuntimeExceptionNoStacktrace(Status.DATA_LOSS.withDescription("Transaction manifest checksum mismatch!")); - - return SerializationHelper.deserialize(data.array(), data.arrayOffset()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void putTxManifest(TxManifestRaw manifest) { - try { - var channel = _txFile.getChannel(); - var data = SerializationHelper.serializeArray(manifest); - channel.truncate(data.length + 8); - channel.position(0); - var hash = LongHashFunction.xx3().hashBytes(data); - if (channel.write(ByteUtils.longToBb(hash)) != 8) - throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL); - if (channel.write(ByteBuffer.wrap(data)) != data.length) - throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL); - channel.force(true); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void commitTx(TxManifestRaw manifest) { - verifyReady(); - try { - _flushExecutor.invokeAll( - manifest.written().stream().map(p -> (Callable) () -> { - var tmpPath = getTmpObjPath(p.getKey()); - writeObjectImpl(tmpPath, p.getValue(), true); - return null; - }).toList() - ).forEach(p -> { - try { - p.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - commitTxImpl(manifest, true); - } - - public void commitTxImpl(TxManifestRaw manifest, boolean failIfNotFound) { - if (manifest.deleted().isEmpty() && manifest.written().isEmpty()) { - Log.debug("Empty manifest, skipping"); - return; - } - - putTxManifest(manifest); - - try { - _flushExecutor.invokeAll( - Stream.concat(manifest.written().stream().map(p -> (Callable) () -> { - try { - Files.move(getTmpObjPath(p.getKey()), getObjPath(p.getKey()), ATOMIC_MOVE, REPLACE_EXISTING); - } catch (NoSuchFileException n) { - if (failIfNotFound) - throw n; - } - return null; - }), - manifest.deleted().stream().map(p -> (Callable) () -> { - deleteImpl(getObjPath(p)); - return null; - })).toList() - ).forEach(p -> { - try { - p.get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private void deleteImpl(Path path) { - try { - Files.delete(path); - } catch (NoSuchFileException ignored) { - } catch (IOException e) { - Log.error("Error deleting file " + path, e); - throw new 
StatusRuntimeExceptionNoStacktrace(Status.INTERNAL); - } - } - - @Override - public long getTotalSpace() { - verifyReady(); - return _root.toFile().getTotalSpace(); - } - - @Override - public long getFreeSpace() { - verifyReady(); - return _root.toFile().getFreeSpace(); - } - - @Override - public long getUsableSpace() { - verifyReady(); - return _root.toFile().getUsableSpace(); - } - -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/IteratorStart.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/IteratorStart.java new file mode 100644 index 00000000..6dd270c9 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/IteratorStart.java @@ -0,0 +1,8 @@ +package com.usatiuk.dhfs.objects.persistence; + +public enum IteratorStart { + LT, + LE, + GT, + GE +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java new file mode 100644 index 00000000..05e77dfa --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -0,0 +1,201 @@ +package com.usatiuk.dhfs.objects.persistence; + +import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.CloseableKvIterator; +import com.usatiuk.dhfs.objects.JObjectKey; +import io.quarkus.arc.properties.IfBuildProperty; +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.lmdbjava.*; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Collection; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +import static org.lmdbjava.DbiFlags.MDB_CREATE; +import static org.lmdbjava.Env.create; + +@ApplicationScoped +@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb") +public class LmdbObjectPersistentStore implements ObjectPersistentStore { + private final Path _root; + private Env _env; + private Dbi _db; + private boolean _ready = false; + + private static final String DB_NAME = "objects"; + + public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) { + _root = Path.of(root).resolve("objects"); + } + + void init(@Observes @Priority(100) StartupEvent event) throws IOException { + if (!_root.toFile().exists()) { + Log.info("Initializing with root " + _root); + _root.toFile().mkdirs(); + } + _env = create(ByteArrayProxy.PROXY_BA) + .setMapSize(1_000_000_000_000L) + .setMaxDbs(1) + .open(_root.toFile(), EnvFlags.MDB_NOTLS); + _db = _env.openDbi(DB_NAME, MDB_CREATE); + _ready = true; + } + + void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException { + _ready = false; + _db.close(); + _env.close(); + } + + private void verifyReady() { + if (!_ready) throw new IllegalStateException("Wrong service order!"); + } + + @Nonnull + @Override + public Collection findAllObjects() { +// try (Txn txn = env.txnRead()) { +// try (var cursor = db.openCursor(txn)) { +// var keys = List.of(); +// while 
(cursor.next()) { +// keys.add(JObjectKey.fromBytes(cursor.key())); +// } +// return keys; +// } +// } + return List.of(); + } + + @Nonnull + @Override + public Optional readObject(JObjectKey name) { + verifyReady(); + try (Txn txn = _env.txnRead()) { + var key = name.toString().getBytes(StandardCharsets.UTF_8); + var value = _db.get(txn, key); + return Optional.ofNullable(value).map(ByteString::copyFrom); + } + } + + private class LmdbKvIterator implements CloseableKvIterator { + private final Txn _txn = _env.txnRead(); + private final Cursor _cursor = _db.openCursor(_txn); + private boolean _hasNext = false; + + LmdbKvIterator(IteratorStart start, JObjectKey key) { + verifyReady(); + if (!_cursor.get(key.toString().getBytes(StandardCharsets.UTF_8), GetOp.MDB_SET_RANGE)) { + return; + } + + var got = JObjectKey.fromBytes(_cursor.key()); + var cmp = got.compareTo(key); + + assert cmp >= 0; + + _hasNext = true; + + if (cmp == 0) { + switch (start) { + case LT -> { + _hasNext = _cursor.prev(); + } + case GT -> { + _hasNext = _cursor.next(); + } + case LE, GE -> { + } + } + } else { + switch (start) { + case LT, LE -> { + _hasNext = _cursor.prev(); + } + case GT, GE -> { + } + } + } + } + + @Override + public void close() { + _cursor.close(); + _txn.close(); + } + + @Override + public boolean hasNext() { + return _hasNext; + } + + @Override + public Pair next() { + if (!_hasNext) { + throw new NoSuchElementException("No more elements"); + } + var ret = Pair.of(JObjectKey.fromBytes(_cursor.key()), ByteString.copyFrom(_cursor.val())); + _hasNext = _cursor.next(); + return ret; + } + + @Override + public JObjectKey peekNextKey() { + if (!_hasNext) { + throw new NoSuchElementException("No more elements"); + } + return JObjectKey.fromBytes(_cursor.key()); + } + } + + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new LmdbKvIterator(start, key); + } + + @Override + public void commitTx(TxManifestRaw names) { + verifyReady(); + try (Txn txn = _env.txnWrite()) { + for (var written : names.written()) { + var key = written.getKey().toString().getBytes(StandardCharsets.UTF_8); + _db.put(txn, key, written.getValue().toByteArray()); + } + for (JObjectKey key : names.deleted()) { + var keyBytes = key.toString().getBytes(StandardCharsets.UTF_8); + _db.delete(txn, keyBytes); + } + txn.commit(); + } + } + + @Override + public long getTotalSpace() { + verifyReady(); + return _root.toFile().getTotalSpace(); + } + + @Override + public long getFreeSpace() { + verifyReady(); + return _root.toFile().getFreeSpace(); + } + + @Override + public long getUsableSpace() { + verifyReady(); + return _root.toFile().getUsableSpace(); + } + +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java index cc7bd59e..7bba672a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java @@ -1,20 +1,21 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.NavigableMapKvIterator; import io.quarkus.arc.properties.IfBuildProperty; import jakarta.enterprise.context.ApplicationScoped; import 
javax.annotation.Nonnull; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentSkipListMap; @ApplicationScoped @IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") public class MemoryObjectPersistentStore implements ObjectPersistentStore { - private final Map _objects = new HashMap<>(); + private final ConcurrentSkipListMap _objects = new ConcurrentSkipListMap<>(); @Nonnull @Override @@ -32,6 +33,11 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore { } } + @Override + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new NavigableMapKvIterator<>(_objects, start, key); + } + @Override public void commitTx(TxManifestRaw names) { synchronized (this) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java index 19fe5d42..3467007b 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/ObjectPersistentStore.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects.persistence; import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.JObjectKey; import javax.annotation.Nonnull; @@ -16,6 +17,14 @@ public interface ObjectPersistentStore { @Nonnull Optional readObject(JObjectKey name); + // Returns an iterator with a view of all commited objects + // Does not have to guarantee consistent view, snapshots are handled by upper layers + CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); + + default CloseableKvIterator getIterator(JObjectKey key) { + return getIterator(IteratorStart.GE, key); + } + void commitTx(TxManifestRaw names); long getTotalSpace(); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java index 99abf09c..6c339d03 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/SerializingObjectPersistentStore.java @@ -1,5 +1,7 @@ package com.usatiuk.dhfs.objects.persistence; +import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.CloseableKvIterator; import com.usatiuk.dhfs.objects.JDataVersionedWrapper; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.ObjectSerializer; @@ -17,20 +19,59 @@ public class SerializingObjectPersistentStore { ObjectSerializer serializer; @Inject - ObjectPersistentStore delegate; + ObjectPersistentStore delegateStore; @Nonnull Collection findAllObjects() { - return delegate.findAllObjects(); + return delegateStore.findAllObjects(); } @Nonnull - Optional> readObject(JObjectKey name) { - return delegate.readObject(name).map(serializer::deserialize); + Optional readObject(JObjectKey name) { + return delegateStore.readObject(name).map(serializer::deserialize); } - void commitTx(TxManifestObj> names) { - delegate.commitTx(new TxManifestRaw( + private class SerializingKvIterator implements CloseableKvIterator { + private final CloseableKvIterator _delegate; + + private 
SerializingKvIterator(IteratorStart start, JObjectKey key) { + _delegate = delegateStore.getIterator(start, key); + } + + @Override + public JObjectKey peekNextKey() { + return _delegate.peekNextKey(); + } + + @Override + public void close() { + _delegate.close(); + } + + @Override + public boolean hasNext() { + return _delegate.hasNext(); + } + + @Override + public Pair next() { + var next = _delegate.next(); + return Pair.of(next.getKey(), serializer.deserialize(next.getValue())); + } + } + + // Returns an iterator with a view of all commited objects + // Does not have to guarantee consistent view, snapshots are handled by upper layers + public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { + return new SerializingKvIterator(start, key); + } + + public CloseableKvIterator getIterator(JObjectKey key) { + return getIterator(IteratorStart.GE, key); + } + + void commitTx(TxManifestObj names) { + delegateStore.commitTx(new TxManifestRaw( names.written().stream() .map(e -> Pair.of(e.getKey(), serializer.serialize(e.getValue()))) .toList() diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 629d8a67..cb5a4ccb 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -75,8 +75,8 @@ public class TransactionFactoryImpl implements TransactionFactory { } return switch (strategy) { - case OPTIMISTIC -> _source.get(type, key).data().map(JDataVersionedWrapper::data); - case WRITE -> _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data); + case OPTIMISTIC -> (Optional) _source.get(type, key).data().map(JDataVersionedWrapper::data); + case WRITE -> (Optional) _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data); }; } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java index 5404245a..05826900 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionObject.java @@ -6,5 +6,5 @@ import com.usatiuk.dhfs.objects.JDataVersionedWrapper; import java.util.Optional; public interface TransactionObject { - Optional> data(); + Optional data(); } diff --git a/dhfs-parent/objects/src/main/resources/application.properties b/dhfs-parent/objects/src/main/resources/application.properties index f7842d0c..93211847 100644 --- a/dhfs-parent/objects/src/main/resources/application.properties +++ b/dhfs-parent/objects/src/main/resources/application.properties @@ -1,4 +1,4 @@ -dhfs.objects.persistence=files +dhfs.objects.persistence=lmdb dhfs.objects.writeback.limit=134217728 dhfs.objects.lru.limit=134217728 dhfs.objects.lru.print-stats=true
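
The sketches below are illustrative examples for review; they are not part of the patch, and the stand-in names and keys in them are assumptions.

First, a minimal sketch of how the new iterator pieces compose: WritebackObjectPersistentStore.getIterator builds exactly this shape, merging the pending-writes view with the caching store's view. Plain String TreeMaps stand in for the real stores here, and the keys are deliberately non-overlapping.

import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.MergingKvIterator;
import com.usatiuk.dhfs.objects.NavigableMapKvIterator;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;

import java.util.TreeMap;

public class MergingIteratorSketch {
    public static void main(String[] args) {
        var pending = new TreeMap<String, String>();   // stands in for the not-yet-flushed writes
        var persisted = new TreeMap<String, String>(); // stands in for the backing store
        pending.put("b", "pending-b");
        persisted.put("a", "persisted-a");
        persisted.put("c", "persisted-c");

        CloseableKvIterator<String, String> pendingIt =
                new NavigableMapKvIterator<>(pending, IteratorStart.GE, "a");
        CloseableKvIterator<String, String> persistedIt =
                new NavigableMapKvIterator<>(persisted, IteratorStart.GE, "a");

        // Yields a -> persisted-a, b -> pending-b, c -> persisted-c, in key order.
        try (var merged = new MergingKvIterator<>(pendingIt, persistedIt)) {
            while (merged.hasNext()) {
                var e = merged.next();
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }
}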
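
TxWritebackImpl.getIterator exposes only live writes from its PendingWriteEntry map by letting the transformer return null for deletions, which PredicateKvIterator then skips. A small sketch of that filtering pattern with toy types (the Entry/Write/Delete types here are illustrative, not from the patch):

import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.NavigableMapKvIterator;
import com.usatiuk.dhfs.objects.PredicateKvIterator;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;

import java.util.TreeMap;
import java.util.function.Function;

public class PredicateIteratorSketch {
    interface Entry {}
    record Write(String data) implements Entry {}
    record Delete() implements Entry {}

    public static void main(String[] args) {
        var pending = new TreeMap<String, Entry>();
        pending.put("a", new Write("a-v1"));
        pending.put("b", new Delete());      // filtered out below
        pending.put("c", new Write("c-v2"));

        CloseableKvIterator<String, Entry> backing =
                new NavigableMapKvIterator<>(pending, IteratorStart.GE, "a");
        Function<Entry, String> onlyWrites =
                e -> e instanceof Write w ? w.data() : null;

        // Prints a -> a-v1 and c -> c-v2; the Delete entry is skipped because
        // the transformer returned null for it.
        try (var it = new PredicateKvIterator<>(backing, onlyWrites)) {
            while (it.hasNext()) {
                var e = it.next();
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }
}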
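
LmdbKvIterator positions its cursor with MDB_SET_RANGE, which lands on the first key greater than or equal to the requested one, then steps once with prev()/next() to honour the LT/LE/GT/GE start modes. A standalone sketch of that positioning against a throwaway environment (directory, map size and keys are illustrative; error handling omitted):

import org.lmdbjava.ByteArrayProxy;
import org.lmdbjava.Cursor;
import org.lmdbjava.Dbi;
import org.lmdbjava.Env;
import org.lmdbjava.GetOp;
import org.lmdbjava.Txn;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import static org.lmdbjava.DbiFlags.MDB_CREATE;

public class LmdbSeekSketch {
    public static void main(String[] args) throws Exception {
        File dir = Files.createTempDirectory("lmdb-sketch").toFile();
        try (Env<byte[]> env = Env.create(ByteArrayProxy.PROXY_BA)
                .setMapSize(10_485_760)
                .setMaxDbs(1)
                .open(dir)) {
            Dbi<byte[]> db = env.openDbi("objects", MDB_CREATE);
            db.put("a".getBytes(StandardCharsets.UTF_8), new byte[0]);
            db.put("c".getBytes(StandardCharsets.UTF_8), new byte[0]);

            try (Txn<byte[]> txn = env.txnRead(); Cursor<byte[]> cursor = db.openCursor(txn)) {
                // Seeking "b": MDB_SET_RANGE positions the cursor on "c", the first key >= "b".
                if (cursor.get("b".getBytes(StandardCharsets.UTF_8), GetOp.MDB_SET_RANGE)) {
                    System.out.println("GE b -> " + new String(cursor.key(), StandardCharsets.UTF_8));
                    // An LT/LE start on a non-exact match steps back once, as LmdbKvIterator does.
                    if (cursor.prev()) {
                        System.out.println("LT b -> " + new String(cursor.key(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}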
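
LMDB keys are raw bytes, so the new store writes keys as name.toString().getBytes(StandardCharsets.UTF_8) and reads them back through JObjectKey.fromBytes (the corresponding JObjectKey.java hunk is referenced above but not shown in full). A plausible shape for that round trip, consistent with the UTF-8 usage in LmdbObjectPersistentStore; a hypothetical sketch, not the actual hunk:

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: only fromBytes is referenced by name in the patch,
// the rest is assumed for illustration.
public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
    public static JObjectKey of(String name) {
        return new JObjectKey(name);
    }

    public static JObjectKey fromBytes(byte[] bytes) {
        return new JObjectKey(new String(bytes, StandardCharsets.UTF_8));
    }

    public byte[] bytes() { // hypothetical helper name
        return name.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public int compareTo(JObjectKey o) {
        return name.compareTo(o.name);
    }
}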