Mirror of https://github.com/usatiuk/dhfs.git (synced 2025-10-29 04:57:48 +01:00)
Compare commits
9 Commits
| Author | SHA1 | Date |
|---|---|---|
| | edf1ae85f5 | |
| | b42461f188 | |
| | 513cbd717d | |
| | 075867daaa | |
| | 8e4ea67e53 | |
| | fb128882cb | |
| | cb8c50000a | |
| | 4c5cbfb5bf | |
| | 6bcec4a260 | |
@@ -69,6 +69,11 @@
             <artifactId>lmdbjava</artifactId>
             <version>0.9.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+            <version>9.10.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
@@ -4,71 +4,56 @@ import io.quarkus.logging.Log;
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.*;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
     private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
     private final String _name;
-    private final IteratorStart _initialStartType;
-    private final K _initialStartKey;
-    private final List<IterProdFn<K, V>> _pendingIterators;
-    private Map<CloseableKvIterator<K, V>, Integer> _iterators;
-    // Fast path for the first element
-    private FirstMatchState<K, V> _firstMatchState;
+    private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
 
     public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
         _goingForward = true;
         _name = name;
-        _initialStartType = startType;
-        _initialStartKey = startKey;
-
-        {
-            int counter = 0;
-            var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
-            for (var iteratorFn : iterators) {
-                var iterator = iteratorFn.get(startType, startKey);
-                if ((counter == 0) // Not really a requirement but simplifies some things for now
-                        && (startType == IteratorStart.GE || startType == IteratorStart.LE)
-                        && iterator.hasNext()
-                        && iterator.peekNextKey().equals(startKey)) {
-                    _firstMatchState = new FirstMatchFound<>(iterator);
-                    _pendingIterators = iterators;
-                    Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState);
-                    return;
-                }
-                iteratorsTmp.put(iterator, counter++);
-            }
-            _iterators = Map.copyOf(iteratorsTmp);
-            _pendingIterators = null;
-        }
-
-        _firstMatchState = new FirstMatchNone<>();
-        doInitialAdvance();
-    }
-
-    @SafeVarargs
-    public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
-        this(name, startType, startKey, List.of(iterators));
-    }
-
-    private void doInitialAdvance() {
-        if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
+        _iterators = Map.ofEntries(
+                IntStream.range(0, iterators.size())
+                        .mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
+                        .toArray(Pair[]::new)
+        );
+
+        if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
             // Starting at a greatest key less than/less or equal than:
             // We have a bunch of iterators that have given us theirs "greatest LT/LE key"
             // now we need to pick the greatest of those to start with
             // But if some of them don't have a lesser key, we need to pick the smallest of those
-            var found = _iterators.keySet().stream()
-                    .filter(CloseableKvIterator::hasNext)
-                    .map((i) -> {
-                        var peeked = i.peekNextKey();
-                        // Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
-                        return peeked;
-                    }).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0));
+            K greatestLess = null;
+            K smallestMore = null;
+
+            for (var it : _iterators.keySet()) {
+                if (it.hasNext()) {
+                    var peeked = it.peekNextKey();
+                    if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
+                        if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
+                            greatestLess = peeked;
+                        }
+                    } else {
+                        if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
+                            smallestMore = peeked;
+                        }
+                    }
+                }
+            }
 
             K initialMaxValue;
-            if (!found.get(true).isEmpty())
-                initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
+            if (greatestLess != null)
+                initialMaxValue = greatestLess;
             else
-                initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
+                initialMaxValue = smallestMore;
 
             if (initialMaxValue == null) {
                 // Empty iterators
             }
 
             for (var iterator : _iterators.keySet()) {
                 while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
@@ -82,7 +67,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
         }
 
         Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
-        switch (_initialStartType) {
+        switch (startType) {
            // case LT -> {
            //     assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
            // }
@@ -90,37 +75,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
            //     assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
            // }
            case GT -> {
-                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0;
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
            }
            case GE -> {
-                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0;
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
            }
        }
    }
 
-    private void doHydrate() {
-        if (_firstMatchState instanceof FirstMatchNone) {
-            return;
-        }
-
-        boolean consumed = _firstMatchState instanceof FirstMatchConsumed;
-        if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
-            iterator.close();
-        }
-
-        _firstMatchState = new FirstMatchNone<>();
-
-        {
-            int counter = 0;
-            var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
-            for (var iteratorFn : _pendingIterators) {
-                var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey);
-                iteratorsTmp.put(iterator, counter++);
-            }
-            _iterators = Map.copyOf(iteratorsTmp);
-        }
-
-        doInitialAdvance();
-    }
-
+    @SafeVarargs
+    public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
+        this(name, startType, startKey, List.of(iterators));
+    }
 
     private void advanceIterator(CloseableKvIterator<K, V> iterator) {
@@ -151,17 +116,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
 
     @Override
     protected void reverse() {
-        switch (_firstMatchState) {
-            case FirstMatchFound<K, V> firstMatchFound -> {
-                doHydrate();
-            }
-            case FirstMatchConsumed<K, V> firstMatchConsumed -> {
-                doHydrate();
-            }
-            default -> {
-            }
-        }
-
         var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
         Log.tracev("{0} Reversing from {1}", _name, cur);
         _goingForward = !_goingForward;
@@ -185,18 +139,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
 
     @Override
     protected K peekImpl() {
-        switch (_firstMatchState) {
-            case FirstMatchFound<K, V> firstMatchFound -> {
-                return firstMatchFound.iterator.peekNextKey();
-            }
-            case FirstMatchConsumed<K, V> firstMatchConsumed -> {
-                doHydrate();
-                break;
-            }
-            default -> {
-            }
-        }
-
         if (_sortedIterators.isEmpty())
             throw new NoSuchElementException();
         return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
@@ -204,22 +146,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
 
     @Override
     protected void skipImpl() {
-        switch (_firstMatchState) {
-            case FirstMatchFound<K, V> firstMatchFound -> {
-                var curVal = firstMatchFound.iterator.next();
-                firstMatchFound.iterator.close();
-                _firstMatchState = new FirstMatchConsumed<>();
-                // Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
-                return;
-            }
-            case FirstMatchConsumed<K, V> firstMatchConsumed -> {
-                doHydrate();
-                break;
-            }
-            default -> {
-            }
-        }
-
         var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
         if (cur == null) {
             throw new NoSuchElementException();
@@ -231,38 +157,11 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
 
     @Override
     protected boolean hasImpl() {
-        switch (_firstMatchState) {
-            case FirstMatchFound<K, V> firstMatchFound -> {
-                return true;
-            }
-            case FirstMatchConsumed<K, V> firstMatchConsumed -> {
-                doHydrate();
-                break;
-            }
-            default -> {
-            }
-        }
         return !_sortedIterators.isEmpty();
     }
 
     @Override
     protected Pair<K, V> nextImpl() {
-        switch (_firstMatchState) {
-            case FirstMatchFound<K, V> firstMatchFound -> {
-                var curVal = firstMatchFound.iterator.next();
-                firstMatchFound.iterator.close();
-                _firstMatchState = new FirstMatchConsumed<>();
-                // Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
-                return curVal;
-            }
-            case FirstMatchConsumed<K, V> firstMatchConsumed -> {
-                doHydrate();
-                break;
-            }
-            default -> {
-            }
-        }
-
         var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
         if (cur == null) {
             throw new NoSuchElementException();
@@ -275,9 +174,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
 
     @Override
     public void close() {
-        if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
-            iterator.close();
-        }
         for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
             iterator.close();
         }
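Note on the LT/LE start rule kept in the new constructor above: each backing iterator is first positioned at its own greatest key at or below the requested start key, and the merged iterator must then begin from the greatest of those candidates, falling back to the smallest strictly greater key when nothing smaller exists. A minimal standalone sketch of that selection with made-up integer keys (illustrative only, not repository code):

```java
import java.util.List;

class MergingStartRule {
    public static void main(String[] args) {
        // peekNextKey() of three backing iterators after an LE-10 seek:
        List<Integer> peeked = List.of(3, 7, 12);
        Integer greatestLess = peeked.stream().filter(k -> k <= 10).max(Integer::compare).orElse(null); // 7
        Integer smallestMore = peeked.stream().filter(k -> k > 10).min(Integer::compare).orElse(null);  // 12
        Integer start = greatestLess != null ? greatestLess : smallestMore;
        System.out.println(start); // 7: the merge begins at the greatest key <= 10
    }
}
```

Starting any lower (say at 3) would replay keys that an LE-10 seek is supposed to skip, which is why every iterator is then advanced past keys below the chosen start.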
@@ -189,7 +189,7 @@ public class CachingObjectPersistentStore {
         }
         var read = _backing.readObject(name);
         maybeCache(name, read);
-        return _backing.readObject(name);
+        return read;
     }
 
     @Override
@@ -1,8 +1,9 @@
 package com.usatiuk.objects.stores;
 
 import com.google.protobuf.ByteString;
+import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
+import com.usatiuk.dhfs.utils.RefcountedCloseable;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.JObjectKeyImpl;
 import com.usatiuk.objects.JObjectKeyMax;
 import com.usatiuk.objects.JObjectKeyMin;
 import com.usatiuk.objects.iterators.CloseableKvIterator;
@@ -10,8 +11,6 @@ import com.usatiuk.objects.iterators.IteratorStart;
 import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
 import com.usatiuk.objects.iterators.ReversibleKvIterator;
 import com.usatiuk.objects.snapshot.Snapshot;
-import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
-import com.usatiuk.dhfs.utils.RefcountedCloseable;
 import io.quarkus.arc.properties.IfBuildProperty;
 import io.quarkus.logging.Log;
 import io.quarkus.runtime.ShutdownEvent;
@@ -0,0 +1,280 @@
+package com.usatiuk.objects.stores;
+
+import com.google.protobuf.ByteString;
+import com.usatiuk.objects.JObjectKey;
+import com.usatiuk.objects.JObjectKeyMax;
+import com.usatiuk.objects.JObjectKeyMin;
+import com.usatiuk.objects.iterators.CloseableKvIterator;
+import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.iterators.ReversibleKvIterator;
+import com.usatiuk.objects.snapshot.Snapshot;
+import io.quarkus.arc.properties.IfBuildProperty;
+import io.quarkus.logging.Log;
+import io.quarkus.runtime.ShutdownEvent;
+import io.quarkus.runtime.StartupEvent;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import org.apache.commons.lang3.tuple.Pair;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.rocksdb.*;
+
+import javax.annotation.Nonnull;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+@ApplicationScoped
+@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "rocks")
+public class RocksDbObjectPersistentStore implements ObjectPersistentStore {
+    private static final String DB_NAME = "objects";
+    private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
+    private final Path _root;
+    private Options _options;
+    private TransactionDBOptions _transactionDBOptions;
+    private TransactionDB _db;
+    private boolean _ready = false;
+
+    public RocksDbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
+        _root = Path.of(root).resolve("objects");
+    }
+
+    void init(@Observes @Priority(100) StartupEvent event) throws RocksDBException {
+        if (!_root.toFile().exists()) {
+            Log.info("Initializing with root " + _root);
+            _root.toFile().mkdirs();
+        }
+
+        RocksDB.loadLibrary();
+
+        _options = new Options().setCreateIfMissing(true);
+        _transactionDBOptions = new TransactionDBOptions();
+        _db = TransactionDB.open(_options, _transactionDBOptions, _root.toString());
+
+        try (var txn = _db.beginTransaction(new WriteOptions())) {
+            var read = readTxId(txn);
+            if (read.isPresent()) {
+                Log.infov("Read tx id {0}", read.get());
+            } else {
+                txn.put(DB_VER_OBJ_NAME, ByteBuffer.allocate(8).putLong(0).array());
+                txn.commit();
+            }
+        }
+
+        _ready = true;
+    }
+
+    private Optional<Long> readTxId(Transaction txn) throws RocksDBException {
+        var value = txn.get(new ReadOptions(), DB_VER_OBJ_NAME);
+        return Optional.ofNullable(value).map(ByteBuffer::wrap).map(ByteBuffer::getLong);
+    }
+
+    void shutdown(@Observes @Priority(900) ShutdownEvent event) {
+        _ready = false;
+        _db.close();
+    }
+
+    private void verifyReady() {
+        if (!_ready) throw new IllegalStateException("Wrong service order!");
+    }
+
+    @Nonnull
+    @Override
+    public Optional<ByteString> readObject(JObjectKey name) {
+        verifyReady();
+        byte[] got = null;
+        try {
+            got = _db.get(new ReadOptions(), name.bytes());
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+        return Optional.ofNullable(got).map(ByteString::copyFrom);
+    }
+
+    @Override
+    public Snapshot<JObjectKey, ByteString> getSnapshot() {
+        var txn = _db.beginTransaction(new WriteOptions());
+        txn.setSnapshot();
+        var rocksDbSnapshot = txn.getSnapshot();
+        long commitId = 0;
+        try {
+            commitId = readTxId(txn).orElseThrow();
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+        long finalCommitId = commitId;
+        return new Snapshot<JObjectKey, ByteString>() {
+            private final Transaction _txn = txn;
+            private final long _id = finalCommitId;
+            private final org.rocksdb.Snapshot _rocksDbSnapshot = rocksDbSnapshot;
+            private boolean _closed = false;
+
+            @Override
+            public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
+                assert !_closed;
+                return new RocksDbKvIterator(_txn, start, key, _rocksDbSnapshot);
+            }
+
+            @Nonnull
+            @Override
+            public Optional<ByteString> readObject(JObjectKey name) {
+                assert !_closed;
+                try (var readOptions = new ReadOptions().setSnapshot(_rocksDbSnapshot)) {
+                    var got = _txn.get(readOptions, name.bytes());
+                    return Optional.ofNullable(got).map(ByteString::copyFrom);
+                } catch (RocksDBException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public long id() {
+                assert !_closed;
+                return _id;
+            }
+
+            @Override
+            public void close() {
+                assert !_closed;
+                _closed = true;
+                _txn.close();
+            }
+        };
+    }
+
+    @Override
+    public Runnable prepareTx(TxManifestRaw names, long txId) {
+        verifyReady();
+        var txn = _db.beginTransaction(new WriteOptions());
+        try {
+            for (var written : names.written()) {
+                txn.put(written.getKey().bytes(), written.getValue().toByteArray());
+            }
+            for (JObjectKey key : names.deleted()) {
+                txn.delete(key.bytes());
+            }
+
+            assert txId > readTxId(txn).orElseThrow();
+
+            txn.put(DB_VER_OBJ_NAME, ByteBuffer.allocate(8).putLong(txId).array());
+        } catch (Throwable t) {
+            txn.close();
+            throw new RuntimeException(t);
+        }
+        return () -> {
+            try {
+                txn.commit();
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            } finally {
+                txn.close();
+            }
+        };
+    }
+
+    @Override
+    public long getTotalSpace() {
+        verifyReady();
+        return _root.toFile().getTotalSpace();
+    }
+
+    @Override
+    public long getFreeSpace() {
+        verifyReady();
+        return _root.toFile().getFreeSpace();
+    }
+
+    @Override
+    public long getUsableSpace() {
+        verifyReady();
+        return _root.toFile().getUsableSpace();
+    }
+
+    private class RocksDbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
+        private final RocksIterator _iterator;
+        private final org.rocksdb.Snapshot _rocksDbSnapshot;
+        private final ReadOptions _readOptions;
+        private boolean _hasNext;
+
+        RocksDbKvIterator(Transaction txn, IteratorStart start, JObjectKey key, org.rocksdb.Snapshot rocksDbSnapshot) {
+            _rocksDbSnapshot = rocksDbSnapshot;
+            _readOptions = new ReadOptions().setSnapshot(_rocksDbSnapshot);
+            _iterator = txn.getIterator(_readOptions);
+            verifyReady();
+
+            if (key instanceof JObjectKeyMin) {
+                _iterator.seekToFirst();
+            } else if (key instanceof JObjectKeyMax) {
+                _iterator.seekToLast();
+            } else {
+                _iterator.seek(key.bytes());
+            }
+            _hasNext = _iterator.isValid();
+        }
+
+        @Override
+        public void close() {
+            _iterator.close();
+        }
+
+        @Override
+        protected void reverse() {
+            if (_hasNext) {
+                if (_goingForward) {
+                    _iterator.prev();
+                } else {
+                    _iterator.next();
+                }
+            } else {
+                if (_goingForward) {
+                    _iterator.seekToLast();
+                } else {
+                    _iterator.seekToFirst();
+                }
+            }
+            _goingForward = !_goingForward;
+            _hasNext = _iterator.isValid();
+        }
+
+        @Override
+        protected JObjectKey peekImpl() {
+            if (!_hasNext) {
+                throw new NoSuchElementException("No more elements");
+            }
+            return JObjectKey.fromByteBuffer(ByteBuffer.wrap(_iterator.key()));
+        }
+
+        @Override
+        protected void skipImpl() {
+            if (_goingForward) {
+                _iterator.next();
+            } else {
+                _iterator.prev();
+            }
+            _hasNext = _iterator.isValid();
+        }
+
+        @Override
+        protected boolean hasImpl() {
+            return _hasNext;
+        }
+
+        @Override
+        protected Pair<JObjectKey, ByteString> nextImpl() {
+            if (!_hasNext) {
+                throw new NoSuchElementException("No more elements");
+            }
+            var key = JObjectKey.fromByteBuffer(ByteBuffer.wrap(_iterator.key()));
+            var value = ByteString.copyFrom(_iterator.value());
+            if (_goingForward) {
+                _iterator.next();
+            } else {
+                _iterator.prev();
+            }
+            _hasNext = _iterator.isValid();
+            return Pair.of(key, value);
+        }
+    }
+}
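A rough usage sketch of the snapshot API added above, mirroring the calls the iterator test later in this diff makes. The `store` variable is assumed to be an injected RocksDbObjectPersistentStore and the key literal is a placeholder:

```java
// Assumed: `store` is a CDI-injected RocksDbObjectPersistentStore.
try (var snapshot = store.getSnapshot()) {
    long version = snapshot.id();                               // tx id pinned at snapshot time
    var value = snapshot.readObject(JObjectKey.of("some-key")); // Optional<ByteString> at that version
    var it = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
    while (it.hasNext()) {
        var entry = it.next(); // Pair<JObjectKey, ByteString>, ascending key order
    }
    it.close();
}
```

Closing the snapshot closes the underlying RocksDB transaction, so both the point reads and the iterator observe one consistent version of the store.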
@@ -24,10 +24,6 @@ public class SerializingObjectPersistentStore {
     @Inject
     ObjectPersistentStore delegateStore;
 
-    @Nonnull
-    Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
-        return delegateStore.readObject(name).map(serializer::deserialize);
-    }
-
     public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
         return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
@@ -1,4 +1,4 @@
-dhfs.objects.persistence=lmdb
+dhfs.objects.persistence=rocks
 dhfs.objects.writeback.limit=134217728
 dhfs.objects.lru.limit=134217728
 dhfs.objects.lru.print-stats=true
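Note that `dhfs.objects.persistence` is a build-time selector in Quarkus: the RocksDB store added above is only wired in when the value matches its class-level annotation (copied from the new file; the elided body is indicated):

```java
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "rocks")
public class RocksDbObjectPersistentStore implements ObjectPersistentStore { /* ... */ }
```

With the property flipped to `rocks`, the LMDB-backed bean is no longer instantiated, which is presumably why the LMDB iterator test below is commented out in the same change.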
@@ -1,129 +1,129 @@
-package com.usatiuk.objects.stores;
-
-
-import com.google.protobuf.ByteString;
-import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.Just;
-import com.usatiuk.objects.TempDataProfile;
-import com.usatiuk.objects.iterators.IteratorStart;
-import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.TestProfile;
-import jakarta.inject.Inject;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
-
-import java.util.List;
-
-class Profiles {
-    public static class LmdbKvIteratorTestProfile extends TempDataProfile {
-    }
-}
-
-@QuarkusTest
-@TestProfile(Profiles.LmdbKvIteratorTestProfile.class)
-public class LmdbKvIteratorTest {
-
-    @Inject
-    LmdbObjectPersistentStore store;
-
-    long getNextTxId() {
-        try (var s = store.getSnapshot()) {
-            return s.id() + 1;
-        }
-    }
-
-    @RepeatedTest(100)
-    public void iteratorTest1() {
-        store.prepareTx(
-                new TxManifestRaw(
-                        List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
-                                Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
-                                Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))),
-                        List.of()
-                ), getNextTxId()
-        ).run();
-
-        try (var snapshot = store.getSnapshot()) {
-            var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
-            Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
-                    Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
-                    Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(3)));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(4)));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertFalse(iterator.hasNext());
-            iterator.close();
-
-            iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
-            Assertions.assertTrue(iterator.hasNext());
-            Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
-            Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
-            Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
-            Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
-            Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})));
-            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
-            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev());
-            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev());
-            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next());
-            iterator.close();
-        }
-
-        store.prepareTx(new TxManifestRaw(
-                        List.of(),
-                        List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3)))
-                ),
-                getNextTxId()
-        ).run();
-    }
-}
+//package com.usatiuk.objects.stores;
+//
+//
+//import com.google.protobuf.ByteString;
+//import com.usatiuk.objects.JObjectKey;
+//import com.usatiuk.objects.Just;
+//import com.usatiuk.objects.TempDataProfile;
+//import com.usatiuk.objects.iterators.IteratorStart;
+//import io.quarkus.test.junit.QuarkusTest;
+//import io.quarkus.test.junit.TestProfile;
+//import jakarta.inject.Inject;
+//import org.apache.commons.lang3.tuple.Pair;
+//import org.junit.jupiter.api.Assertions;
+//import org.junit.jupiter.api.RepeatedTest;
+//
+//import java.util.List;
+//
+//class Profiles {
+//    public static class LmdbKvIteratorTestProfile extends TempDataProfile {
+//    }
+//}
+//
+//@QuarkusTest
+//@TestProfile(Profiles.LmdbKvIteratorTestProfile.class)
+//public class LmdbKvIteratorTest {
+//
+//    @Inject
+//    LmdbObjectPersistentStore store;
+//
+//    long getNextTxId() {
+//        try (var s = store.getSnapshot()) {
+//            return s.id() + 1;
+//        }
+//    }
+//
+//    @RepeatedTest(100)
+//    public void iteratorTest1() {
+//        store.prepareTx(
+//                new TxManifestRaw(
+//                        List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
+//                                Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
+//                                Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))),
+//                        List.of()
+//                ), getNextTxId()
+//        ).run();
+//
+//        try (var snapshot = store.getSnapshot()) {
+//            var iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(""));
+//            Just.checkIterator(iterator, List.of(Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})),
+//                    Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})),
+//                    Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4}))));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(3)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(2)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(2)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(3)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(2)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LT, JObjectKey.of(Long.toString(1)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(1)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(3)));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.GT, JObjectKey.of(Long.toString(4)));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.LE, JObjectKey.of(Long.toString(0)));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertFalse(iterator.hasNext());
+//            iterator.close();
+//
+//            iterator = snapshot.getIterator(IteratorStart.GE, JObjectKey.of(Long.toString(2)));
+//            Assertions.assertTrue(iterator.hasNext());
+//            Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
+//            Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
+//            Assertions.assertEquals(JObjectKey.of(Long.toString(2)), iterator.peekNextKey());
+//            Assertions.assertEquals(JObjectKey.of(Long.toString(1)), iterator.peekPrevKey());
+//            Just.checkIterator(iterator.reversed(), Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})));
+//            Just.checkIterator(iterator, Pair.of(JObjectKey.of(Long.toString(1)), ByteString.copyFrom(new byte[]{2})), Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})));
+//            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(3)), ByteString.copyFrom(new byte[]{4})), iterator.prev());
+//            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.prev());
+//            Assertions.assertEquals(Pair.of(JObjectKey.of(Long.toString(2)), ByteString.copyFrom(new byte[]{3})), iterator.next());
+//            iterator.close();
+//        }
+//
+//        store.prepareTx(new TxManifestRaw(
+//                        List.of(),
+//                        List.of(JObjectKey.of(Long.toString(1)), JObjectKey.of(Long.toString(2)), JObjectKey.of(Long.toString(3)))
+//                ),
+//                getNextTxId()
+//        ).run();
+//    }
+//}
@@ -7,10 +7,6 @@ import com.usatiuk.dhfs.RemoteObjectMeta;
 import com.usatiuk.dhfs.RemoteTransaction;
 import com.usatiuk.dhfs.files.objects.ChunkData;
 import com.usatiuk.dhfs.files.objects.File;
-import com.usatiuk.objects.JData;
-import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.JObjectKeyImpl;
-import com.usatiuk.objects.iterators.IteratorStart;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
@@ -19,10 +15,13 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.dhfs.jmap.JMapEntry;
 import com.usatiuk.dhfs.jmap.JMapHelper;
 import com.usatiuk.dhfs.jmap.JMapLongKey;
+import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
+import com.usatiuk.objects.JData;
+import com.usatiuk.objects.JObjectKey;
+import com.usatiuk.objects.iterators.IteratorStart;
 import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
-import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.quarkus.logging.Log;
@@ -48,21 +47,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     @Inject
     TransactionManager jObjectTxManager;
 
+    @ConfigProperty(name = "dhfs.files.target_chunk_alignment")
+    int targetChunkAlignment;
+
     @ConfigProperty(name = "dhfs.files.target_chunk_size")
     int targetChunkSize;
 
-    @ConfigProperty(name = "dhfs.files.write_merge_threshold")
-    float writeMergeThreshold;
-
-    @ConfigProperty(name = "dhfs.files.write_merge_max_chunk_to_take")
-    float writeMergeMaxChunkToTake;
-
-    @ConfigProperty(name = "dhfs.files.write_merge_limit")
-    float writeMergeLimit;
-
-    @ConfigProperty(name = "dhfs.files.write_last_chunk_limit")
-    float writeLastChunkLimit;
-
     @ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
     boolean useHashForChunks;
@@ -356,22 +346,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         return readChunk(uuid).size();
     }
 
-    private void cleanupChunks(File f, Collection<JObjectKey> uuids) {
-        // FIXME:
-        //        var inFile = useHashForChunks ? new HashSet<>(f.getChunks().values()) : Collections.emptySet();
-        //        for (var cuuid : uuids) {
-        //            try {
-        //                if (inFile.contains(cuuid)) continue;
-        //                jObjectManager.get(cuuid)
-        //                        .ifPresent(jObject -> jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION,
-        //                                (m, d, b, v) -> {
-        //                                    m.removeRef(f.getName());
-        //                                    return null;
-        //                                }));
-        //            } catch (Exception e) {
-        //                Log.error("Error when cleaning chunk " + cuuid, e);
-        //            }
-        //        }
+    private long alignDown(long num, long n) {
+        return num & -(1L << n);
     }
 
     @Override
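A quick worked example of the alignDown helper introduced above (illustrative, not repository code). Note that `n` is an alignment exponent rather than a byte count, so the `dhfs.files.target_chunk_alignment=19` default added later in this change aligns offsets down to 2^19 = 524288-byte (512 KiB) boundaries:

```java
class AlignDownDemo {
    private static long alignDown(long num, long n) {
        return num & -(1L << n); // -(1L << n) has zeros in the low n bits, so the AND clears them
    }

    public static void main(String[] args) {
        System.out.println(alignDown(1_000_000, 19)); // 524288: rounded down to a 512 KiB boundary
        System.out.println(alignDown(524_288, 19));   // 524288: already aligned, unchanged
        System.out.println(alignDown(7, 2));          // 4: multiples of 2^2
    }
}
```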
@@ -380,7 +356,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         if (offset < 0)
             throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
 
-        // FIXME:
         var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
         if (file == null) {
             Log.error("File not found when trying to write: " + fileUuid);
@@ -397,144 +372,58 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 file = remoteTx.getData(File.class, fileUuid).orElse(null);
             }
 
-            Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
-            Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
-            Log.tracev("Getting last");
-            try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(offset + data.size()))) {
-                last = it.hasNext() ? it.next() : null;
-                Log.tracev("Last: {0}", last);
-            }
-
             NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
 
-            long start = 0;
+            long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
+            long writeEnd = offset + data.size();
+            long start = realOffset;
+            ByteString pendingPrefix = ByteString.empty();
+            ByteString pendingSuffix = ByteString.empty();
 
-            try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
-                first = it.hasNext() ? it.next() : null;
-                Log.tracev("First: {0}", first);
-                boolean empty = last == null;
-                if (first != null && getChunkSize(first.getValue().ref()) + first.getKey().key() <= offset) {
-                    first = null;
-                    last = null;
-                    start = offset;
-                } else if (!empty) {
-                    assert first != null;
-                    removedChunks.put(first.getKey().key(), first.getValue().ref());
-                    while (it.hasNext() && it.peekNextKey().compareTo(last.getKey()) <= 0) {
-                        var next = it.next();
-                        Log.tracev("Next: {0}", next);
-                        removedChunks.put(next.getKey().key(), next.getValue().ref());
-                    }
-                    removedChunks.put(last.getKey().key(), last.getValue().ref());
-                    start = first.getKey().key();
+            try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(realOffset))) {
+                while (it.hasNext()) {
+                    var curEntry = it.next();
+                    long curChunkStart = curEntry.getKey().key();
+                    var curChunkId = curEntry.getValue().ref();
+                    long curChunkEnd = curChunkStart + getChunkSize(curChunkId);
+
+                    if (curChunkEnd <= realOffset) break;
+
+                    removedChunks.put(curEntry.getKey().key(), curChunkId);
+
+                    if (curChunkStart < offset) {
+                        if (curChunkStart < start)
+                            start = curChunkStart;
+
+                        var readChunk = readChunk(curChunkId);
+                        pendingPrefix = pendingPrefix.concat(readChunk.substring(0, Math.min(readChunk.size(), (int) (offset - curChunkStart))));
+                    }
+
+                    if (curChunkEnd > writeEnd) {
+                        var readChunk = readChunk(curChunkId);
+                        pendingSuffix = pendingSuffix.concat(readChunk.substring((int) (writeEnd - curChunkStart), readChunk.size()));
+                    }
+
+                    if (curChunkEnd >= writeEnd) break;
                 }
             }
 
-            //            NavigableMap<Long, JObjectKey> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
-            //            NavigableMap<Long, JObjectKey> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
-
-            //            if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
-            //                beforeFirst = chunksAll;
-            //                afterLast = Collections.emptyNavigableMap();
-            //                first = null;
-            //                last = null;
-            //                start = offset;
-            //            } else if (!chunksAll.isEmpty()) {
-            //                var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
-            //                removedChunks.putAll(between);
-            //                start = first.getKey();
-            //            }
-
-            ByteString pendingWrites = ByteString.empty();
-
-            if (first != null && first.getKey().key() < offset) {
-                var chunkBytes = readChunk(first.getValue().ref());
-                pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey().key())));
-            }
-            pendingWrites = pendingWrites.concat(data);
-
-            if (last != null) {
-                var lchunkBytes = readChunk(last.getValue().ref());
-                if (last.getKey().key() + lchunkBytes.size() > offset + data.size()) {
-                    var startInFile = offset + data.size();
-                    var startInChunk = startInFile - last.getKey().key();
-                    pendingWrites = pendingWrites.concat(lchunkBytes.substring((int) startInChunk, lchunkBytes.size()));
-                }
-            }
+            ByteString pendingWrites = pendingPrefix.concat(data).concat(pendingSuffix);
 
             int combinedSize = pendingWrites.size();
 
-            if (targetChunkSize > 0) {
-                //            if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
-                //                boolean leftDone = false;
-                //                boolean rightDone = false;
-                //                while (!leftDone && !rightDone) {
-                //                    if (beforeFirst.isEmpty()) leftDone = true;
-                //                    if (!beforeFirst.isEmpty() || !leftDone) {
-                //                        var takeLeft = beforeFirst.lastEntry();
-                //
-                //                        var cuuid = takeLeft.getValue();
-                //
-                //                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
-                //                            leftDone = true;
-                //                            continue;
-                //                        }
-                //
-                //                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
-                //                            leftDone = true;
-                //                            continue;
-                //                        }
-                //
-                //                        // FIXME: (and test this)
-                //                        beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
-                //                        start = takeLeft.getKey();
-                //                        pendingWrites = readChunk(cuuid).concat(pendingWrites);
-                //                        combinedSize += getChunkSize(cuuid);
-                //                        removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
-                //                    }
-                //                    if (afterLast.isEmpty()) rightDone = true;
-                //                    if (!afterLast.isEmpty() && !rightDone) {
-                //                        var takeRight = afterLast.firstEntry();
-                //
-                //                        var cuuid = takeRight.getValue();
-                //
-                //                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
-                //                            rightDone = true;
-                //                            continue;
-                //                        }
-                //
-                //                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
-                //                            rightDone = true;
-                //                            continue;
-                //                        }
-                //
-                //                        // FIXME: (and test this)
-                //                        afterLast = afterLast.tailMap(takeRight.getKey(), false);
-                //                        pendingWrites = pendingWrites.concat(readChunk(cuuid));
-                //                        combinedSize += getChunkSize(cuuid);
-                //                        removedChunks.put(takeRight.getKey(), takeRight.getValue());
-                //                    }
-                //                }
-                //            }
-            }
-
             NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
 
             {
+                int targetChunkSize = 1 << targetChunkAlignment;
                 int cur = 0;
                 while (cur < combinedSize) {
                     int end;
 
-                    if (targetChunkSize <= 0)
+                    if (targetChunkAlignment < 0)
                         end = combinedSize;
-                    else {
-                        if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
-                            end = Math.min(cur + targetChunkSize, combinedSize);
-                        } else {
-                            end = combinedSize;
-                        }
-                    }
+                    else
+                        end = Math.min(cur + targetChunkSize, combinedSize);
 
                     var thisChunk = pendingWrites.substring(cur, end);
 
@@ -557,7 +446,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             }
 
             remoteTx.putData(file);
-            cleanupChunks(file, removedChunks.values());
 
             return (long) data.size();
         });
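To make the rewritten overlap handling above concrete, here is the arithmetic for one overwrite, with made-up numbers (an illustrative sketch, not repository code): a 100-byte write at offset 50 into a file stored as a single chunk covering [0, 200) keeps bytes [0, 50) as pendingPrefix and bytes [150, 200) as pendingSuffix, so pendingWrites becomes prefix + data + suffix and the old chunk lands in removedChunks:

```java
class WriteOverlapDemo {
    public static void main(String[] args) {
        int curChunkStart = 0, chunkSize = 200;      // existing chunk covers [0, 200)
        int offset = 50, dataSize = 100;             // incoming write covers [50, 150)
        int writeEnd = offset + dataSize;
        int prefixLen = offset - curChunkStart;                  // 50 bytes preserved before the write
        int suffixLen = chunkSize - (writeEnd - curChunkStart);  // 50 bytes preserved after it
        System.out.println(prefixLen + " + " + dataSize + " + " + suffixLen + " = "
                + (prefixLen + dataSize + suffixLen));           // 50 + 100 + 50 = 200
    }
}
```

The combined 200 bytes are then re-chunked from the aligned start, which is what replaces the removed first/last bookkeeping and the commented-out merge heuristics.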
@@ -576,17 +464,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         }
 
         if (length == 0) {
-            try (var it = jMapHelper.getIterator(file, IteratorStart.GE, JMapLongKey.of(0))) {
-                while (it.hasNext()) {
-                    var next = it.next();
-                    jMapHelper.delete(file, next.getKey());
-                }
-            }
-            //            var oldChunks = file.chunks();
-            //
-            //            file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
+            jMapHelper.deleteAll(file);
             remoteTx.putData(file);
-            //            cleanupChunks(file, oldChunks.values());
             return true;
         }
 
@@ -689,7 +568,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         }
 
         remoteTx.putData(file);
-        cleanupChunks(file, removedChunks.values());
         return true;
     });
 }
@@ -17,23 +17,27 @@ public class JMapHelper {
     Transaction curTx;
 
     static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
-        return JObjectKey.of(holder.name() + "/");
+        return JObjectKey.of(holder.name() + "=");
     }
 
+    static <K extends JMapKey> JObjectKey makeKeyFirst(JObjectKey holder) {
+        return JObjectKey.of(holder.name() + "<");
+    }
+
     static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
         return JObjectKey.of(makePrefix(holder).name() + key.toString());
     }
 
+    static <K extends JMapKey> JObjectKey makeKeyLast(JObjectKey holder) {
+        return JObjectKey.of(holder.name() + ">");
+    }
+
     public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
         return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
     }
 
     public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, K key) {
         return getIterator(holder, IteratorStart.GE, key);
     }
 
     public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
-        return new JMapIterator<>(curTx.getIterator(IteratorStart.GE, makePrefix(holder.key())), holder);
+        return new JMapIterator<>(curTx.getIterator(IteratorStart.GT, makeKeyFirst(holder.key())), holder);
     }
 
     public <K extends JMapKey> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
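The separator change above looks cosmetic but is what makes the new sentinel keys work: '<' (ASCII 60), '=' (61) and '>' (62) are consecutive, so for any holder `makeKeyFirst` sorts strictly before every real entry and `makeKeyLast` strictly after, presumably letting the full-map iterator start with a GT seek on the first sentinel instead of a GE prefix scan. A standalone illustration with hypothetical key strings (not repository code):

```java
class JMapKeyOrderDemo {
    public static void main(String[] args) {
        String first = "file1<";                         // makeKeyFirst sentinel
        String entry = "file1=00000000000000000042";     // makeKey: prefix + padded map key
        String last  = "file1>";                         // makeKeyLast sentinel
        System.out.println(first.compareTo(entry) < 0);  // true: below every entry of this holder
        System.out.println(entry.compareTo(last) < 0);   // true: above every entry of this holder
    }
}
```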
@@ -1,5 +1,7 @@
 package com.usatiuk.dhfs.jmap;
 
+import org.apache.commons.lang3.StringUtils;
+
 import javax.annotation.Nonnull;
 import java.io.Serializable;
 
@@ -14,7 +16,7 @@ public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Serializable {
 
     @Override
     public String toString() {
-        return String.format("%016d", key);
+        return StringUtils.leftPad(String.valueOf(key), 20, '0');
     }
 
     @Override
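The padding change above matters because these strings become part of lexicographically ordered keys: `%016d` stops padding once a value exceeds 16 digits, while a long can have up to 19, so large keys could sort before smaller ones. Left-padding to 20 keeps every non-negative long in numeric order. A small demonstration (illustrative only):

```java
import org.apache.commons.lang3.StringUtils;

class LongKeyPaddingDemo {
    public static void main(String[] args) {
        long small = 999_999_999_999_999_999L;  // 18 digits
        long big = 1_000_000_000_000_000_000L;  // 19 digits
        // Old format: width 16 is exceeded by both values, so no padding happens
        // and "999..." sorts after "100..." even though small < big.
        System.out.println(String.format("%016d", small).compareTo(String.format("%016d", big)) < 0); // false
        // New format: both padded to 20 chars, so string order matches numeric order.
        System.out.println(StringUtils.leftPad(String.valueOf(small), 20, '0')
                .compareTo(StringUtils.leftPad(String.valueOf(big), 20, '0')) < 0);                   // true
    }
}
```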
@@ -4,7 +4,7 @@ option java_multiple_files = true;
 option java_package = "com.usatiuk.dhfs.repository.peerdiscovery";
 option java_outer_classname = "DhfsObjectPeerDiscoveryApi";
 
-package dhfs.objects.peerdiscovery;
+package dhfs.peerdiscovery;
 
 message PeerDiscoveryInfo {
     string uuid = 1;
@@ -4,7 +4,7 @@ option java_multiple_files = true;
 option java_package = "com.usatiuk.dhfs.persistence";
 option java_outer_classname = "DhfsObjectPersistence";
 
-package dhfs.objects.persistence;
+package dhfs.persistence;
 
 message JObjectKeyP {
     string name = 1;
@@ -1,12 +1,12 @@
 syntax = "proto3";
 
-import "dhfs_objects_serial.proto";
+import "dhfs_serial.proto";
 
 option java_multiple_files = true;
 option java_package = "com.usatiuk.dhfs.repository";
 option java_outer_classname = "DhfsObjectSyncApi";
 
-package dhfs.objects.sync;
+package dhfs.sync;
 
 service DhfsObjectSyncGrpc {
     rpc OpPush (OpPushRequest) returns (OpPushReply) {}
@@ -22,22 +22,22 @@ message PingRequest {}
 message PingReply {}
 
 message GetObjectRequest {
-    dhfs.objects.persistence.JObjectKeyP name = 2;
+    dhfs.persistence.JObjectKeyP name = 2;
 }
 
 message GetObjectReply {
-    dhfs.objects.persistence.ObjectChangelog changelog = 5;
-    dhfs.objects.persistence.JDataRemoteDtoP pushedData = 6;
+    dhfs.persistence.ObjectChangelog changelog = 5;
+    dhfs.persistence.JDataRemoteDtoP pushedData = 6;
 }
 
 message CanDeleteRequest {
-    dhfs.objects.persistence.JObjectKeyP name = 2;
-    repeated dhfs.objects.persistence.JObjectKeyP ourReferrers = 3;
+    dhfs.persistence.JObjectKeyP name = 2;
+    repeated dhfs.persistence.JObjectKeyP ourReferrers = 3;
 }
 
 message CanDeleteReply {
     bool deletionCandidate = 2;
-    repeated dhfs.objects.persistence.JObjectKeyP referrers = 3;
+    repeated dhfs.persistence.JObjectKeyP referrers = 3;
 }
 
 message OpPushRequest {
@@ -15,13 +15,7 @@ dhfs.fuse.debug=false
 dhfs.fuse.enabled=true
 dhfs.files.allow_recursive_delete=false
 dhfs.files.target_chunk_size=2097152
-# Writes strictly smaller than this will try to merge with blocks nearby
-dhfs.files.write_merge_threshold=0.8
-# If a merge would result in a block of greater size than this, stop merging
-dhfs.files.write_merge_limit=1.2
-# Don't take blocks of this size and above when merging
-dhfs.files.write_merge_max_chunk_to_take=1
-dhfs.files.write_last_chunk_limit=1.5
+dhfs.files.target_chunk_alignment=19
 dhfs.objects.deletion.delay=1000
 dhfs.objects.deletion.can-delete-retry-delay=10000
 dhfs.objects.ref_verification=true
@@ -1,12 +1,12 @@
 package com.usatiuk.dhfs.files;
 
+import com.usatiuk.dhfs.RemoteTransaction;
 import com.usatiuk.dhfs.TempDataProfile;
 import com.usatiuk.dhfs.files.objects.File;
 import com.usatiuk.dhfs.files.service.DhfsFileService;
-import com.usatiuk.dhfs.RemoteTransaction;
+import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
-import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import jakarta.inject.Inject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.RepeatedTest;
@@ -27,6 +27,7 @@ class Profiles {
         protected void getConfigOverrides(Map<String, String> ret) {
             ret.put("dhfs.fuse.enabled", "false");
             ret.put("dhfs.files.target_chunk_size", "-1");
+            ret.put("dhfs.files.target_chunk_alignment", "-1");
         }
     }
 
@@ -35,6 +36,7 @@ class Profiles {
         protected void getConfigOverrides(Map<String, String> ret) {
             ret.put("dhfs.fuse.enabled", "false");
             ret.put("dhfs.files.target_chunk_size", "3");
+            ret.put("dhfs.files.target_chunk_alignment", "2");
         }
     }
 }
@@ -150,6 +152,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
         Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
 
         fileService.truncate(uuid, 20);
         Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
+        fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17});
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
     }