diff --git a/dhfs-parent/.run/Main.run.xml b/dhfs-parent/.run/Main.run.xml
index cf3d6aae..867167f1 100644
--- a/dhfs-parent/.run/Main.run.xml
+++ b/dhfs-parent/.run/Main.run.xml
@@ -1,18 +1,16 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhfs-parent/dhfs-app/src/main/resources/application.properties b/dhfs-parent/dhfs-app/src/main/resources/application.properties
index f3000b63..a1e4bafe 100644
--- a/dhfs-parent/dhfs-app/src/main/resources/application.properties
+++ b/dhfs-parent/dhfs-app/src/main/resources/application.properties
@@ -14,8 +14,9 @@ dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
-dhfs.files.target_chunk_size=2097152
-dhfs.files.target_chunk_alignment=19
+dhfs.files.target_chunk_size=524288
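+# A write remainder smaller than max_chunk_size is stored as a single chunk (see DhfsFileServiceImpl chunking loop)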
+dhfs.files.max_chunk_size=524288
+dhfs.files.target_chunk_alignment=17
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
@@ -28,7 +29,7 @@ dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
-quarkus.log.category."com.usatiuk".min-level=TRACE
-quarkus.log.category."com.usatiuk".level=TRACE
+quarkus.log.category."com.usatiuk".min-level=INFO
+quarkus.log.category."com.usatiuk".level=INFO
quarkus.http.insecure-requests=enabled
quarkus.http.ssl.client-auth=required
diff --git a/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileServiceImpl.java b/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileServiceImpl.java
index 68b61aa6..566023a5 100644
--- a/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileServiceImpl.java
+++ b/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileServiceImpl.java
@@ -55,6 +55,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
+ @ConfigProperty(name = "dhfs.files.max_chunk_size")
+ int maxChunkSize;
+
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@@ -360,16 +363,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
if (file == null) {
- Log.error("File not found when trying to write: " + fileUuid);
- return -1L;
+ throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
}
- if (writeLogging) {
- Log.info("Writing to file: " + file.key() + " size=" + size(fileUuid) + " "
- + offset + " " + data.size());
- }
-
- NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
+ Map<Long, JObjectKey> removedChunks = new HashMap<>();
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
long writeEnd = offset + data.size();
@@ -407,7 +404,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
- NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
+ Map<Long, JObjectKey> newChunks = new HashMap<>();
if (existingEnd < offset) {
if (!pendingPrefix.isEmpty()) {
@@ -424,12 +421,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
int combinedSize = pendingWrites.size();
{
- int targetChunkSize = 1 << targetChunkAlignment;
int cur = 0;
while (cur < combinedSize) {
int end;
- if (targetChunkAlignment < 0)
+ if (combinedSize - cur < maxChunkSize)
+ end = combinedSize;
+ else if (targetChunkAlignment < 0)
end = combinedSize;
else
end = Math.min(cur + targetChunkSize, combinedSize);
@@ -550,7 +548,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
});
}
- private void fillZeros(long fillStart, long length, NavigableMap<Long, JObjectKey> newChunks) {
+ private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart);
long start = fillStart;
diff --git a/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java b/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java
index 666156e3..97a0cf7a 100644
--- a/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java
+++ b/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java
@@ -7,6 +7,7 @@ import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
+import com.usatiuk.utils.UninitializedByteBuffer;
import com.usatiuk.utils.UnsafeAccessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -28,7 +29,6 @@ import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
-import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
@@ -243,7 +243,7 @@ public class DhfsFuse extends FuseStubFS {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileKey = getFromHandle(fi.fh.get());
- var buffer = ByteBuffer.allocateDirect((int) size);
+ var buffer = UninitializedByteBuffer.allocate((int) size);
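+ // Uninitialized memory is safe here: the buffer is immediately filled from the FUSE buffer below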
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml
index f79b8e9b..534d8cc0 100644
--- a/dhfs-parent/objects/pom.xml
+++ b/dhfs-parent/objects/pom.xml
@@ -18,6 +18,11 @@
+ <dependency>
+ <groupId>org.apache.fury</groupId>
+ <artifactId>fury-core</artifactId>
+ <version>0.10.1</version>
+ </dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java
index 584c35b5..eca0af53 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JObjectKeyImpl.java
@@ -1,5 +1,7 @@
package com.usatiuk.objects;
+import com.usatiuk.utils.UninitializedByteBuffer;
+
import java.io.Serial;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -46,7 +48,7 @@ public final class JObjectKeyImpl implements JObjectKey {
synchronized (this) {
if (_bb != null) return _bb;
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
- var directBb = ByteBuffer.allocateDirect(bytes.length);
+ var directBb = UninitializedByteBuffer.allocate(bytes.length);
directBb.put(bytes);
directBb.flip();
_bb = directBb;
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java
index 14581634..75bde6af 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/JavaDataSerializer.java
@@ -3,26 +3,30 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
-import com.usatiuk.utils.SerializationHelper;
import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped;
+import org.apache.fury.Fury;
+import org.apache.fury.ThreadSafeFury;
+import org.apache.fury.config.Language;
-import java.io.IOException;
import java.nio.ByteBuffer;
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {
+ private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
+ // Allow deserializing objects of unknown types,
+ // more flexible but less secure.
+ .requireClassRegistration(false)
+ .buildThreadSafeFury();
+
@Override
public ByteString serialize(JData obj) {
- return SerializationHelper.serialize(obj);
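+ // unsafeWrap avoids copying the byte[] that Fury produces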
+ return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
}
+ @Override
public JData deserialize(ByteBuffer data) {
- try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) {
- return SerializationHelper.deserialize(is);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return (JData) fury.deserialize(data);
}
}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java
new file mode 100644
index 00000000..d51afdf4
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/IterProdFn2.java
@@ -0,0 +1,8 @@
+package com.usatiuk.objects.iterators;
+
+import java.util.stream.Stream;
+
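+// Supplies the per-source iterators, positioned according to start/key, that MergingKvIterator merges.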
+@FunctionalInterface
+public interface IterProdFn2<K extends Comparable<K>, V> {
+ Stream<CloseableKvIterator<K, MaybeTombstone<V>>> get(IteratorStart start, K key);
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java
index 49953199..c144be88 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/MergingKvIterator.java
@@ -13,6 +13,7 @@ public class MergingKvIterator, V> extends ReversibleKvI
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final List<IteratorEntry<K, V>> _iterators;
+
public MergingKvIterator(String name, IteratorStart startType, K startKey, List> iterators) {
_goingForward = true;
_name = name;
@@ -73,6 +74,84 @@ public class MergingKvIterator, V> extends ReversibleKvI
advanceIterator(iterator);
}
+// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
+// switch (startType) {
+//// case LT -> {
+//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
+//// }
+//// case LE -> {
+//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
+//// }
+// case GT -> {
+// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
+// }
+// case GE -> {
+// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
+// }
+// }
+ }
+
+ public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> iteratorsProd) {
+ _goingForward = true;
+ _name = name;
+
+ // Why are streams so slow?
+ {
+ var iterators = iteratorsProd.get(startType, startKey).toList();
+ IteratorEntry[] iteratorEntries = new IteratorEntry[iterators.size()];
+ for (int i = 0; i < iterators.size(); i++) {
+ iteratorEntries[i] = new IteratorEntry<>(i, (CloseableKvIterator) iterators.get(i));
+ }
+ _iterators = List.of(iteratorEntries);
+ }
+
+ if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
+ // Starting at the greatest key less than (or equal to) the start key:
+ // each iterator has given us its "greatest LT/LE key",
+ // and we need to pick the greatest of those to start with.
+ // If none of the iterators has such a key, we fall back to the smallest key at or above the start key.
+
+ K greatestLess = null;
+ K smallestMore = null;
+
+ for (var ite : _iterators) {
+ var it = ite.iterator();
+ if (it.hasNext()) {
+ var peeked = it.peekNextKey();
+ if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
+ if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
+ greatestLess = peeked;
+ }
+ } else {
+ if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
+ smallestMore = peeked;
+ }
+ }
+ }
+ }
+
+ K initialMaxValue;
+ if (greatestLess != null)
+ initialMaxValue = greatestLess;
+ else
+ initialMaxValue = smallestMore;
+
+ if (initialMaxValue == null) {
+ // Empty iterators
+ }
+
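+ // Skip every entry strictly below the chosen starting key so all iterators begin at the same position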
+ for (var ite : _iterators) {
+ var iterator = ite.iterator();
+ while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
+ iterator.skip();
+ }
+ }
+ }
+
+ for (IteratorEntry<K, V> iterator : _iterators) {
+ advanceIterator(iterator);
+ }
+
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
// switch (startType) {
//// case LT -> {
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java
index 097b23e8..07290c3f 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/iterators/TombstoneMergingKvIterator.java
@@ -19,4 +19,18 @@ public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
+
+ public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> itProd) {
+ return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
+ new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, (IterProdFn2<K, MaybeTombstone<V>>) itProd),
+ startType, startKey,
+ pair -> {
+// Log.tracev("{0} - Processing pair {1}", name, pair);
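+ // Tombstones are mapped to null and filtered out by PredicateKvIterator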
+ if (pair instanceof Tombstone) {
+ return null;
+ }
+ return ((Data<V>) pair).value();
+ });
+ }
+
}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java
index 6b1a6bdd..7fbcab3a 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/snapshot/Snapshot.java
@@ -2,13 +2,16 @@ package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.iterators.MaybeTombstone;
+import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.Optional;
+import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
- CloseableKvIterator<K, V> getIterator(IteratorStart start, K key);
+ Stream<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull
Optional readObject(K name);
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java
index a04c70e8..8c1b92ae 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
@@ -33,6 +34,7 @@ public class CachingObjectPersistentStore {
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
+
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
@@ -150,10 +152,11 @@ public class CachingObjectPersistentStore {
}
@Override
- public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
- return TombstoneMergingKvIterator.of("cache", start, key,
- (mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
- (mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
+ public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
+ return Stream.concat(
+ Stream.of(new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key)),
+ _backing.getIterator(start, key).map(i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator) i))
+ );
}
@Nonnull
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java
index c922360b..c50335a2 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/LmdbObjectPersistentStore.java
@@ -3,10 +3,7 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
-import com.usatiuk.objects.iterators.CloseableKvIterator;
-import com.usatiuk.objects.iterators.IteratorStart;
-import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
-import com.usatiuk.objects.iterators.ReversibleKvIterator;
+import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
@@ -28,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.stream.Stream;
import static org.lmdbjava.DbiFlags.MDB_CREATE;
import static org.lmdbjava.Env.create;
@@ -112,9 +110,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private boolean _closed = false;
@Override
- public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
+ public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
- return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
+ return Stream.of(new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)));
}
@Nonnull
@@ -195,7 +193,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace();
}
- private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
+ private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
private final Cursor<ByteBuffer> _cursor;
@@ -350,13 +348,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
- protected Pair<JObjectKey, ByteBuffer> nextImpl() {
+ protected Pair<JObjectKey, MaybeTombstone<ByteBuffer>> nextImpl() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val();
- var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
+ Pair<JObjectKey, MaybeTombstone<ByteBuffer>> ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), new DataWrapper<>(val.asReadOnlyBuffer()));
if (_goingForward)
_hasNext = _cursor.next();
else
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java
index 79765b4f..3cefb2f6 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/MemoryObjectPersistentStore.java
@@ -2,10 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.iterators.CloseableKvIterator;
-import com.usatiuk.objects.iterators.IteratorStart;
-import com.usatiuk.objects.iterators.MappingKvIterator;
-import com.usatiuk.objects.iterators.NavigableMapKvIterator;
+import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
import jakarta.enterprise.context.ApplicationScoped;
@@ -15,6 +12,7 @@ import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Stream;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
@@ -31,8 +29,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
- public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
- return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
+ public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
+ return Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), s -> new DataWrapper<>(s.asReadOnlyByteBuffer())));
}
@Nonnull
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java
index 4af76bde..ef27e1e6 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/SerializingObjectPersistentStore.java
@@ -1,13 +1,9 @@
package com.usatiuk.objects.stores;
-import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.ObjectSerializer;
-import com.usatiuk.objects.iterators.CloseableKvIterator;
-import com.usatiuk.objects.iterators.IteratorStart;
-import com.usatiuk.objects.iterators.MappingKvIterator;
+import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -16,6 +12,7 @@ import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
+import java.util.stream.Stream;
@ApplicationScoped
public class SerializingObjectPersistentStore {
@@ -30,8 +27,9 @@ public class SerializingObjectPersistentStore {
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override
- public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
- return new MappingKvIterator<>(_backing.getIterator(start, key), d -> serializer.deserialize(d));
+ public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
+ return _backing.getIterator(start, key).map(i -> new MappingKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>, MaybeTombstone<JDataVersionedWrapper>>(i,
+ d -> serializer.deserialize(((DataWrapper) d).value())));
}
@Nonnull
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java
index 41a449e9..ea2ac1d8 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java
@@ -3,7 +3,10 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.iterators.*;
+import com.usatiuk.objects.iterators.CloseableKvIterator;
+import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.iterators.MaybeTombstone;
+import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
@@ -27,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@@ -140,7 +144,7 @@ public class WritebackObjectPersistentStore {
Log.tracev("Bundle {0} committed", bundle.id());
- while (true) {
+ synchronized (_pendingWrites) {
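+ // _pendingWrites is only updated while holding this lock, so the compare-and-set below cannot race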
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
@@ -153,8 +157,7 @@ public class WritebackObjectPersistentStore {
bundle.id(),
curPw.lastCommittedId()
);
- if (_pendingWrites.compareAndSet(curPw, newCurPw))
- break;
+ _pendingWrites.compareAndSet(curPw, newCurPw);
}
List> callbacks = new ArrayList<>();
@@ -251,7 +254,7 @@ public class WritebackObjectPersistentStore {
}
}
- while (true) {
+ synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
@@ -272,18 +275,17 @@ public class WritebackObjectPersistentStore {
bundle.id()
);
- if (!_pendingWrites.compareAndSet(curPw, newCurPw))
- continue;
-
- ((TxBundle) bundle).setReady();
- if (_pendingBundles.peek() == bundle)
- _pendingBundles.notify();
- synchronized (_flushWaitSynchronizer) {
- currentSize += ((TxBundle) bundle).size();
- }
-
- return bundle.id();
+ _pendingWrites.compareAndSet(curPw, newCurPw);
}
+
+ ((TxBundle) bundle).setReady();
+ if (_pendingBundles.peek() == bundle)
+ _pendingBundles.notify();
+ synchronized (_flushWaitSynchronizer) {
+ currentSize += ((TxBundle) bundle).size();
+ }
+
+ return bundle.id();
}
}
}
@@ -341,10 +343,8 @@ public class WritebackObjectPersistentStore {
private final long txId = finalPw.lastCommittedId();
@Override
- public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
- return TombstoneMergingKvIterator.of("writeback-ps", start, key,
- (tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
- (tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator) _cache.getIterator(tS, tK));
+ public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
+ return Stream.concat(Stream.of(new NavigableMapKvIterator<>(_pendingWrites, start, key)), _cache.getIterator(start, key));
}
@Nonnull
@@ -390,7 +390,7 @@ public class WritebackObjectPersistentStore {
}
private static class TxBundle {
- private final LinkedHashMap _entries = new LinkedHashMap<>();
+ private final HashMap _entries = new HashMap<>();
private final ArrayList _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java
index d13f35ac..786dbcba 100644
--- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java
@@ -13,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
+import java.util.stream.Stream;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@@ -162,16 +163,21 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
- return new ReadTrackingIterator(TombstoneMergingKvIterator.of("tx", start, key,
- (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
+ return new ReadTrackingIterator(TombstoneMergingKvIterator.of("tx", start, key, (tS, tK) ->
+ Stream.concat(Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
- new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
- case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
+ new DataWrapper(new ReadTrackingInternalCrapTx(write.data()));
+ case TxRecord.TxObjectRecordDeleted deleted ->
+ new TombstoneImpl();
case null, default -> null;
- }),
- (tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
- d -> new DataWrapper(new ReadTrackingInternalCrapSource(d)))));
+ })), _snapshot.getIterator(tS, tK).map(itin -> new MappingKvIterator, MaybeTombstone>(itin,
+ d -> switch (d) {
+ case Data w ->
+ new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
+ case Tombstone t -> new TombstoneImpl<>();
+ case null, default -> null;
+ })))));
}
@Override
diff --git a/dhfs-parent/pom.xml b/dhfs-parent/pom.xml
index 2fa02021..fe3ab053 100644
--- a/dhfs-parent/pom.xml
+++ b/dhfs-parent/pom.xml
@@ -102,6 +102,7 @@
-parameters
--add-exports
java.base/jdk.internal.access=ALL-UNNAMED
+ --enable-preview
@@ -119,6 +120,7 @@
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
+ --enable-preview
${skip.unit}
true
diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java
new file mode 100644
index 00000000..85da4694
--- /dev/null
+++ b/dhfs-parent/utils/src/main/java/com/usatiuk/utils/UninitializedByteBuffer.java
@@ -0,0 +1,42 @@
+package com.usatiuk.utils;
+
+import java.lang.foreign.*;
+import java.lang.invoke.MethodHandle;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+public class UninitializedByteBuffer {
+
+ private static final Linker LINKER = Linker.nativeLinker();
+ private static final MethodHandle malloc = LINKER.downcallHandle(
+ LINKER.defaultLookup().find("malloc").orElseThrow(),
+ FunctionDescriptor.of(ValueLayout.ADDRESS, ValueLayout.JAVA_LONG)
+ );
+ private static final MethodHandle free = LINKER.downcallHandle(
+ LINKER.defaultLookup().find("free").orElseThrow(),
+ FunctionDescriptor.ofVoid(ValueLayout.ADDRESS)
+ );
+
+ public static ByteBuffer allocate(int capacity) {
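+ // Account the allocation against the JVM's direct-memory limit, as ByteBuffer.allocateDirect would, but skip zero-filling the memory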
+ UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
+ // Invoke malloc(), which returns a pointer
+ MemorySegment segment = null;
+ try {
+ segment = (MemorySegment) malloc.invokeExact((long) capacity);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+
+ Consumer<MemorySegment> cleanup = s -> {
+ try {
+ free.invokeExact(s);
+ UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity);
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ };
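+ // free() and the unreserve run once the auto-arena-managed segment becomes unreachable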
+ var reint = segment.reinterpret(capacity, Arena.ofAuto(), cleanup);
+ return reint.asByteBuffer();
+ }
+}