objects: some basic iterator structure

This commit is contained in:
2025-02-22 12:26:38 +01:00
parent 5b3e55d1bb
commit a461dd6b80
22 changed files with 579 additions and 362 deletions

View File

@@ -64,6 +64,11 @@
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.9.1</version>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
/**
 * An iterator over key/value pairs that additionally exposes the key of the upcoming entry
 * and can be closed.
 *
 * <p>NOTE(review): {@code AutoCloseableNoThrow} is declared elsewhere; presumably it is an
 * {@link AutoCloseable} whose {@code close()} declares no checked exception — confirm.
 *
 * @param <K> key type; keys are mutually comparable
 * @param <V> value type
 */
public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
/**
 * Returns the key of the entry that the next call to {@code next()} will produce.
 *
 * <p>As the name suggests, implementations are expected to leave the iterator position
 * untouched. The implementations added alongside this interface throw
 * {@link java.util.NoSuchElementException} when there is no next entry.
 *
 * @return the key of the upcoming entry
 */
K peekNextKey();
}

View File

@@ -4,8 +4,8 @@ import jakarta.annotation.Nonnull;
import java.io.Serializable;
public record JDataVersionedWrapper<T extends JData>(@Nonnull T data, long version) implements Serializable {
public JDataVersionedWrapper<T> withVersion(long version) {
return new JDataVersionedWrapper<>(data, version);
public record JDataVersionedWrapper(@Nonnull JData data, long version) implements Serializable {
public JDataVersionedWrapper withVersion(long version) {
return new JDataVersionedWrapper(data, version);
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
public static JObjectKey of(String name) {
@@ -16,4 +17,12 @@ public record JObjectKey(String name) implements Serializable, Comparable<JObjec
public String toString() {
return name;
}
/**
 * Encodes this key's name as UTF-8.
 *
 * @return the UTF-8 bytes of {@code name}
 */
public byte[] bytes() {
    final var charset = StandardCharsets.UTF_8;
    return name.getBytes(charset);
}
/**
 * Builds a key from the UTF-8 encoding of its name.
 *
 * @param bytes UTF-8 encoded key name
 * @return a key whose name is the decoded string
 */
public static JObjectKey fromBytes(byte[] bytes) {
    final String decoded = new String(bytes, StandardCharsets.UTF_8);
    return new JObjectKey(decoded);
}
}

View File

@@ -52,7 +52,7 @@ public class JObjectManager {
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
}
private <T extends JData> JDataVersionedWrapper<T> get(Class<T> type, JObjectKey key) {
private <T extends JData> JDataVersionedWrapper get(Class<T> type, JObjectKey key) {
verifyReady();
while (true) {
{
@@ -63,26 +63,24 @@ public class JObjectManager {
if (ref == null) {
_objects.remove(key, got);
} else if (type.isInstance(ref.data())) {
return (JDataVersionedWrapper<T>) ref;
return (JDataVersionedWrapper) ref;
} else {
throw new IllegalArgumentException("Object type mismatch: " + ref.data().getClass() + " vs " + type);
}
}
}
//noinspection unused
try (var readLock = _objLocker.lock(key)) {
if (_objects.containsKey(key)) continue;
var read = writebackObjectPersistentStore.readObject(key).orElse(null);
if (read == null) return null;
if (type.isInstance(read.data())) {
var wrapper = new JDataWrapper<>((JDataVersionedWrapper<T>) read);
var wrapper = new JDataWrapper<>((JDataVersionedWrapper) read);
var old = _objects.put(key, wrapper);
assert old == null;
return (JDataVersionedWrapper<T>) read;
return (JDataVersionedWrapper) read;
} else {
throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type);
}
@@ -229,7 +227,7 @@ public class JObjectManager {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Writing " + action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
var wrapped = new JDataVersionedWrapper(write.data(), tx.getId());
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
@@ -285,19 +283,19 @@ public class JObjectManager {
}
private record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper<T>> data)
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}
private record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper<T>> data, AutoCloseableNoThrow lock)
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}
private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper<T>> {
private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper> {
private static final Cleaner CLEANER = Cleaner.create();
public JDataWrapper(JDataVersionedWrapper<T> referent) {
public JDataWrapper(JDataVersionedWrapper referent) {
super(referent);
var key = referent.data().key();
CLEANER.register(referent, () -> {

View File

@@ -0,0 +1,73 @@
package com.usatiuk.dhfs.objects;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
/**
 * Merges several {@link CloseableKvIterator}s into one iterator that yields entries in
 * ascending key order.
 *
 * <p>Every source is assumed to yield its own entries in ascending key order. When several
 * sources offer the same key, only one entry is produced for it: the one from the source that
 * appears <em>later</em> in the list passed to the constructor. The entries with that key from
 * the other sources are consumed and discarded, and those sources keep contributing their
 * subsequent entries.
 *
 * <p>Closing this iterator closes every source.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
    private final List<CloseableKvIterator<K, V>> _iterators;
    // Maps the upcoming key of each non-exhausted source to that source. A key is held by at
    // most one source at a time; enqueue() resolves collisions.
    private final SortedMap<K, Source<K, V>> _sortedIterators = new TreeMap<>();

    /**
     * @param iterators the sources to merge; on equal keys a later element of this list takes
     *                  precedence over an earlier one
     */
    public MergingKvIterator(List<CloseableKvIterator<K, V>> iterators) {
        _iterators = iterators;
        int priority = 0;
        for (CloseableKvIterator<K, V> iterator : iterators) {
            enqueue(new Source<>(priority++, iterator));
        }
    }

    @SafeVarargs
    public MergingKvIterator(CloseableKvIterator<K, V>... iterators) {
        this(List.of(iterators));
    }

    /**
     * Creates an iterator over the sources of {@code parent} followed by {@code iterators}.
     * The sources of {@code parent} are reused as-is, in whatever position they currently are.
     */
    @SafeVarargs
    public MergingKvIterator(MergingKvIterator<K, V> parent, CloseableKvIterator<K, V>... iterators) {
        this(Stream.concat(parent._iterators.stream(), Stream.of(iterators)).toList());
    }

    /**
     * Registers {@code source} under its upcoming key. If another source already holds that
     * key, the one with the lower priority has its entry for the key discarded and is then
     * registered under its following key (repeating as necessary). Exhausted sources are
     * simply not registered.
     */
    private void enqueue(Source<K, V> source) {
        var candidate = source;
        while (candidate.iterator().hasNext()) {
            K key = candidate.iterator().peekNextKey();
            if (key == null) {
                // A TreeMap with natural ordering cannot hold null keys; such a source is
                // left out of the merge.
                return;
            }
            var holder = _sortedIterators.get(key);
            if (holder == null) {
                _sortedIterators.put(key, candidate);
                return;
            }
            Source<K, V> shadowed;
            if (candidate.priority() > holder.priority()) {
                _sortedIterators.put(key, candidate);
                shadowed = holder;
            } else {
                shadowed = candidate;
            }
            // Drop the shadowed duplicate, then find a slot for the rest of that source.
            shadowed.iterator().next();
            candidate = shadowed;
        }
    }

    /**
     * Returns the smallest upcoming key without advancing the iterator.
     *
     * @throws NoSuchElementException if no entries remain
     */
    @Override
    public K peekNextKey() {
        if (_sortedIterators.isEmpty()) {
            throw new NoSuchElementException();
        }
        return _sortedIterators.firstKey();
    }

    @Override
    public void close() {
        for (CloseableKvIterator<K, V> iterator : _iterators) {
            iterator.close();
        }
    }

    @Override
    public boolean hasNext() {
        return !_sortedIterators.isEmpty();
    }

    /**
     * Returns the entry with the smallest upcoming key and advances the source it came from.
     *
     * @throws NoSuchElementException if no entries remain
     */
    @Override
    public Pair<K, V> next() {
        var head = _sortedIterators.pollFirstEntry();
        if (head == null) {
            throw new NoSuchElementException();
        }
        var source = head.getValue();
        var entry = source.iterator().next();
        enqueue(source);
        return entry;
    }

    /** A source iterator together with its position in the constructor's list. */
    private record Source<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
    }
}

View File

@@ -0,0 +1,63 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
/**
 * A {@link CloseableKvIterator} over a {@link NavigableMap}, iterating in ascending key order
 * from a position chosen relative to a given key.
 *
 * <p>The iterator reads from a view of the map; nothing is copied. {@link #close()} is a no-op.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class NavigableMapKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
    private final Iterator<Map.Entry<K, V>> _iterator;
    // The entry the next call to next() will return, or null when exhausted.
    private Map.Entry<K, V> _next;

    /**
     * @param map   the map to iterate
     * @param start where to begin relative to {@code key}:
     *              {@code GE} – first key {@code >= key};
     *              {@code GT} – first key {@code > key};
     *              {@code LE} – greatest key {@code <= key};
     *              {@code LT} – greatest key {@code < key}.
     *              For {@code LE}/{@code LT}, when the map has no such key, iteration begins
     *              at the first key of the map.
     * @param key   the reference key
     */
    public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
        SortedMap<K, V> view;
        switch (start) {
            case GE -> view = map.tailMap(key, true);
            case GT -> view = map.tailMap(key, false);
            // floorKey/lowerKey return null when nothing qualifies (including an empty map);
            // tailFrom() handles that instead of passing null on to tailMap().
            case LE -> view = tailFrom(map, map.floorKey(key));
            case LT -> view = tailFrom(map, map.lowerKey(key));
            default -> throw new IllegalArgumentException("Unknown start type");
        }
        _iterator = view.entrySet().iterator();
        fillNext();
    }

    /** Returns the part of {@code map} starting at {@code from}, or all of it if {@code from} is null. */
    private static <K, V> SortedMap<K, V> tailFrom(NavigableMap<K, V> map, K from) {
        return from == null ? map : map.tailMap(from, true);
    }

    private void fillNext() {
        while (_iterator.hasNext() && _next == null) {
            _next = _iterator.next();
        }
    }

    /**
     * @throws NoSuchElementException if no entries remain
     */
    @Override
    public K peekNextKey() {
        if (_next == null) {
            throw new NoSuchElementException();
        }
        return _next.getKey();
    }

    @Override
    public void close() {
    }

    @Override
    public boolean hasNext() {
        return _next != null;
    }

    /**
     * @throws NoSuchElementException if no entries remain
     */
    @Override
    public Pair<K, V> next() {
        if (_next == null) {
            throw new NoSuchElementException("No more elements");
        }
        var ret = _next;
        _next = null;
        fillNext();
        return Pair.of(ret);
    }
}

View File

@@ -0,0 +1,56 @@
package com.usatiuk.dhfs.objects;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Function;
/**
 * Wraps a {@link CloseableKvIterator}, passing every value through a transformer and skipping
 * the entries for which the transformer returns {@code null}. Keys are passed through as-is.
 *
 * <p>The next surviving entry is computed eagerly: once in the constructor and again on every
 * {@link #next()}. Closing this iterator closes the backing one.
 *
 * @param <K>   key type
 * @param <V>   value type of the backing iterator
 * @param <V_T> value type produced by the transformer
 */
public class PredicateKvIterator<K extends Comparable<K>, V, V_T> implements CloseableKvIterator<K, V_T> {
    private final CloseableKvIterator<K, V> _backing;
    private final Function<V, V_T> _transformer;
    // Look-ahead entry; null once the backing iterator has nothing more that survives.
    private Pair<K, V_T> _next;

    /**
     * @param backing     the iterator to read from
     * @param transformer maps a backing value to the exposed value, or to {@code null} to
     *                    drop the entry
     */
    public PredicateKvIterator(CloseableKvIterator<K, V> backing, Function<V, V_T> transformer) {
        _backing = backing;
        _transformer = transformer;
        fillNext();
    }

    // Pulls from the backing iterator until an entry survives the transformer or it runs dry.
    private void fillNext() {
        for (; ; ) {
            if (!_backing.hasNext() || _next != null) {
                return;
            }
            Pair<K, V> source = _backing.next();
            V_T mapped = _transformer.apply(source.getValue());
            if (mapped != null) {
                _next = Pair.of(source.getKey(), mapped);
            }
        }
    }

    @Override
    public K peekNextKey() {
        final Pair<K, V_T> upcoming = _next;
        if (upcoming != null) {
            return upcoming.getKey();
        }
        throw new NoSuchElementException();
    }

    @Override
    public void close() {
        _backing.close();
    }

    @Override
    public boolean hasNext() {
        return _next != null;
    }

    @Override
    public Pair<K, V_T> next() {
        final Pair<K, V_T> current = _next;
        if (current == null) {
            throw new NoSuchElementException("No more elements");
        }
        _next = null;
        fillNext();
        return current;
    }
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects;
public interface TxBundle {
long getId();
void commit(JDataVersionedWrapper<?> obj);
void commit(JDataVersionedWrapper obj);
void delete(JObjectKey obj);
}

View File

@@ -1,5 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import java.util.Collection;
import java.util.Optional;
@@ -13,6 +15,7 @@ public interface TxWriteback {
void fence(long bundleId);
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
Collection<PendingWriteEntry> getPendingWrites();
// Executes callback after bundle with bundleId id has been persisted
@@ -23,9 +26,15 @@ public interface TxWriteback {
long bundleId();
}
record PendingWrite(JDataVersionedWrapper<?> data, long bundleId) implements PendingWriteEntry {
record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
}
record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}
/**
 * Returns an iterator over pending objects, positioned relative to {@code key} as selected by
 * {@code start}. The caller is responsible for closing the returned iterator.
 */
CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key);
/** Shorthand for {@code getIterator(IteratorStart.GE, key)}. */
default CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
    final IteratorStart defaultStart = IteratorStart.GE;
    return getIterator(defaultStart, key);
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -14,7 +15,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -24,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
private final ConcurrentHashMap<JObjectKey, PendingWriteEntry> _pendingWrites = new ConcurrentHashMap<>();
private final ConcurrentSkipListMap<JObjectKey, PendingWriteEntry> _pendingWrites = new ConcurrentSkipListMap<>();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
@@ -37,7 +38,6 @@ public class TxWritebackImpl implements TxWriteback {
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
@@ -51,21 +51,13 @@ public class TxWritebackImpl implements TxWriteback {
_writebackExecutor.submit(this::writeback);
}
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("writeback-commit-%d")
.build();
_commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size="
+ currentSize / 1024 / 1024 + "MB");
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
@@ -111,12 +103,12 @@ public class TxWritebackImpl implements TxWriteback {
}
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper<?>>>();
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
var toDelete = new ArrayList<JObjectKey>();
for (var e : bundle._entries.values()) {
switch (e) {
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper<?> data, int size) -> {
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> {
Log.trace("Writing new " + key);
toWrite.add(Pair.of(key, data));
}
@@ -336,7 +328,7 @@ public class TxWritebackImpl implements TxWriteback {
}
@Override
public void commit(JDataVersionedWrapper<?> obj) {
public void commit(JDataVersionedWrapper obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
@@ -371,7 +363,7 @@ public class TxWritebackImpl implements TxWriteback {
int size();
}
private record CommittedEntry(JObjectKey key, JDataVersionedWrapper<?> data, int size)
private record CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size)
implements BundleEntry {
}
@@ -383,4 +375,18 @@ public class TxWritebackImpl implements TxWriteback {
}
}
}
// Returns an iterator with a view of all committed objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Only PendingWrite entries are exposed; any other pending entry is skipped.
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
    var pending = new NavigableMapKvIterator<>(_pendingWrites, start, key);
    return new PredicateKvIterator<>(
            pending,
            entry -> entry instanceof PendingWrite write ? write.data() : null);
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -34,7 +35,7 @@ public class WritebackObjectPersistentStore {
}
@Nonnull
Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = txWriteback.getPendingWrite(name).orElse(null);
return switch (pending) {
case TxWriteback.PendingWrite write -> Optional.of(write.data());
@@ -51,7 +52,7 @@ public class WritebackObjectPersistentStore {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapper<>(write.data(), id));
bundle.commit(new JDataVersionedWrapper(write.data(), id));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
@@ -74,4 +75,14 @@ public class WritebackObjectPersistentStore {
return r -> txWriteback.asyncFence(bundleId, r);
}
// Returns an iterator with a view of all committed objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Merges the delegate store's iterator with the iterator over pending writes.
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
    var persisted = delegate.getIterator(start, key);
    var pending = txWriteback.getIterator(start, key);
    return new MergingKvIterator<>(persisted, pending);
}
// Shorthand for getIterator(IteratorStart.GE, key).
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
    final IteratorStart defaultStart = IteratorStart.GE;
    return getIterator(defaultStart, key);
}
}

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
@@ -14,6 +13,7 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
@@ -21,6 +21,7 @@ import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
private final ConcurrentSkipListMap<JObjectKey, CacheEntry> _sortedCache = new ConcurrentSkipListMap<>();
private final DataLocker _locker = new DataLocker();
@Inject
SerializingObjectPersistentStore delegate;
@@ -57,17 +58,20 @@ public class CachingObjectPersistentStore {
return delegate.findAllObjects();
}
private void put(JObjectKey key, Optional<JDataVersionedWrapper<?>> obj) {
private void put(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
synchronized (_cache) {
int size = obj.map(o -> o.data().estimateSize()).orElse(0);
_curSize += size;
var old = _cache.putLast(key, new CacheEntry(obj, size));
var entry = new CacheEntry(obj, size);
var old = _cache.putLast(key, entry);
_sortedCache.put(key, entry);
if (old != null)
_curSize -= old.size();
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_sortedCache.remove(del.getKey(), del.getValue());
_curSize -= del.getValue().size();
_evict++;
}
@@ -75,7 +79,7 @@ public class CachingObjectPersistentStore {
}
@Nonnull
public Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
try (var lock = _locker.lock(name)) {
synchronized (_cache) {
var got = _cache.get(name);
@@ -90,7 +94,7 @@ public class CachingObjectPersistentStore {
}
}
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper<?>> names) {
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names) {
// During commit, readObject shouldn't be called for these items,
// it should be handled by the upstream store
synchronized (_cache) {
@@ -98,11 +102,27 @@ public class CachingObjectPersistentStore {
names.deleted().stream()).toList()) {
_curSize -= Optional.ofNullable(_cache.get(key)).map(CacheEntry::size).orElse(0L);
_cache.remove(key);
_sortedCache.remove(key);
}
}
delegate.commitTx(names);
}
private record CacheEntry(Optional<JDataVersionedWrapper<?>> object, long size) {
// Returns an iterator with a view of all committed objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Merges the cached entries that hold an object with the delegate store's iterator.
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
    CloseableKvIterator<JObjectKey, JDataVersionedWrapper> cached = new PredicateKvIterator<>(
            new NavigableMapKvIterator<>(_sortedCache, start, key),
            entry -> entry.object().orElse(null));
    var persisted = delegate.getIterator(start, key);
    return new MergingKvIterator<>(cached, persisted);
}
// Shorthand for getIterator(IteratorStart.GE, key).
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
    final IteratorStart defaultStart = IteratorStart.GE;
    return getIterator(defaultStart, key);
}
private record CacheEntry(Optional<JDataVersionedWrapper> object, long size) {
}
}

View File

@@ -1,308 +0,0 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.ByteUtils;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import net.openhft.hashing.LongHashFunction;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
// File format:
// 64-bit metadata serialized size
// 64-bit offset of "rest of" metadata (if -1 then file has no data,
// if 0 then file has data and metadata fits into META_BLOCK_SIZE)
// Until META_BLOCK_SIZE - metadata (encoded as ObjectMetadataP)
// data (encoded as JObjectDataP)
// rest of metadata
/**
 * {@link ObjectPersistentStore} that keeps every object in its own file under
 * {@code <root>/objects/<bucket>/<key>}, where the bucket directory is derived from the key's
 * hash.
 *
 * <p>A commit first writes each new object to a {@code .tmp} file next to its final location,
 * then persists the transaction manifest, and only then renames the temporary files into place
 * and removes deleted objects. On startup a manifest left in the manifest file is applied
 * again, this time tolerating temporary files that are no longer there.
 *
 * <p>Active only when the build property {@code dhfs.objects.persistence} is {@code files}.
 */
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "files")
public class FileObjectPersistentStore implements ObjectPersistentStore {
// Directory holding the bucket directories with the object files.
private final Path _root;
// File holding the manifest of the transaction currently being committed.
private final Path _txManifest;
private ExecutorService _flushExecutor;
private RandomAccessFile _txFile;
// Set at the end of init() and cleared at the start of shutdown().
private boolean _ready = false;
/**
 * @param root base directory; objects go to {@code root/objects}, the manifest to
 *             {@code root/cur-tx-manifest}
 */
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
this._root = Path.of(root).resolve("objects");
_txManifest = Path.of(root).resolve("cur-tx-manifest");
}
/**
 * Creates the directory layout if the objects directory is missing, opens (creating if
 * necessary) the manifest file, replays a manifest found in it, and marks the store ready.
 */
void init(@Observes @Priority(100) StartupEvent event) throws IOException {
if (!_root.toFile().exists()) {
Log.info("Initializing with root " + _root);
_root.toFile().mkdirs();
// One bucket directory per possible value produced by getObjPath().
for (int i = 0; i < 256; i++) {
_root.resolve(String.valueOf(i)).toFile().mkdirs();
}
}
if (!Files.exists(_txManifest)) {
Files.createFile(_txManifest);
}
_txFile = new RandomAccessFile(_txManifest.toFile(), "rw");
_flushExecutor = Executors.newVirtualThreadPerTaskExecutor();
tryReplay();
Log.info("Transaction replay done");
_ready = true;
}
/** Marks the store not ready, then closes and deletes the manifest file. */
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
_ready = false;
Log.debug("Deleting manifest file");
_txFile.close();
Files.delete(_txManifest);
Log.debug("Manifest file deleted");
}
// Throws IllegalStateException unless init() has completed and shutdown() has not started.
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
// Applies the manifest stored in the manifest file, if there is one. Missing temporary
// files are tolerated (failIfNotFound = false).
private void tryReplay() {
var read = readTxManifest();
if (read != null)
commitTxImpl(read, false);
}
// Final location of an object: bits 8..15 of the key's hash select one of the 256 bucket
// directories, the file is named after the key.
private Path getObjPath(@Nonnull JObjectKey obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj.toString());
}
// Temporary location of an object: same bucket as getObjPath(), with a ".tmp" suffix.
private Path getTmpObjPath(@Nonnull JObjectKey obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj + ".tmp");
}
// Recursively collects a key for every file below path, skipping ".tmp" files.
private void findAllObjectsImpl(Collection<JObjectKey> out, Path path) {
var read = path.toFile().listFiles();
if (read == null) return;
for (var s : read) {
if (s.isDirectory()) {
findAllObjectsImpl(out, s.toPath());
} else {
if (s.getName().endsWith(".tmp")) continue; // FIXME:
out.add(new JObjectKey(s.getName())); // FIXME:
}
}
}
/**
 * Lists the keys of all stored objects by walking the objects directory.
 *
 * @return an unmodifiable collection of keys
 */
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
verifyReady();
ArrayList<JObjectKey> out = new ArrayList<>();
findAllObjectsImpl(out, _root);
return Collections.unmodifiableCollection(out);
}
/**
 * Reads the whole file of an object.
 *
 * @return the file contents, or empty if the file does not exist or could not be read in
 *         full
 * @throws StatusRuntimeExceptionNoStacktrace with {@code Status.INTERNAL} on any other I/O
 *         error
 */
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
verifyReady();
var path = getObjPath(name);
try (var rf = new RandomAccessFile(path.toFile(), "r")) {
ByteBuffer buf = UninitializedByteBuffer.allocateUninitialized(Math.toIntExact(rf.getChannel().size()));
fillBuffer(buf, rf.getChannel());
buf.flip();
var bs = UnsafeByteOperations.unsafeWrap(buf);
// This way, the input will be considered "immutable" which would allow avoiding copies
// when parsing byte arrays
// var ch = bs.newCodedInput();
// ch.enableAliasing(true);
return Optional.of(bs);
} catch (EOFException | FileNotFoundException | NoSuchFileException fx) {
return Optional.empty();
} catch (IOException e) {
Log.error("Error reading file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
// Reads from src until dst is full; throws EOFException if the channel ends first.
private void fillBuffer(ByteBuffer dst, FileChannel src) throws IOException {
int rem = dst.remaining();
int readTotal = 0;
int readCur = 0;
while (readTotal < rem && (readCur = src.read(dst)) != -1) {
readTotal += readCur;
}
if (rem != readTotal)
throw new EOFException();
}
// Overwrites the file at path with data; when sync is set, also flushes the stream and
// syncs the file descriptor before closing.
private void writeObjectImpl(Path path, ByteString data, boolean sync) throws IOException {
try (var fsb = new FileOutputStream(path.toFile(), false)) {
data.writeTo(fsb);
if (sync) {
fsb.flush();
fsb.getFD().sync();
}
}
}
// Reads the manifest file: an 8-byte checksum followed by the serialized manifest.
// Returns null for an empty file; throws DATA_LOSS if the checksum does not match.
private TxManifestRaw readTxManifest() {
try {
var channel = _txFile.getChannel();
if (channel.size() == 0)
return null;
channel.position(0);
var buf = ByteBuffer.allocate(Math.toIntExact(channel.size()));
fillBuffer(buf, channel);
buf.flip();
long checksum = buf.getLong();
// Everything after the checksum is the serialized manifest.
var data = buf.slice();
var hash = LongHashFunction.xx3().hashBytes(data);
if (hash != checksum)
throw new StatusRuntimeExceptionNoStacktrace(Status.DATA_LOSS.withDescription("Transaction manifest checksum mismatch!"));
return SerializationHelper.deserialize(data.array(), data.arrayOffset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// Writes the manifest file as an 8-byte xx3 hash of the serialized manifest followed by
// the serialized manifest itself, then forces it to storage.
private void putTxManifest(TxManifestRaw manifest) {
try {
var channel = _txFile.getChannel();
var data = SerializationHelper.serializeArray(manifest);
channel.truncate(data.length + 8);
channel.position(0);
var hash = LongHashFunction.xx3().hashBytes(data);
if (channel.write(ByteUtils.longToBb(hash)) != 8)
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
if (channel.write(ByteBuffer.wrap(data)) != data.length)
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
channel.force(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
 * Commits a transaction: writes and syncs every written object to its temporary file
 * (as tasks on the flush executor, waiting for all of them), then hands over to
 * {@link #commitTxImpl} to persist the manifest and move the files into place.
 */
@Override
public void commitTx(TxManifestRaw manifest) {
verifyReady();
try {
_flushExecutor.invokeAll(
manifest.written().stream().map(p -> (Callable<Void>) () -> {
var tmpPath = getTmpObjPath(p.getKey());
writeObjectImpl(tmpPath, p.getValue(), true);
return null;
}).toList()
).forEach(p -> {
try {
// Surfaces any failure of the individual write task.
p.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
commitTxImpl(manifest, true);
}
/**
 * Persists the manifest, then renames the temporary file of every written object onto its
 * final path and removes every deleted object. Does nothing for an empty manifest.
 *
 * @param failIfNotFound whether a missing temporary file is an error; {@code false} is used
 *                       by the startup replay
 */
public void commitTxImpl(TxManifestRaw manifest, boolean failIfNotFound) {
if (manifest.deleted().isEmpty() && manifest.written().isEmpty()) {
Log.debug("Empty manifest, skipping");
return;
}
putTxManifest(manifest);
try {
_flushExecutor.invokeAll(
Stream.concat(manifest.written().stream().map(p -> (Callable<Void>) () -> {
try {
Files.move(getTmpObjPath(p.getKey()), getObjPath(p.getKey()), ATOMIC_MOVE, REPLACE_EXISTING);
} catch (NoSuchFileException n) {
if (failIfNotFound)
throw n;
}
return null;
}),
manifest.deleted().stream().map(p -> (Callable<Void>) () -> {
deleteImpl(getObjPath(p));
return null;
})).toList()
).forEach(p -> {
try {
p.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Deletes the file at path; a file that is already gone is not an error.
private void deleteImpl(Path path) {
try {
Files.delete(path);
} catch (NoSuchFileException ignored) {
} catch (IOException e) {
Log.error("Error deleting file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
/** Total space, as reported for the objects directory. */
@Override
public long getTotalSpace() {
verifyReady();
return _root.toFile().getTotalSpace();
}
/** Free space, as reported for the objects directory. */
@Override
public long getFreeSpace() {
verifyReady();
return _root.toFile().getFreeSpace();
}
/** Usable space, as reported for the objects directory. */
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.objects.persistence;
/**
 * Selects where a key/value iterator begins relative to a reference key.
 * Iteration itself always proceeds in ascending key order.
 */
public enum IteratorStart {
// Begin at the greatest key strictly below the reference key.
LT,
// Begin at the reference key if present, otherwise at the greatest key below it.
LE,
// Begin at the first key strictly above the reference key.
GT,
// Begin at the reference key if present, otherwise at the first key above it.
GE
}

View File

@@ -0,0 +1,201 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.lmdbjava.*;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private final Path _root;
private Env<byte[]> _env;
private Dbi<byte[]> _db;
private boolean _ready = false;
private static final String DB_NAME = "objects";
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
_root = Path.of(root).resolve("objects");
}
void init(@Observes @Priority(100) StartupEvent event) throws IOException {
if (!_root.toFile().exists()) {
Log.info("Initializing with root " + _root);
_root.toFile().mkdirs();
}
_env = create(ByteArrayProxy.PROXY_BA)
.setMapSize(1_000_000_000_000L)
.setMaxDbs(1)
.open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE);
_ready = true;
}
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
_ready = false;
_db.close();
_env.close();
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
// try (Txn<ByteBuffer> txn = env.txnRead()) {
// try (var cursor = db.openCursor(txn)) {
// var keys = List.of();
// while (cursor.next()) {
// keys.add(JObjectKey.fromBytes(cursor.key()));
// }
// return keys;
// }
// }
return List.of();
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
verifyReady();
try (Txn<byte[]> txn = _env.txnRead()) {
var key = name.toString().getBytes(StandardCharsets.UTF_8);
var value = _db.get(txn, key);
return Optional.ofNullable(value).map(ByteString::copyFrom);
}
}
private class LmdbKvIterator implements CloseableKvIterator<JObjectKey, ByteString> {
private final Txn<byte[]> _txn = _env.txnRead();
private final Cursor<byte[]> _cursor = _db.openCursor(_txn);
private boolean _hasNext = false;
LmdbKvIterator(IteratorStart start, JObjectKey key) {
verifyReady();
if (!_cursor.get(key.toString().getBytes(StandardCharsets.UTF_8), GetOp.MDB_SET_RANGE)) {
return;
}
var got = JObjectKey.fromBytes(_cursor.key());
var cmp = got.compareTo(key);
assert cmp >= 0;
_hasNext = true;
if (cmp == 0) {
switch (start) {
case LT -> {
_hasNext = _cursor.prev();
}
case GT -> {
_hasNext = _cursor.next();
}
case LE, GE -> {
}
}
} else {
switch (start) {
case LT, LE -> {
_hasNext = _cursor.prev();
}
case GT, GE -> {
}
}
}
}
@Override
public void close() {
_cursor.close();
_txn.close();
}
@Override
public boolean hasNext() {
return _hasNext;
}
@Override
public Pair<JObjectKey, ByteString> next() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
var ret = Pair.of(JObjectKey.fromBytes(_cursor.key()), ByteString.copyFrom(_cursor.val()));
_hasNext = _cursor.next();
return ret;
}
@Override
public JObjectKey peekNextKey() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
return JObjectKey.fromBytes(_cursor.key());
}
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new LmdbKvIterator(start, key);
}
/**
 * Applies a transaction manifest inside a single write transaction: every written entry is
 * stored under its UTF-8 encoded key, then every deleted key is removed, and finally the
 * transaction is committed.
 *
 * @param names manifest listing the entries to write and the keys to delete
 */
@Override
public void commitTx(TxManifestRaw names) {
    verifyReady();
    try (Txn<byte[]> writeTxn = _env.txnWrite()) {
        // Writes are applied first, in manifest order.
        for (var entry : names.written()) {
            byte[] rawKey = entry.getKey().toString().getBytes(StandardCharsets.UTF_8);
            byte[] rawValue = entry.getValue().toByteArray();
            _db.put(writeTxn, rawKey, rawValue);
        }
        // Deletions follow the writes.
        for (JObjectKey removed : names.deleted()) {
            byte[] rawKey = removed.toString().getBytes(StandardCharsets.UTF_8);
            _db.delete(writeTxn, rawKey);
        }
        writeTxn.commit();
    }
}
// After the verifyReady() check, reports getTotalSpace() of `_root.toFile()`
// (for a java.io.File this is the size, in bytes, of the partition containing the path).
@Override
public long getTotalSpace() {
verifyReady();
return _root.toFile().getTotalSpace();
}
// After the verifyReady() check, reports getFreeSpace() of `_root.toFile()`
// (for a java.io.File this is the number of unallocated bytes on the containing partition).
@Override
public long getFreeSpace() {
verifyReady();
return _root.toFile().getFreeSpace();
}
// After the verifyReady() check, reports getUsableSpace() of `_root.toFile()`
// (for a java.io.File this is the number of bytes on the partition available to this JVM).
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
}

View File

@@ -1,20 +1,21 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.NavigableMapKvIterator;
import io.quarkus.arc.properties.IfBuildProperty;
import jakarta.enterprise.context.ApplicationScoped;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final Map<JObjectKey, ByteString> _objects = new HashMap<>();
private final ConcurrentSkipListMap<JObjectKey, ByteString> _objects = new ConcurrentSkipListMap<>();
@Nonnull
@Override
@@ -32,6 +33,11 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
// Returns a NavigableMapKvIterator over the in-memory `_objects` map, positioned
// relative to `key` according to `start`.
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key);
}
@Override
public void commitTx(TxManifestRaw names) {
synchronized (this) {

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import javax.annotation.Nonnull;
@@ -16,6 +17,14 @@ public interface ObjectPersistentStore {
@Nonnull
Optional<ByteString> readObject(JObjectKey name);
// Returns an iterator over all committed objects, positioned relative to `key` as
// requested by `start`.
// Implementations do not have to guarantee a consistent view: snapshots are handled by the upper layers.
// The returned iterator is closeable and should be closed by the caller.
CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key);
// Convenience overload: equivalent to getIterator(IteratorStart.GE, key).
default CloseableKvIterator<JObjectKey, ByteString> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
void commitTx(TxManifestRaw names);
long getTotalSpace();

View File

@@ -1,5 +1,7 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectSerializer;
@@ -17,20 +19,59 @@ public class SerializingObjectPersistentStore {
ObjectSerializer<JDataVersionedWrapper> serializer;
@Inject
ObjectPersistentStore delegate;
ObjectPersistentStore delegateStore;
// Returns the key collection reported by the wrapped raw store's findAllObjects();
// no (de)serialization is involved.
@Nonnull
Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
return delegateStore.findAllObjects();
}
// Reads the raw value stored under `name` from the wrapped store and, when one is present,
// converts it with serializer.deserialize; an empty Optional is passed through unchanged.
@Nonnull
Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
return delegate.readObject(name).map(serializer::deserialize);
Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
return delegateStore.readObject(name).map(serializer::deserialize);
}
void commitTx(TxManifestObj<? extends JDataVersionedWrapper<?>> names) {
delegate.commitTx(new TxManifestRaw(
/**
 * Adapter that wraps a raw iterator obtained from {@code delegateStore} and deserializes
 * each value on the fly with {@code serializer}. Keys are passed through untouched, and
 * closing this iterator closes the wrapped one.
 */
private class SerializingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
    private final CloseableKvIterator<JObjectKey, ByteString> _delegate;

    private SerializingKvIterator(IteratorStart start, JObjectKey key) {
        this._delegate = delegateStore.getIterator(start, key);
    }

    @Override
    public boolean hasNext() {
        return this._delegate.hasNext();
    }

    /** Advances the wrapped iterator and returns its entry with the value deserialized. */
    @Override
    public Pair<JObjectKey, JDataVersionedWrapper> next() {
        Pair<JObjectKey, ByteString> raw = this._delegate.next();
        JObjectKey rawKey = raw.getKey();
        JDataVersionedWrapper decoded = serializer.deserialize(raw.getValue());
        return Pair.of(rawKey, decoded);
    }

    /** Key lookahead needs no deserialization, so it is forwarded as is. */
    @Override
    public JObjectKey peekNextKey() {
        return this._delegate.peekNextKey();
    }

    @Override
    public void close() {
        this._delegate.close();
    }
}
// Returns an iterator over all committed objects with values already deserialized,
// positioned relative to `key` as requested by `start`.
// Does not have to guarantee a consistent view: snapshots are handled by the upper layers.
// The returned iterator wraps an iterator of the underlying store and should be closed by the caller.
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new SerializingKvIterator(start, key);
}
// Convenience overload: equivalent to getIterator(IteratorStart.GE, key).
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> names) {
delegateStore.commitTx(new TxManifestRaw(
names.written().stream()
.map(e -> Pair.of(e.getKey(), serializer.serialize(e.getValue())))
.toList()

View File

@@ -75,8 +75,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
return switch (strategy) {
case OPTIMISTIC -> _source.get(type, key).data().map(JDataVersionedWrapper::data);
case WRITE -> _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data);
case OPTIMISTIC -> (Optional<T>) _source.get(type, key).data().map(JDataVersionedWrapper::data);
case WRITE -> (Optional<T>) _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data);
};
}

View File

@@ -6,5 +6,5 @@ import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import java.util.Optional;
// Handle to an object as seen by a transaction: data() exposes the object's versioned
// wrapper as an Optional.
// NOTE(review): presumably an empty Optional means the object does not exist -- confirm
// against the implementations.
public interface TransactionObject<T extends JData> {
Optional<JDataVersionedWrapper<T>> data();
Optional<JDataVersionedWrapper> data();
}

View File

@@ -1,4 +1,4 @@
dhfs.objects.persistence=files
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.lru.print-stats=true