1 Commit

Commit a410ed3030 ("dump"), committed 2025-04-29 00:46:58 +02:00
35 changed files with 1281 additions and 711 deletions

View File

@@ -1,16 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
<module name="dhfs-app"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
</component>

View File

@@ -29,7 +29,7 @@ dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE
quarkus.log.category."com.usatiuk".level=TRACE
quarkus.log.category."com.usatiuk".min-level=INFO
quarkus.log.category."com.usatiuk".level=INFO
quarkus.http.insecure-requests=enabled
quarkus.http.ssl.client-auth=required

View File

@@ -70,7 +70,6 @@ public class DhfsImage implements Future<String> {
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--enable-preview",
"-Ddhfs.objects.peerdiscovery.interval=1s",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",

View File

@@ -55,7 +55,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@ConfigProperty(name = "dhfs.files.max_chunk_size", defaultValue = "524288")
@ConfigProperty(name = "dhfs.files.max_chunk_size")
int maxChunkSize;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfsfuse;
import com.google.protobuf.UnsafeByteOperations;
import com.kenai.jffi.MemoryIO;
import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
@@ -20,20 +19,16 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;
import jnr.ffi.Struct;
import jnr.ffi.types.off_t;
import jnr.ffi.types.size_t;
import org.apache.commons.lang3.SystemUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import ru.serce.jnrfuse.ErrorCodes;
import ru.serce.jnrfuse.FuseFillDir;
import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.NotImplemented;
import ru.serce.jnrfuse.flags.FuseBufFlags;
import ru.serce.jnrfuse.struct.*;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
@@ -245,19 +240,17 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
var buffer = UninitializedByteBuffer.allocate((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
size
);
return write(path, buffer, offset, fi);
}
public int write(String path, ByteBuffer buffer, long offset, FuseFileInfo fi) {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileKey = getFromHandle(fi.fh.get());
var buffer = UninitializedByteBuffer.allocate((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
size
);
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Exception e) {
@@ -428,29 +421,4 @@ public class DhfsFuse extends FuseStubFS {
return -ErrorCodes.EIO();
}
}
@Override
public int write_buf(String path, FuseBufvec buf, @off_t long off, FuseFileInfo fi) {
int size = (int) libFuse.fuse_buf_size(buf);
FuseBufvec tmpVec = new FuseBufvec(Runtime.getSystemRuntime());
long tmpVecAddr = MemoryIO.getInstance().allocateMemory(Struct.size(tmpVec), false);
try {
tmpVec.useMemory(Pointer.wrap(Runtime.getSystemRuntime(), tmpVecAddr));
FuseBufvec.init(tmpVec, size);
var bb = UninitializedByteBuffer.allocate(size);
var mem = UninitializedByteBuffer.getAddress(bb);
tmpVec.buf.mem.set(mem);
tmpVec.buf.size.set(size);
int res = (int) libFuse.fuse_buf_copy(tmpVec, buf, 0);
if (res != size) {
Log.errorv("fuse_buf_copy failed: {0} != {1}", res, size);
return -ErrorCodes.ENOMEM();
}
return write(path, bb, off, fi);
} finally {
if (tmpVecAddr != 0) {
MemoryIO.getInstance().freeMemory(tmpVecAddr);
}
}
}
}

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -3,26 +3,30 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import java.io.IOException;
import java.nio.ByteBuffer;
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {
private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
// Allow deserializing objects of unknown types;
// more flexible but less secure.
.requireClassRegistration(false)
.buildThreadSafeFury();
@Override
public ByteString serialize(JData obj) {
return SerializationHelper.serialize(obj);
return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
}
@Override
public JData deserialize(ByteBuffer data) {
try (var is = UnsafeByteOperations.unsafeWrap(data).newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
return (JData) fury.deserialize(data);
}
}
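As a side note on the serializer swap above: the Fury setup is the flexible-but-unregistered mode described in the comment. A minimal, self-contained sketch of the same round-trip (the Example record and class name are illustrative, not part of this commit):

import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;

public class FuryRoundTripSketch {
    // Same builder settings as JavaDataSerializer above:
    // Java-only mode, class registration disabled (flexible, less secure).
    private static final ThreadSafeFury fury = Fury.builder()
            .withLanguage(Language.JAVA)
            .requireClassRegistration(false)
            .buildThreadSafeFury();

    record Example(String name, int value) implements java.io.Serializable {}

    public static void main(String[] args) {
        byte[] bytes = fury.serialize(new Example("x", 42)); // object -> byte[]
        Example back = (Example) fury.deserialize(bytes);    // byte[] -> object, cast by the caller
        System.out.println(back);
    }
}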

View File

@@ -0,0 +1,6 @@
package com.usatiuk.objects.iterators;
@FunctionalInterface
public interface IterProdFn<K extends Comparable<K>, V> {
CloseableKvIterator<K, V> get(IteratorStart start, K key);
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.objects.iterators;
import java.util.stream.Stream;
@FunctionalInterface
public interface IterProdFn2<K extends Comparable<K>, V> {
Stream<CloseableKvIterator<K, MaybeTombstone<V>>> get(IteratorStart start, K key);
}

View File

@@ -11,15 +11,96 @@ import java.util.TreeMap;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(IteratorStart startType, K startKey, List<CloseableKvIterator<K, V>> iterators) {
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
// Why are streams so slow?
{
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
for (int i = 0; i < iterators.size(); i++) {
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i));
iteratorEntries[i] = new IteratorEntry<>(i, iterators.get(i).get(startType, startKey));
}
_iterators = List.of(iteratorEntries);
}
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at the greatest key less than (LT) or less than or equal to (LE) the start key:
// each iterator has already positioned itself at its own greatest LT/LE key,
// and we pick the greatest of those keys to start from.
// Iterators with no such lesser key contribute their smallest key instead,
// which is used as a fallback when no iterator has a lesser key at all.
K greatestLess = null;
K smallestMore = null;
for (var ite : _iterators) {
var it = ite.iterator();
if (it.hasNext()) {
var peeked = it.peekNextKey();
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
greatestLess = peeked;
}
} else {
if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
smallestMore = peeked;
}
}
}
}
K initialMaxValue;
if (greatestLess != null)
initialMaxValue = greatestLess;
else
initialMaxValue = smallestMore;
if (initialMaxValue == null) {
// All iterators are empty; there is nothing to skip and the loop below is a no-op.
}
for (var ite : _iterators) {
var iterator = ite.iterator();
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
iterator.skip();
}
}
}
for (IteratorEntry<K, V> iterator : _iterators) {
advanceIterator(iterator);
}
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
// switch (startType) {
//// case LT -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
//// }
//// case LE -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
//// }
// case GT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
// }
// case GE -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
// }
// }
}
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> iteratorsProd) {
_goingForward = true;
_name = name;
// Why are streams so slow?
{
var iterators = iteratorsProd.get(startType, startKey).toList();
IteratorEntry<K, V>[] iteratorEntries = new IteratorEntry[iterators.size()];
for (int i = 0; i < iterators.size(); i++) {
iteratorEntries[i] = new IteratorEntry<>(i, (CloseableKvIterator<K, V>) iterators.get(i));
}
_iterators = List.of(iteratorEntries);
}
@@ -89,8 +170,8 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
}
@SafeVarargs
public MergingKvIterator(IteratorStart startType, K startKey, CloseableKvIterator<K, V>... iterators) {
this(startType, startKey, List.of(iterators));
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
@@ -149,6 +230,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
|| (!_goingForward && peekImpl().compareTo(cur.getKey()) >= 0))) {
skipImpl();
}
Log.tracev("{0} Reversed to {1}", _name, _sortedIterators);
}
@Override
@@ -196,14 +278,28 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public String toString() {
return "MergingKvIterator{" +
"_name='" + _name + '\'' +
", _sortedIterators=" + _sortedIterators.keySet() +
", _iterators=" + _iterators +
'}';
}
private interface FirstMatchState<K extends Comparable<K>, V> {
}
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
private record FirstMatchFound<K extends Comparable<K>, V>(
CloseableKvIterator<K, V> iterator) implements FirstMatchState<K, V> {
}
private record FirstMatchConsumed<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
}
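To make the LE/LT start handling in the constructor above concrete, here is a small sketch mirroring the testPriorityLe case added near the end of this commit (variable names are illustrative):

// Two sources, opened with IteratorStart.LE at key 5:
//   source1 = {2=4, 5=6}  -> its greatest key <= 5 is 5
//   source2 = {1=3, 2=5}  -> its greatest key <= 5 is 2
// greatestLess = max(5, 2) = 5, so source2 skips keys 1 and 2 during construction
// and the merged iteration starts at (5, 6).
var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5);
var it = new MergingKvIterator<>("example", IteratorStart.LE, 5,
        (start, key) -> new NavigableMapKvIterator<>(source1, start, key),
        (start, key) -> new NavigableMapKvIterator<>(source2, start, key));
// it.next() == Pair.of(5, 6)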

View File

@@ -2,26 +2,26 @@ package com.usatiuk.objects.iterators;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final MergingKvIterator<K, MaybeTombstone<V>> _backing;
private Pair<K, V> _next = null;
public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends ReversibleKvIterator<K, V_T> {
private final CloseableKvIterator<K, V> _backing;
private final Function<V, V_T> _transformer;
private Pair<K, V_T> _next = null;
private boolean _checkedNext = false;
public TombstoneSkippingIterator(IteratorStart start, K startKey, List<CloseableKvIterator<K, MaybeTombstone<V>>> iterators) {
public PredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<V, V_T> transformer) {
_goingForward = true;
_backing = new MergingKvIterator<>(start, startKey, iterators);
_backing = backing;
_transformer = transformer;
if (start == IteratorStart.GE || start == IteratorStart.GT)
return;
boolean shouldGoBack = false;
if (canHaveNext())
tryFillNext();
fillNext();
boolean shouldGoBack = false;
if (start == IteratorStart.LE) {
if (_next == null || _next.getKey().compareTo(startKey) > 0) {
shouldGoBack = true;
@@ -38,27 +38,34 @@ public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends Rever
_backing.skipPrev();
fillNext();
_goingForward = true;
if (_next != null)
_backing.skip();
_backing.skip();
fillNext();
}
}
private boolean canHaveNext() {
return (_goingForward ? _backing.hasNext() : _backing.hasPrev());
}
private boolean tryFillNext() {
var next = _goingForward ? _backing.next() : _backing.prev();
if (next.getValue() instanceof Tombstone<?>)
return false;
_next = Pair.of(next.getKey(), ((Data<V>) next.getValue()).value());
return true;
// switch (start) {
// case LT -> {
//// assert _next == null || _next.getKey().compareTo(startKey) < 0;
// }
// case LE -> {
//// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
// }
// case GT -> {
// assert _next == null || _next.getKey().compareTo(startKey) > 0;
// }
// case GE -> {
// assert _next == null || _next.getKey().compareTo(startKey) >= 0;
// }
// }
}
private void fillNext() {
while (_next == null && canHaveNext()) {
tryFillNext();
while ((_goingForward ? _backing.hasNext() : _backing.hasPrev()) && _next == null) {
var next = _goingForward ? _backing.next() : _backing.prev();
var transformed = _transformer.apply(next.getValue());
if (transformed == null)
continue;
_next = Pair.of(next.getKey(), transformed);
}
_checkedNext = true;
}
@@ -73,6 +80,9 @@ public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends Rever
else if (!_goingForward && !wasAtEnd)
_backing.skipPrev();
// if (!wasAtEnd)
// Log.tracev("Skipped in reverse: {0}", _next);
_next = null;
_checkedNext = false;
}
@@ -107,7 +117,7 @@ public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends Rever
}
@Override
protected Pair<K, V> nextImpl() {
protected Pair<K, V_T> nextImpl() {
if (!_checkedNext)
fillNext();
@@ -127,6 +137,7 @@ public class TombstoneSkippingIterator<K extends Comparable<K>, V> extends Rever
@Override
public String toString() {
return "PredicateKvIterator{" +
"_backing=" + _backing +
", _next=" + _next +
'}';
}

View File

@@ -0,0 +1,36 @@
package com.usatiuk.objects.iterators;
import java.util.List;
public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
startType, startKey,
pair -> {
// Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
return ((Data<V>) pair).value();
});
}
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn2<K, V> itProd) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, (IterProdFn2<K, MaybeTombstone<V>>) itProd),
startType, startKey,
pair -> {
// Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
return ((Data<V>) pair).value();
});
}
}
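The factory above merges the per-source iterators and then unwraps Data entries while dropping Tombstone entries; as a behavioural sketch (hypothetical keys and values):

// source A (higher priority): {1=Data(10), 2=Tombstone}
// source B (lower priority):  {2=Data(20), 3=Data(30)}
// merged view:                {1=Data(10), 2=Tombstone, 3=Data(30)}
// TombstoneMergingKvIterator.of(...) yields (1, 10) and (3, 30);
// key 2 is hidden because its highest-priority entry is a tombstone.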

View File

@@ -7,12 +7,11 @@ import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
Stream<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull
Optional<V> readObject(K name);

View File

@@ -5,7 +5,6 @@ import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -17,12 +16,12 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
@@ -153,11 +152,10 @@ public class CachingObjectPersistentStore {
}
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prependAndMap(
new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key),
_backing.getIterator(start, key),
i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i)
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.concat(
Stream.of(new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), start, key)),
_backing.getIterator(start, key).map(i -> new CachingKvIterator((CloseableKvIterator<JObjectKey, JDataVersionedWrapper>) (CloseableKvIterator<JObjectKey, ?>) i))
);
}

View File

@@ -23,7 +23,6 @@ import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
@@ -111,9 +110,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private boolean _closed = false;
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return List.of(new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)));
return Stream.of(new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR)));
}
@Nonnull

View File

@@ -10,7 +10,6 @@ import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
@@ -30,8 +29,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
return List.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), s -> new DataWrapper<>(s.asReadOnlyByteBuffer())));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), s -> new DataWrapper<>(s.asReadOnlyByteBuffer())));
}
@Nonnull

View File

@@ -5,14 +5,12 @@ import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
@@ -29,10 +27,9 @@ public class SerializingObjectPersistentStore {
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.map(_backing.getIterator(start, key),
i -> new MappingKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>, MaybeTombstone<JDataVersionedWrapper>>(i,
d -> serializer.deserialize(((DataWrapper<ByteBuffer>) d).value())));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return _backing.getIterator(start, key).map(i -> new MappingKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>, MaybeTombstone<JDataVersionedWrapper>>(i,
d -> serializer.deserialize(((DataWrapper<ByteBuffer>) d).value())));
}
@Nonnull

View File

@@ -3,11 +3,13 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -25,41 +27,27 @@ import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@Inject
ExecutorService _callbackExecutor;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
int sizeLimit;
private TxBundle _pendingBundle = null;
private int _curSize = 0;
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final ReentrantLock _pendingBundleLock = new ReentrantLock();
private final Condition _newBundleCondition = _pendingBundleLock.newCondition();
private final Condition _flushCondition = _pendingBundleLock.newCondition();
private final AtomicLong _lastFlushedId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0);
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(120) StartupEvent event) {
@@ -77,8 +65,8 @@ public class WritebackObjectPersistentStore {
try {
while (true) {
Thread.sleep(1000);
if (_curSize > 0)
Log.info("Tx commit status: size=" + _curSize / 1024 / 1024 + "MB");
if (currentSize > 0)
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
@@ -88,7 +76,7 @@ public class WritebackObjectPersistentStore {
lastTxId = s.id();
}
_lastCommittedId.set(lastTxId);
_lastFlushedId.set(lastTxId);
_lastWrittenId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}
@@ -96,14 +84,11 @@ public class WritebackObjectPersistentStore {
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
_ready = false;
_pendingBundleLock.lock();
try {
while (_curSize > 0) {
_flushCondition.await();
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
} finally {
_pendingBundleLock.unlock();
}
_writebackExecutor.shutdownNow();
@@ -117,19 +102,21 @@ public class WritebackObjectPersistentStore {
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundle bundle;
_pendingBundleLock.lock();
try {
while (_pendingBundle == null)
_newBundleCondition.await();
bundle = _pendingBundle;
_pendingBundle = null;
TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
_curSize -= bundle.size();
assert _curSize == 0;
_flushCondition.signal();
} finally {
_pendingBundleLock.unlock();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
bundle.compress(toCompress);
}
diff += bundle.size();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
@@ -149,12 +136,15 @@ public class WritebackObjectPersistentStore {
}
}
cachedStore.commitTx(new TxManifestObj<>(toWrite, toDelete), bundle.id());
cachedStore.commitTx(
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
), bundle.id());
Log.tracev("Bundle {0} committed", bundle.id());
_pendingBundleLock.lock();
try {
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
@@ -168,15 +158,23 @@ public class WritebackObjectPersistentStore {
curPw.lastCommittedId()
);
_pendingWrites.compareAndSet(curPw, newCurPw);
} finally {
_pendingBundleLock.unlock();
}
_lastFlushedId.set(bundle.id());
var callbacks = bundle.callbacks();
_callbackExecutor.submit(() -> {
callbacks.forEach(Runnable::run);
});
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenId.set(bundle.id());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.size();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
@@ -189,96 +187,122 @@ public class WritebackObjectPersistentStore {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
_pendingBundleLock.lock();
try {
boolean shouldWake = false;
if (_curSize > sizeLimit) {
shouldWake = true;
long started = System.currentTimeMillis();
while (_curSize > sizeLimit)
_flushCondition.await();
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
}
var oursId = _lastCommittedId.incrementAndGet();
var curBundle = _pendingBundle;
int oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.setId(oursId);
} else {
curBundle = new TxBundle(oursId);
}
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
curBundle.delete(deleted.key());
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
wait = false;
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
oursId
);
_pendingWrites.compareAndSet(curPw, newCurPw);
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
_pendingBundle = curBundle;
_newBundleCondition.signalAll();
long diff = -target.size();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
target.compress(toCompress);
}
diff += target.size();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
_curSize += (curBundle.size() - oldSize);
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
if (shouldWake && _curSize < sizeLimit) {
_flushCondition.signal();
TxBundle bundle;
synchronized (_notFlushedBundles) {
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.id(), bundle);
}
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.tracev("Flushing object {0}", write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.id()
);
_pendingWrites.compareAndSet(curPw, newCurPw);
}
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).size();
}
return bundle.id();
}
return oursId;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
_pendingBundleLock.unlock();
}
}
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastFlushedId.get() >= bundleId) {
if (_lastWrittenId.get() >= bundleId) {
fn.run();
return;
}
_pendingBundleLock.lock();
try {
if (_lastFlushedId.get() >= bundleId) {
synchronized (_notFlushedBundles) {
if (_lastWrittenId.get() >= bundleId) {
fn.run();
return;
}
var pendingBundle = _pendingBundle;
if (pendingBundle == null) {
fn.run();
return;
}
pendingBundle.addCallback(fn);
} finally {
_pendingBundleLock.unlock();
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
@@ -319,8 +343,8 @@ public class WritebackObjectPersistentStore {
private final long txId = finalPw.lastCommittedId();
@Override
public List<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return ListUtils.prepend(new NavigableMapKvIterator<>(_pendingWrites, start, key), _cache.getIterator(start, key));
public Stream<CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>> getIterator(IteratorStart start, JObjectKey key) {
return Stream.concat(Stream.of(new NavigableMapKvIterator<>(_pendingWrites, start, key)), _cache.getIterator(start, key));
}
@Nonnull
@@ -357,6 +381,9 @@ public class WritebackObjectPersistentStore {
}
}
public interface VerboseReadResult {
}
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
@@ -365,32 +392,35 @@ public class WritebackObjectPersistentStore {
private static class TxBundle {
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private int _size = 0;
private long _txId;
ArrayList<Runnable> callbacks() {
return _callbacks;
}
private volatile boolean _ready = false;
private long _size = 0;
private boolean _wasCommitted = false;
private TxBundle(long txId) {
_txId = txId;
}
public void setId(long id) {
_txId = id;
}
public long id() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(Runnable callback) {
_callbacks.add(callback);
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public int size() {
return _size;
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
private void putEntry(BundleEntry entry) {
@@ -409,7 +439,28 @@ public class WritebackObjectPersistentStore {
putEntry(new DeletedEntry(obj));
}
private sealed interface BundleEntry permits CommittedEntry, DeletedEntry {
public long size() {
return _size;
}
public void compress(TxBundle other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
for (var entry : other._entries.values()) {
putEntry(entry);
}
synchronized (_callbacks) {
assert !_wasCommitted;
assert !other._wasCommitted;
_callbacks.addAll(other._callbacks);
}
}
private interface BundleEntry {
JObjectKey key();
int size();
@@ -427,4 +478,10 @@ public class WritebackObjectPersistentStore {
}
}
}
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult {
}
}
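For orientation, the commit/fence flow introduced by this rewrite looks roughly like this from a caller's point of view (a sketch; the surrounding code is illustrative):

// "writes" stands for the TxRecord.TxObjectRecord entries drained from a committing transaction.
// commitBundle queues them as a new TxBundle and returns that bundle's id.
long txId = writebackObjectPersistentStore.commitBundle(writes);

// asyncFence runs the callback immediately if the bundle has already been
// written back (_lastWrittenId >= txId); otherwise it is attached to the
// corresponding not-yet-flushed bundle and run after the writeback thread
// commits it to the caching store.
writebackObjectPersistentStore.asyncFence(txId, () -> Log.info("bundle " + txId + " is durable"));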

View File

@@ -59,89 +59,97 @@ public class JObjectManager {
verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
Map<JObjectKey, Optional<JDataVersionedWrapper>> readSet = null;
Map<JObjectKey, TransactionObject<?>> readSet = null;
Collection<AutoCloseableNoThrow> toUnlock = null;
try {
long pendingCount = 0;
List<CommitHookIterationData> hookIterationData;
{
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
try {
long pendingCount = 0;
List<CommitHookIterationData> hookIterationData;
{
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
hookIterationData = List.of(hookIterationDataArray);
}
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
pendingCount++;
}
writes.put(n.key(), n);
}
writes.put(n.key(), n);
}
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
// For example, when a hook makes changes to an object, and another hook changes the object before/after it
// on the next iteration, the first hook should receive the version of the object it had created
// as the "old" version, and the new version with all the changes after it.
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
var curIteration = hookId.pendingWrites();
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
}
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
}
pendingCount -= curIteration.size();
curIteration.clear();
pendingCount -= curIteration.size();
curIteration.clear();
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
writes.put(n.key(), n);
}
writes.put(n.key(), n);
}
}
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
throw e;
}
readSet = tx.reads();
@@ -150,16 +158,18 @@ public class JObjectManager {
toUnlock = new ArrayList<>(readSet.size() + writes.size());
ArrayList<JObjectKey> toLock = new ArrayList<>(readSet.size() + writes.size());
for (var read : readSet.entrySet()) {
toLock.add(read.getKey());
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
} else {
toLock.add(read.getKey());
}
}
for (var write : writes.keySet()) {
if (!readSet.containsKey(write))
toLock.add(write);
for (var write : writes.entrySet()) {
if (!readSet.containsKey(write.getKey()))
toLock.add(write.getKey());
}
toLock.sort(null);
Collections.sort(toLock);
for (var key : toLock) {
if (tx.knownNew().contains(key))
continue;
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
}
@@ -171,7 +181,10 @@ public class JObjectManager {
long version = 0L;
for (var read : readSet.values()) {
version = Math.max(version, read.map(JDataVersionedWrapper::version).orElse(0L));
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
long finalVersion = version;
@@ -179,16 +192,14 @@ public class JObjectManager {
writebackObjectPersistentStore.asyncFence(finalVersion, r);
};
var onCommit = tx.getOnCommit();
var onFlush = tx.getOnFlush();
return Pair.of(
List.of(() -> {
for (var f : onCommit)
f.run();
for (var f : onFlush)
fenceFn.accept(f);
}),
Stream.concat(
tx.getOnCommit().stream(),
Stream.<Runnable>of(() -> {
for (var f : tx.getOnFlush())
fenceFn.accept(f);
})
).toList(),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -204,13 +215,13 @@ public class JObjectManager {
for (var read : readSet.entrySet()) {
var current = commitSnapshot.readObject(read.getKey());
if (current.isEmpty() != read.getValue().isEmpty()) {
if (current.isEmpty() != read.getValue().data().isEmpty()) {
Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().isEmpty());
throw new TxCommitException("Serialization hazard: " + current.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
if (current.isEmpty()) {
// Every write gets a dependency due to hooks
// TODO: Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
@@ -235,7 +246,7 @@ public class JObjectManager {
}
return Pair.of(
tx.getOnCommit(),
List.copyOf(tx.getOnCommit()),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -258,6 +269,11 @@ public class JObjectManager {
public void rollback(TransactionPrivate tx) {
verifyReady();
tx.reads().forEach((key, value) -> {
if (value instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
});
tx.close();
}
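The hook-iteration contract described in the comments above ("every hook sees every change, but each hook sees a consistent evolution of each object") can be illustrated with a hypothetical timeline; hookA, hookB, key X and versions v1..v3 are made up for illustration:

// Transaction writes X = v1, then commit() starts iterating the hooks.
// Pass 1:
//   hookA: onChange(X, old = source value, new = v1); hookA itself then writes X = v2
//   hookB: onChange(X, old = source value, new = v2); hookB itself then writes X = v3
// Pass 2:
//   hookA: onChange(X, old = v2, new = v3)  // "old" is the last version hookA produced or saw
//   hookB: nothing pending                  // hookB already saw its own v3
// pendingCount reaches 0 and the loop ends.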

View File

@@ -6,7 +6,6 @@ import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -14,6 +13,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.stream.Stream;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@@ -56,10 +56,10 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
@@ -99,14 +99,30 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) {
return Optional.empty();
}
var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return read;
});
if (got.isEmpty())
return Optional.empty();
var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
return _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read);
})
.data()
.map(w -> type.cast(w.data()));
}
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
@@ -122,7 +138,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
}
return getFromSource(type, key);
if (neverLock)
return getFromSource(type, key);
return switch (strategy) {
case OPTIMISTIC -> getFromSource(type, key);
case WRITE -> getWriteLockedFromSource(type, key);
};
}
@Override
@@ -141,30 +163,27 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key, (tS, tK) ->
Stream.concat(Stream.of(new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
})), _snapshot.getIterator(tS, tK).map(itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
})))));
}
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
@@ -188,13 +207,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
public Map<JObjectKey, TransactionObject<?>> reads() {
return Collections.unmodifiableMap(_readSet);
}
@Override
@@ -230,7 +244,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@@ -259,7 +273,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}

View File

@@ -6,7 +6,6 @@ import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
@Singleton
public class TransactionManagerImpl implements TransactionManager {

View File

@@ -0,0 +1,10 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public interface TransactionObject<T extends JData> {
Optional<JDataVersionedWrapper> data();
}

View File

@@ -0,0 +1,12 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Optional;
public record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import java.util.Optional;
public record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper> data)
implements TransactionObject<T> {
}

View File

@@ -9,15 +9,12 @@ import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
Map<JObjectKey, TransactionObject<?>> reads();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);

View File

@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
public class TxRecord {
public sealed interface TxObjectRecord<T> permits TxObjectRecordWrite, TxObjectRecordDeleted {
public interface TxObjectRecord<T> {
JObjectKey key();
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.objects.iterators;
import jnr.ffi.annotations.In;
import net.jqwik.api.*;
import net.jqwik.api.state.Action;
import net.jqwik.api.state.ActionChain;
@@ -10,7 +9,6 @@ import org.junit.jupiter.api.Assertions;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
public class MergingKvIteratorPbtTest {
@Property
@@ -56,8 +54,8 @@ public class MergingKvIteratorPbtTest {
}
}
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
mergingIterator = new MergingKvIterator<>(startType, startKey, pairs.stream().<CloseableKvIterator<Integer, Integer>>map(
list -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), startType, startKey)
mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
).toList());
}

View File

@@ -0,0 +1,348 @@
package com.usatiuk.objects.iterators;
import com.usatiuk.objects.Just;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.pcollections.TreePMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class MergingKvIteratorTest {
@Test
public void testTestIterator() {
var list = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6));
var iterator = new SimpleIteratorWrapper<>(list.iterator());
var realIterator = list.iterator();
while (realIterator.hasNext()) {
Assertions.assertTrue(iterator.hasNext());
Assertions.assertEquals(realIterator.next(), iterator.next());
}
Assertions.assertFalse(iterator.hasNext());
var emptyList = List.<Pair<Integer, Integer>>of();
var emptyIterator = new SimpleIteratorWrapper<>(emptyList.iterator());
Assertions.assertFalse(emptyIterator.hasNext());
}
@Test
public void testSimple() {
var source1 = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6)).iterator();
var source2 = List.of(Pair.of(2, 3), Pair.of(4, 5), Pair.of(6, 7)).iterator();
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1), (a, b) -> new SimpleIteratorWrapper<>(source2));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 4), Pair.of(4, 5), Pair.of(5, 6), Pair.of(6, 7));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
}
@Test
public void testPriority() {
var source1 = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()));
var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriority2() {
var source1 = List.of(Pair.of(2, 4), Pair.of(5, 6));
var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5));
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()));
var expected = List.of(Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()));
var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriorityLe() {
var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
Just.checkIterator(mergingIterator.reversed(), Pair.of(5, 6), Pair.of(2, 4), Pair.of(1, 3));
Assertions.assertFalse(mergingIterator.reversed().hasNext());
Just.checkIterator(mergingIterator, Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6));
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(5, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
Just.checkIterator(mergingIterator2.reversed(), Pair.of(5, 6), Pair.of(2, 5), Pair.of(1, 3));
Assertions.assertFalse(mergingIterator2.reversed().hasNext());
Just.checkIterator(mergingIterator2, Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6));
Assertions.assertFalse(mergingIterator2.hasNext());
var mergingIterator3 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
}
@Test
public void testPriorityLe2() {
var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
}
@Test
public void testPriorityLe3() {
var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(6, 8);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(5, 6), Pair.of(6, 8));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
Just.checkIterator(mergingIterator.reversed(), Pair.of(6, 8), Pair.of(5, 6), Pair.of(2, 4), Pair.of(1, 3));
Assertions.assertFalse(mergingIterator.reversed().hasNext());
Just.checkIterator(mergingIterator, Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6), Pair.of(6, 8));
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(5, 6), Pair.of(6, 8));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
var mergingIterator3 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
Assertions.assertEquals(2, mergingIterator3.peekPrevKey());
Assertions.assertTrue(mergingIterator3.hasPrev());
Assertions.assertTrue(mergingIterator3.hasNext());
Assertions.assertEquals(5, mergingIterator3.peekNextKey());
}
@Test
public void testPriorityLe4() {
var source1 = TreePMap.<Integer, Integer>empty().plus(6, 7);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(3, 4), Pair.of(6, 7));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(3, 4), Pair.of(6, 7));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriorityLe5() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 2).plus(6, 7);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(3, 4), Pair.of(6, 7));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(3, 4), Pair.of(6, 7));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriorityLe6() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
var source2 = TreePMap.<Integer, Integer>empty().plus(4, 6);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(4, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(4, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriorityLe7() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 4).plus(3, 5).plus(4, 6);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 2, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(1, 3), Pair.of(3, 5), Pair.of(4, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
Just.checkIterator(mergingIterator.reversed(), Pair.of(4, 6), Pair.of(3, 5), Pair.of(1, 3));
Just.checkIterator(mergingIterator, Pair.of(1, 3), Pair.of(3, 5), Pair.of(4, 6));
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 2, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(1, 4), Pair.of(3, 5), Pair.of(4, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriorityLt() {
var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5);
var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LT, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
var expected = List.of(Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LT, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
var expected2 = List.of(Pair.of(2, 5), Pair.of(5, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
private static class SimpleIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final Iterator<Pair<K, V>> _iterator;
private Pair<K, V> _next;
public SimpleIteratorWrapper(Iterator<Pair<K, V>> iterator) {
_iterator = iterator;
fillNext();
}
private void fillNext() {
while (_iterator.hasNext() && _next == null) {
_next = _iterator.next();
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void skip() {
if (_next == null) {
throw new NoSuchElementException();
}
_next = null;
fillNext();
}
@Override
public K peekPrevKey() {
throw new UnsupportedOperationException();
}
@Override
public Pair<K, V> prev() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasPrev() {
throw new UnsupportedOperationException();
}
@Override
public void skipPrev() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
_next = null;
fillNext();
return ret;
}
}
}
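
These tests lean on Just.checkIterator to assert that an iterator produces an exact sequence of pairs. A plausible reading of the varargs overload, inferred only from its call sites here and in the LMDB test further down (the actual implementation in com.usatiuk.objects.Just may differ, and a List-taking overload also exists):

// Sketch only: Assertions is org.junit.jupiter.api.Assertions, Pair is commons-lang3.
@SafeVarargs
public static <K, V> void checkIterator(Iterator<Pair<K, V>> it, Pair<K, V>... expected) {
    for (var pair : expected) {
        Assertions.assertTrue(it.hasNext());
        Assertions.assertEquals(pair, it.next());
    }
}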

View File

@@ -0,0 +1,161 @@
package com.usatiuk.objects.iterators;
import com.usatiuk.objects.Just;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.pcollections.TreePMap;
import java.util.List;
public class PredicateKvIteratorTest {
@Test
public void simpleTest() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.GT, 1),
IteratorStart.GE, 1, v -> (v % 2 == 0) ? v : null);
var expected = List.of(Pair.of(4, 6));
for (var pair : expected) {
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(pair, pit.next());
}
}
@Test
public void ltTest() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
var expected = List.of(Pair.of(4, 6));
for (var pair : expected) {
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(pair, pit.next());
}
Assertions.assertFalse(pit.hasNext());
}
@Test
public void ltTest2() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 1),
IteratorStart.LT, 1, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 2),
IteratorStart.LT, 2, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 4),
IteratorStart.LE, 4, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6));
Assertions.assertFalse(pit.hasNext());
}
@Test
public void ltTest3() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6).plus(5, 7).plus(6, 8);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6), Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6), Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6), Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7),
IteratorStart.LT, 7, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 8),
IteratorStart.LT, 8, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LE, 6),
IteratorStart.LE, 6, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(6, 8));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null);
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(4, pit.peekNextKey());
Assertions.assertFalse(pit.hasPrev());
Assertions.assertEquals(4, pit.peekNextKey());
Assertions.assertFalse(pit.hasPrev());
Assertions.assertEquals(Pair.of(4, 6), pit.next());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertEquals(4, pit.peekPrevKey());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertEquals(4, pit.peekPrevKey());
}
@Test
public void ltTest4() {
var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6).plus(5, 8).plus(6, 10);
var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6), Pair.of(5, 8), Pair.of(6, 10));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 5),
IteratorStart.LT, 5, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(4, 6), Pair.of(5, 8), Pair.of(6, 10));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(5, 8), Pair.of(6, 10));
Assertions.assertFalse(pit.hasNext());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 7),
IteratorStart.LT, 7, v -> (v % 2 == 0) ? v : null);
Just.checkIterator(pit, Pair.of(6, 10));
Assertions.assertFalse(pit.hasNext());
Assertions.assertTrue(pit.hasPrev());
Assertions.assertEquals(6, pit.peekPrevKey());
Assertions.assertEquals(Pair.of(6, 10), pit.prev());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 6),
IteratorStart.LT, 6, v -> (v % 2 == 0) ? v : null);
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(5, pit.peekNextKey());
Assertions.assertTrue(pit.hasPrev());
Assertions.assertEquals(4, pit.peekPrevKey());
Assertions.assertEquals(5, pit.peekNextKey());
Assertions.assertEquals(4, pit.peekPrevKey());
Assertions.assertEquals(Pair.of(5, 8), pit.next());
Assertions.assertTrue(pit.hasNext());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertEquals(5, pit.peekPrevKey());
Assertions.assertEquals(6, pit.peekNextKey());
Assertions.assertEquals(5, pit.peekPrevKey());
}
// @Test
// public void reverseTest() {
// var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
// var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
// IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
//
// }
}
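
The suite above exercises an iterator that maps each value through a function and silently drops entries whose mapped value is null. A compact reference model of that forward-iteration behaviour, offered as a sketch only (the real PredicateKvIterator is bidirectional and also honours the IteratorStart positioning modes):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.Pair;
final class PredicateModel {
    static <K extends Comparable<K>, V, R> List<Pair<K, R>> filterMap(NavigableMap<K, V> src, Function<V, R> fn) {
        List<Pair<K, R>> out = new ArrayList<>();
        for (var e : src.entrySet()) {
            R mapped = fn.apply(e.getValue());
            if (mapped != null) out.add(Pair.of(e.getKey(), mapped)); // null from fn means "skip this entry"
        }
        return out;
    }
}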

View File

@@ -1,275 +0,0 @@
package com.usatiuk.objects.iterators;
import net.jqwik.api.*;
import net.jqwik.api.state.Action;
import net.jqwik.api.state.ActionChain;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class TombstoneSkippingIteratorPbtTest {
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, MaybeTombstone<Integer>>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, MaybeTombstone<Integer>>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50),
Arbitraries.integers().between(-50, 50).flatMap(i -> Arbitraries.of(true, false).<MaybeTombstone<Integer>>flatMap(
b -> b ? Arbitraries.just(new DataWrapper<Integer>(i)) : Arbitraries.just(new TombstoneImpl<>())
))
)
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
private final CloseableKvIterator<Integer, Integer> mergedIterator;
private final CloseableKvIterator<Integer, Integer> mergingIterator;
private MergingIteratorModel(List<List<Map.Entry<Integer, MaybeTombstone<Integer>>>> pairs, IteratorStart startType, Integer startKey) {
TreeMap<Integer, MaybeTombstone<Integer>> perfectMergedTombstones = new TreeMap<>();
for (List<Map.Entry<Integer, MaybeTombstone<Integer>>> list : pairs) {
for (Map.Entry<Integer, MaybeTombstone<Integer>> pair : list) {
perfectMergedTombstones.putIfAbsent(pair.getKey(), pair.getValue());
}
}
TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
for (var e : perfectMergedTombstones.entrySet()) {
if (e.getValue() instanceof Data<Integer> data)
perfectMerged.put(e.getKey(), data.value());
}
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
mergingIterator = new TombstoneSkippingIterator<>(startType, startKey, pairs.stream().<CloseableKvIterator<Integer, MaybeTombstone<Integer>>>map(
list -> new NavigableMapKvIterator<Integer, MaybeTombstone<Integer>>(new TreeMap<Integer, MaybeTombstone<Integer>>(Map.ofEntries(list.toArray(Map.Entry[]::new))), startType, startKey)
).toList());
}
@Override
public Integer peekNextKey() {
var mergedKey = mergedIterator.peekNextKey();
var mergingKey = mergingIterator.peekNextKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skip() {
mergedIterator.skip();
mergingIterator.skip();
}
@Override
public Integer peekPrevKey() {
var mergedKey = mergedIterator.peekPrevKey();
var mergingKey = mergingIterator.peekPrevKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> prev() {
var mergedKey = mergedIterator.prev();
var mergingKey = mergingIterator.prev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public boolean hasPrev() {
var mergedKey = mergedIterator.hasPrev();
var mergingKey = mergingIterator.hasPrev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skipPrev() {
mergedIterator.skipPrev();
mergingIterator.skipPrev();
}
@Override
public void close() {
mergedIterator.close();
mergingIterator.close();
}
@Override
public boolean hasNext() {
var mergedKey = mergedIterator.hasNext();
var mergingKey = mergingIterator.hasNext();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> next() {
var mergedKey = mergedIterator.next();
var mergingKey = mergingIterator.next();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
}
static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekNextKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Peek next key";
}
}
static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skip();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Skip next key";
}
}
static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekPrevKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Peek prev key";
}
}
static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skipPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Skip prev key";
}
}
static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.prev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Prev key";
}
}
static class NextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.next();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Next key";
}
}
static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasNext();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has next key";
}
}
static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has prev key";
}
}
}

View File

@@ -0,0 +1,130 @@
package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.Just;
import com.usatiuk.objects.TempDataProfile;
import com.usatiuk.objects.iterators.IteratorStart;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import java.nio.ByteBuffer;
import java.util.List;
class Profiles {
public static class LmdbKvIteratorTestProfile extends TempDataProfile {
}
}
@QuarkusTest
@TestProfile(Profiles.LmdbKvIteratorTestProfile.class)
public class LmdbKvIteratorTest {
@Inject
LmdbObjectPersistentStore store;
long getNextTxId() {
try (var s = store.getSnapshot()) {
return s.id() + 1;
}
}
@RepeatedTest(100)
public void iteratorTest1() {
store.prepareTx(
new TxManifestRaw(
List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))),
List.of()
), getNextTxId()
).run();
try (var snapshot = store.getSnapshot()) {
var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})),
Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})),
Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4}))));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(3)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(4)));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertFalse(iterator.hasNext());
iterator.close();
iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
Assertions.assertTrue(iterator.hasNext());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})));
Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteBuffer.wrap(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})));
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteBuffer.wrap(new byte[]{4})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.prev());
Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteBuffer.wrap(new byte[]{3})), iterator.next());
iterator.close();
}
store.prepareTx(new TxManifestRaw(
List.of(),
List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3)))
),
getNextTxId()
).run();
}
}
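
The test drives the store through prepareTx with a TxManifestRaw of written entries and deleted keys. Judging only from the calls above, the manifest would be shaped roughly like the record below; the actual type in com.usatiuk.objects.stores may carry more than this:

// Assumed shape, inferred from usage: entries to write plus keys to delete.
public record TxManifestRaw(List<Pair<JObjectKey, ByteString>> written,
                            List<JObjectKey> deleted) {
}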

View File

@@ -34,9 +34,11 @@ public class HashSetDelayedBlockingQueue<T> {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
if (_set.putIfAbsent(el, new SetElement<>(el, System.currentTimeMillis())) != null)
if (_set.containsKey(el))
return false;
_set.put(el, new SetElement<>(el, System.currentTimeMillis()));
this.notify();
return true;
}
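
Under the monitor the two forms are equivalent: the old single putIfAbsent call and the new containsKey-then-put pair both refuse duplicates and otherwise insert a freshly timestamped element. A self-contained illustration of that equivalence (SetElement and the queue's wake-up logic are omitted; the monitor is taken by the methods here in place of the synchronized(this) block above):

import java.util.HashMap;
final class GuardedInsertSketch<T> {
    private final HashMap<T, Long> map = new HashMap<>();
    synchronized boolean addOld(T el) {               // old form: rely on putIfAbsent's return value
        return map.putIfAbsent(el, System.currentTimeMillis()) == null;
    }
    synchronized boolean addNew(T el) {               // new form: explicit containsKey followed by put
        if (map.containsKey(el)) return false;
        map.put(el, System.currentTimeMillis());
        return true;
    }
}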

View File

@@ -1,33 +0,0 @@
package com.usatiuk.utils;
import java.util.List;
import java.util.function.Function;
public class ListUtils {
public static <T, T_V> List<T_V> prependAndMap(T_V item, List<T> suffix, Function<T, T_V> suffixFn) {
T_V[] arr = (T_V[]) new Object[suffix.size() + 1];
arr[0] = item;
for (int i = 0; i < suffix.size(); i++) {
arr[i + 1] = suffixFn.apply(suffix.get(i));
}
return List.of(arr);
}
public static <T> List<T> prepend(T item, List<T> suffix) {
T[] arr = (T[]) new Object[suffix.size() + 1];
arr[0] = item;
for (int i = 0; i < suffix.size(); i++) {
arr[i + 1] = suffix.get(i);
}
return List.of(arr);
}
public static <T, T_V> List<T_V> map(List<T> suffix, Function<T, T_V> suffixFn) {
T_V[] arr = (T_V[]) new Object[suffix.size()];
for (int i = 0; i < suffix.size(); i++) {
arr[i] = suffixFn.apply(suffix.get(i));
}
return List.of(arr);
}
}
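
The removed helpers have straightforward Stream equivalents, which is presumably what made the class droppable; whether the call sites were actually migrated this way is not visible in this diff. A sketch (note that, unlike List.of over an array in the deleted code, Stream.toList() tolerates null elements):

import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
final class ListUtilsStreamSketch {
    static <T> List<T> prepend(T item, List<T> suffix) {
        return Stream.concat(Stream.of(item), suffix.stream()).toList();
    }
    static <T, R> List<R> map(List<T> list, Function<T, R> fn) {
        return list.stream().map(fn).toList();
    }
    static <T, R> List<R> prependAndMap(R item, List<T> suffix, Function<T, R> fn) {
        return Stream.concat(Stream.of(item), suffix.stream().map(fn)).toList();
    }
}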

View File

@@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
import java.util.function.Consumer;
public class UninitializedByteBuffer {
private static final Linker LINKER = Linker.nativeLinker();
private static final MethodHandle malloc = LINKER.downcallHandle(
LINKER.defaultLookup().find("malloc").orElseThrow(),
@@ -18,7 +19,7 @@ public class UninitializedByteBuffer {
public static ByteBuffer allocate(int capacity) {
UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
// Invoke malloc(), which returns a pointer
MemorySegment segment = null;
try {
segment = (MemorySegment) malloc.invokeExact((long) capacity);
@@ -26,6 +27,7 @@ public class UninitializedByteBuffer {
throw new RuntimeException(e);
}
Consumer<MemorySegment> cleanup = s -> {
try {
free.invokeExact(s);
@@ -37,8 +39,4 @@ public class UninitializedByteBuffer {
var reint = segment.reinterpret(capacity, Arena.ofAuto(), cleanup);
return reint.asByteBuffer();
}
public static long getAddress(ByteBuffer buffer) {
return UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer);
}
}
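
The hunks above show only fragments of the malloc/free plumbing. Below is a standalone sketch of what the finalized java.lang.foreign version looks like, consistent with --enable-preview being dropped from the run script further down; the FunctionDescriptors are filled in as assumptions, and the UnsafeAccessor reserveMemory bookkeeping from the real class is left out:

import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
final class MallocBufferSketch {
    private static final Linker LINKER = Linker.nativeLinker();
    private static final MethodHandle MALLOC = LINKER.downcallHandle(
            LINKER.defaultLookup().find("malloc").orElseThrow(),
            FunctionDescriptor.of(ValueLayout.ADDRESS, ValueLayout.JAVA_LONG));
    private static final MethodHandle FREE = LINKER.downcallHandle(
            LINKER.defaultLookup().find("free").orElseThrow(),
            FunctionDescriptor.ofVoid(ValueLayout.ADDRESS));
    // Allocate an uninitialized native buffer and register free() as its cleanup action.
    static ByteBuffer allocate(int capacity) {
        try {
            MemorySegment raw = (MemorySegment) MALLOC.invokeExact((long) capacity);
            return raw.reinterpret(capacity, Arena.ofAuto(), s -> {
                try {
                    FREE.invokeExact(s);
                } catch (Throwable t) {
                    throw new RuntimeException(t);
                }
            }).asByteBuffer();
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }
}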

View File

@@ -25,7 +25,6 @@ echo "Extra options: $EXTRAOPTS_PARSED"
java \
-Xmx512M \
--enable-preview \
-Ddhfs.objects.writeback.limit=134217728 \
-Ddhfs.objects.lru.limit=134217728 \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \