k-v based map draft, seems to work!
@@ -44,7 +44,7 @@ public class CurrentTransaction implements Transaction {
     }

     @Override
-    public Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key) {
+    public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
        return transactionManager.current().getIterator(start, key);
    }

@@ -0,0 +1,8 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+
+@FunctionalInterface
+public interface IterProdFn<K extends Comparable<K>, V> {
+    CloseableKvIterator<K, V> get(IteratorStart start, K key);
+}
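
Note: IterProdFn lets callers hand MergingKvIterator iterator factories instead of live iterators, so the merge layer can (re)create every source at a rewritten start position (see the LT/LE handling further below). A minimal usage sketch, assuming CloseableKvIterator is AutoCloseable and using the NavigableMapKvIterator constructor exercised by the tests in this commit:

    // Hypothetical wiring: iterator creation is deferred to the merge layer.
    NavigableMap<Integer, Integer> map = new TreeMap<>(Map.of(1, 10, 2, 20));
    IterProdFn<Integer, Integer> producer =
            (start, key) -> new NavigableMapKvIterator<>(map, start, key);
    try (var it = producer.get(IteratorStart.GE, 1)) {
        while (it.hasNext()) System.out.println(it.next()); // (1,10), (2,20)
    }
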
@@ -136,10 +136,6 @@ public class JObjectManager {
            };
        }

-        Log.trace("Committing transaction start");
-        // FIXME: Better way?
-        addDependency.accept(JDataDummy.TX_ID_OBJ_NAME);
-        writes.put(JDataDummy.TX_ID_OBJ_NAME, new TxRecord.TxObjectRecordWrite<>(JDataDummy.getInstance()));
    } finally {
        readSet = tx.reads();

@@ -153,6 +149,11 @@ public class JObjectManager {
                }
            }
        }

+        Log.trace("Committing transaction start");
+        // FIXME: Better way?
+        addDependency.accept(JDataDummy.TX_ID_OBJ_NAME);
+        writes.put(JDataDummy.TX_ID_OBJ_NAME, new TxRecord.TxObjectRecordWrite<>(JDataDummy.getInstance()));
+
        var snapshotId = tx.snapshot().id();
        var newId = _txCounter.get() + 1;

@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.objects;

+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import io.quarkus.logging.Log;
 import org.apache.commons.lang3.tuple.Pair;

@@ -7,30 +8,79 @@ import java.util.*;

 public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
     private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
-    private final SortedMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
+    private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
     private final String _name;

-    public MergingKvIterator(String name, List<CloseableKvIterator<K, V>> iterators) {
+    public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
         _name = name;

+        IteratorStart initialStartType = startType;
+        K initialStartKey = startKey;
+        boolean fail = false;
+        if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
+            // Starting at the greatest key less than (or equal to) the target:
+            // each iterator has given us its own "greatest LT/LE key",
+            // and we need to pick the greatest of those to start with.
+            var initialIterators = iterators.stream().map(p -> p.get(initialStartType, initialStartKey)).toList();
+            try {
+                K initialMaxValue = initialIterators.stream()
+                        .filter(CloseableKvIterator::hasNext)
+                        .map((i) -> {
+                            var peeked = i.peekNextKey();
+                            // Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
+                            return peeked;
+                        })
+                        .max(Comparator.naturalOrder()).orElse(null);
+                if (initialMaxValue == null) {
+                    fail = true;
+                }
+                startKey = initialMaxValue;
+                startType = IteratorStart.GE;
+            } finally {
+                initialIterators.forEach(CloseableKvIterator::close);
+            }
+        }
+
+        if (fail) {
+            _iterators = Map.of();
+            return;
+        }
+
         int counter = 0;
         var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
-        for (CloseableKvIterator<K, V> iterator : iterators) {
+        for (var iteratorFn : iterators) {
+            var iterator = iteratorFn.get(startType, startKey);
             iteratorsTmp.put(iterator, counter++);
         }
-        _iterators = Collections.unmodifiableMap(iteratorsTmp);
+        _iterators = Map.copyOf(iteratorsTmp);

-        for (CloseableKvIterator<K, V> iterator : iterators) {
+        for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
             advanceIterator(iterator);
         }

         Log.tracev("{0} Created: {1}", _name, _sortedIterators);
+        switch (initialStartType) {
+            case LT -> {
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
+            }
+            case LE -> {
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
+            }
+            case GT -> {
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) > 0;
+            }
+            case GE -> {
+                assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) >= 0;
+            }
+        }
     }

     @SafeVarargs
-    public MergingKvIterator(String name, CloseableKvIterator<K, V>... iterators) {
-        this(name, List.of(iterators));
+    public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
+        this(name, startType, startKey, List.of(iterators));
     }


     private void advanceIterator(CloseableKvIterator<K, V> iterator) {
         if (!iterator.hasNext()) {
             return;

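
Note: the LT/LE branch above picks the greatest of the per-source "floor" keys because starting the merged GE scan at a smaller floor would re-emit keys that an LT/LE scan over the union must skip. A self-contained illustration with made-up values:

    import java.util.TreeMap;

    class FloorPickDemo {
        public static void main(String[] args) {
            var a = new TreeMap<Integer, String>();
            a.put(2, "a2");
            a.put(5, "a5");
            var b = new TreeMap<Integer, String>();
            b.put(1, "b1");
            b.put(2, "b2");

            Integer floorA = a.floorKey(5); // source a answers LE 5 with 5
            Integer floorB = b.floorKey(5); // source b answers LE 5 with 2

            // The union's greatest key <= 5 is max(5, 2) = 5; a GE scan from 2
            // would wrongly emit keys 2..4 of source b before reaching it.
            System.out.println(Math.max(floorA, floorB)); // 5
        }
    }
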
@@ -49,6 +99,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
             _sortedIterators.put(key, iterator);
             advanceIterator(them);
         } else {
+            Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
             iterator.skip();
             advanceIterator(iterator);
         }

@@ -92,7 +143,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
         }
         var curVal = cur.getValue().next();
         advanceIterator(cur.getValue());
-        Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators);
+        // Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
         return curVal;
     }

@@ -100,7 +151,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
     public String toString() {
         return "MergingKvIterator{" +
                 "_name='" + _name + '\'' +
-                ", _sortedIterators=" + _sortedIterators +
+                ", _sortedIterators=" + _sortedIterators.keySet() +
                 ", _iterators=" + _iterators +
                 '}';
     }

@@ -15,11 +15,15 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
            case GE -> _view = map.tailMap(key, true);
            case GT -> _view = map.tailMap(key, false);
            case LE -> {
-                var tail = map.tailMap(key, true);
-                if (tail.firstKey().equals(key)) _view = tail;
-                else _view = map.tailMap(map.lowerKey(key), true);
+                var floorKey = map.floorKey(key);
+                if (floorKey == null) _view = Collections.emptyNavigableMap();
+                else _view = map.tailMap(floorKey, true);
            }
-            case LT -> _view = map.tailMap(map.lowerKey(key), true);
+            case LT -> {
+                var lowerKey = map.lowerKey(key);
+                if (lowerKey == null) _view = Collections.emptyNavigableMap();
+                else _view = map.tailMap(lowerKey, true);
+            }
            default -> throw new IllegalArgumentException("Unknown start type");
        }
        _iterator = _view.entrySet().iterator();

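
Note: the rewritten LE/LT cases fix two crashes in the old code: tail.firstKey() threw NoSuchElementException on an empty tail map, and map.lowerKey(key) could return null, making tailMap(null, true) throw NullPointerException. A standalone sketch of the second failure mode:

    import java.util.TreeMap;

    class LowerKeyNullDemo {
        public static void main(String[] args) {
            var map = new TreeMap<Integer, String>();
            map.put(10, "x");

            System.out.println(map.lowerKey(5)); // null: nothing below 5
            // Old code: map.tailMap(null, true) -> NullPointerException.
            // New code: a null floor/lower key yields an empty view instead.
            System.out.println(map.floorKey(5) == null ? "empty view" : "tail view");
        }
    }
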
@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.objects;

+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import org.apache.commons.lang3.tuple.Pair;

 import java.util.NoSuchElementException;

@@ -10,10 +11,22 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> implements CloseableKvIterator<K, V_T> {
    private final Function<V, V_T> _transformer;
    private Pair<K, V_T> _next;

-    public PredicateKvIterator(CloseableKvIterator<K, V> backing, Function<V, V_T> transformer) {
+    public PredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<V, V_T> transformer) {
        _backing = backing;
        _transformer = transformer;
        fillNext();
+        if (_next == null) {
+            return;
+        }
+        if (start == IteratorStart.LE) {
+            if (_next.getKey().compareTo(startKey) > 0) {
+                _next = null;
+            }
+        } else if (start == IteratorStart.LT) {
+            if (_next.getKey().compareTo(startKey) >= 0) {
+                _next = null;
+            }
+        }
    }

    private void fillNext() {

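
Note: this LE/LT post-check is needed because the transformer may map an entry to null and drop it, so the first surviving pair can land past the requested bound even when the backing iterator was positioned correctly; the ltTest added later in this commit exercises exactly that. A standalone model of the failure mode (same data as the test):

    import java.util.TreeMap;

    class PredicateClampDemo {
        public static void main(String[] args) {
            var backing = new TreeMap<Integer, Integer>();
            backing.put(1, 3);
            backing.put(3, 5);
            backing.put(4, 6);

            // LT 4 positions the backing scan at key 3, but a filter keeping
            // only even values drops 3 -> 5, so the first surviving pair is
            // (4, 6) -- which violates the LT 4 bound and must be discarded.
            var view = backing.tailMap(backing.lowerKey(4), true);
            for (var e : view.entrySet()) {
                if (e.getValue() % 2 == 0)
                    System.out.println(e.getKey() + " -> " + e.getValue()); // 4 -> 6
            }
        }
    }
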
@@ -42,13 +42,15 @@ public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
            if (_versionSupplier.get() == _curVersion) {
                return;
            }
+            Log.tracev("Refreshing iterator last refreshed {0}, current version {1}, current value {2}",
+                    _curVersion, _versionSupplier.get(), _next);
            long newVersion = _versionSupplier.get();
            oldBacking = _backing;
            _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey()));
            var next = _backing.hasNext() ? _backing.next() : null;
            if (next == null) {
                Log.errorv("Failed to refresh iterator, null last refreshed {0}," +
-                        " current version {1}, current value {2}", _curVersion, newVersion, next);
+                        " current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next);
                assert false;
            } else if (!next.equals(_next)) {
                Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," +

@@ -240,9 +240,21 @@ public class SnapshotManager {
        private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
        private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next;

-        public SnapshotKvIterator(IteratorStart start, JObjectKey key) {
-            _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(key, 0L));
+        public SnapshotKvIterator(IteratorStart start, JObjectKey startKey) {
+            _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L));
            fillNext();
+            if (_next == null) {
+                return;
+            }
+            if (start == IteratorStart.LE) {
+                if (_next.getKey().compareTo(startKey) > 0) {
+                    _next = null;
+                }
+            } else if (start == IteratorStart.LT) {
+                if (_next.getKey().compareTo(startKey) >= 0) {
+                    _next = null;
+                }
+            }
        }

        private void fillNext() {

@@ -357,11 +369,21 @@ public class SnapshotManager {
        // so refresh them manually. Otherwise, it could be possible that something from the writeback cache will
        // be served instead. Note that refreshing the iterator will also refresh the writeback iterator,
        // so it also should be consistent.
-        return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>((params) ->
-                new TombstoneMergingKvIterator<>("snapshot", new SnapshotKvIterator(params.getLeft(), params.getRight()),
-                        new MappingKvIterator<>(writebackStore.getIterator(params.getLeft(), params.getRight()), d ->
-                                d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>()
-                        )), _snapshotVersion::get, _lock.readLock(), start, key));
+        Log.tracev("Getting snapshot {0} iterator for {1} {2}", _id, start, key);
+        _lock.readLock().lock();
+        try {
+            return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>(
+                    p ->
+                            new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(),
+                                    SnapshotKvIterator::new,
+                                    (tS, tK) -> new MappingKvIterator<>(
+                                            writebackStore.getIterator(tS, tK),
+                                            d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>())
+                            )
+                    , _snapshotVersion::get, _lock.readLock(), start, key));
+        } finally {
+            _lock.readLock().unlock();
+        }
    }

    public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {

@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.objects;

+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import io.quarkus.logging.Log;
 import org.apache.commons.lang3.tuple.Pair;

@@ -9,10 +10,11 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
    private final CloseableKvIterator<K, V> _backing;
    private final String _name;

-    public TombstoneMergingKvIterator(String name, List<CloseableKvIterator<K, DataType<V>>> iterators) {
+    public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, DataType<V>>> iterators) {
        _name = name;
        _backing = new PredicateKvIterator<>(
-                new MergingKvIterator<>(name + "-merging", iterators),
+                new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
+                startType, startKey,
                pair -> {
                    Log.tracev("{0} - Processing pair {1}", _name, pair);
                    if (pair instanceof Tombstone) {

@@ -23,8 +25,8 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
    }

    @SafeVarargs
-    public TombstoneMergingKvIterator(String name, CloseableKvIterator<K, DataType<V>>... iterators) {
-        this(name, List.of(iterators));
+    public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, DataType<V>>... iterators) {
+        this(name, startType, startKey, List.of(iterators));
    }

    public interface DataType<T> {

@@ -447,22 +447,26 @@ public class WritebackObjectPersistentStore {
    // Does not have to guarantee consistent view, snapshots are handled by upper layers
    // Invalidated by commitBundle, but might return data after it has been really committed
    public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
        Log.tracev("Getting writeback iterator: {0}, {1}", start, key);
        _pendingWritesVersionLock.readLock().lock();
        try {
-            CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> oursIterator = new MappingKvIterator<>(
-                    new NavigableMapKvIterator<>(_pendingWrites.get(), start, key),
-                    e -> switch (e) {
-                        case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
-                        case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
-                        default -> throw new IllegalStateException("Unexpected value: " + e);
-                    });
+            var curPending = _pendingWrites.get();

            return new InvalidatableKvIterator<>(
                    new InconsistentKvIteratorWrapper<>(
-                            (p) ->
-                                    new TombstoneMergingKvIterator<>("writeback-ps",
-                                            oursIterator,
-                                            new MappingKvIterator<>(cachedStore.getIterator(p.getLeft(), p.getRight()), TombstoneMergingKvIterator.Data::new)), start, key),
+                            p ->
+                                    new TombstoneMergingKvIterator<>("writeback-ps", p.getLeft(), p.getRight(),
+                                            (tS, tK) -> new MappingKvIterator<>(
+                                                    new NavigableMapKvIterator<>(curPending, tS, tK),
+                                                    e -> switch (e) {
+                                                        case PendingWrite pw ->
+                                                                new TombstoneMergingKvIterator.Data<>(pw.data());
+                                                        case PendingDelete d ->
+                                                                new TombstoneMergingKvIterator.Tombstone<>();
+                                                        default ->
+                                                                throw new IllegalStateException("Unexpected value: " + e);
+                                                    }),
+                                            (tS, tK) -> new MappingKvIterator<>(cachedStore.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new)), start, key),
                    _pendingWritesVersion::get, _pendingWritesVersionLock.readLock());
        } finally {
            _pendingWritesVersionLock.readLock().unlock();

@@ -188,14 +188,15 @@ public class CachingObjectPersistentStore {
        _cacheVersionLock.readLock().lock();
        try {
            return new InconsistentSelfRefreshingKvIterator<>(
-                    (bp) -> new MergingKvIterator<>("cache",
-                            new PredicateKvIterator<>(
-                                    new NavigableMapKvIterator<>(_sortedCache, bp.getLeft(), bp.getRight()),
+                    p -> new MergingKvIterator<>("cache", p.getLeft(), p.getRight(),
+                            (mS, mK) -> new PredicateKvIterator<>(
+                                    new NavigableMapKvIterator<>(_sortedCache, mS, mK),
+                                    mS, mK,
                                    e -> {
                                        Log.tracev("Taken from cache: {0}", e);
                                        return e.object().orElse(null);
                                    }
-                            ), new CachingKvIterator(delegate.getIterator(bp.getLeft(), bp.getRight()))), _cacheVersion::get,
+                            ), (mS, mK) -> new CachingKvIterator(delegate.getIterator(mS, mK))), _cacheVersion::get,
                    _cacheVersionLock.readLock(), start, key);
        } finally {
            _cacheVersionLock.readLock().unlock();

@@ -142,7 +142,24 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
                }
            }

-            Log.tracev("got: {0}, hasNext: {1}", got, _hasNext);
+            var realGot = JObjectKey.fromByteBuffer(_cursor.key());
+            _cursor.key().flip();
+
+            switch (start) {
+                case LT -> {
+                    assert !_hasNext || realGot.compareTo(key) < 0;
+                }
+                case LE -> {
+                    assert !_hasNext || realGot.compareTo(key) <= 0;
+                }
+                case GT -> {
+                    assert !_hasNext || realGot.compareTo(key) > 0;
+                }
+                case GE -> {
+                    assert !_hasNext || realGot.compareTo(key) >= 0;
+                }
+            }
+            Log.tracev("got: {0}, hasNext: {1}", realGot, _hasNext);
        }

        @Override

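
Note on the added _cursor.key().flip(): decoding the key consumes the cursor's ByteBuffer, and since the position then sits at the limit, flip() effectively rewinds it so the key can be decoded again later. A standalone sketch of that ByteBuffer behaviour (not LMDB-specific):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class FlipDemo {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.wrap("key-1".getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[buf.remaining()];
            buf.get(out); // reading advances position to the limit
            System.out.println(buf.remaining()); // 0: a second decode sees nothing
            buf.flip(); // position was at the limit, so this rewinds to the start
            System.out.println(buf.remaining()); // 5 again
        }
    }
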
@@ -2,12 +2,14 @@ package com.usatiuk.dhfs.objects.transaction;

 import com.usatiuk.dhfs.objects.*;
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;

-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;

 @ApplicationScoped
 public class ReadTrackingObjectSourceFactory {

@@ -22,7 +24,6 @@ public class ReadTrackingObjectSourceFactory {
        private final SnapshotManager.Snapshot _snapshot;

        private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
-        private final Queue<AutoCloseableNoThrow> _iterators = new ArrayDeque<>();

        public ReadTrackingObjectSourceImpl(SnapshotManager.Snapshot snapshot) {
            _snapshot = snapshot;

@@ -66,9 +67,9 @@ public class ReadTrackingObjectSourceFactory {

        @Override
        public void close() {
-            for (var it : _iterators) {
-                it.close();
-            }
+            // for (var it : _iterators) {
+            //     it.close();
+            // }
        }

        private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {

@@ -108,9 +109,7 @@ public class ReadTrackingObjectSourceFactory {

        @Override
        public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
-            var got = new ReadTrackingIterator(start, key);
-            _iterators.add(got);
-            return got;
+            return new ReadTrackingIterator(start, key);
        }
    }
 }

@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.objects.transaction;

+import com.usatiuk.dhfs.objects.CloseableKvIterator;
 import com.usatiuk.dhfs.objects.JData;
 import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.dhfs.objects.persistence.IteratorStart;

@@ -15,9 +16,9 @@ public interface ReadTrackingTransactionObjectSource extends AutoCloseableNoThrow {

    <T extends JData> Optional<T> getWriteLocked(Class<T> type, JObjectKey key);

-    Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key);
+    CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);

-    default Iterator<Pair<JObjectKey, JData>> getIterator(JObjectKey key) {
+    default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
        return getIterator(IteratorStart.GE, key);
    }

@@ -1,13 +1,12 @@
 package com.usatiuk.dhfs.objects.transaction;

+import com.usatiuk.dhfs.objects.CloseableKvIterator;
 import com.usatiuk.dhfs.objects.JData;
 import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.dhfs.objects.persistence.IteratorStart;
-import org.apache.commons.lang3.tuple.Pair;

 import javax.annotation.Nonnull;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.Optional;

 // The transaction interface actually used by user code to retrieve objects

@@ -27,9 +26,9 @@ public interface Transaction extends TransactionHandle {
        return get(type, key, LockingStrategy.OPTIMISTIC);
    }

-    Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key);
+    CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);

-    default Iterator<Pair<JObjectKey, JData>> getIterator(JObjectKey key) {
+    default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
        return getIterator(IteratorStart.GE, key);
    }

@@ -1,13 +1,10 @@
 package com.usatiuk.dhfs.objects.transaction;

-import com.usatiuk.dhfs.objects.JData;
-import com.usatiuk.dhfs.objects.JObjectKey;
-import com.usatiuk.dhfs.objects.SnapshotManager;
+import com.usatiuk.dhfs.objects.*;
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;

 import javax.annotation.Nonnull;
 import java.util.*;

@@ -27,7 +24,10 @@ public class TransactionFactoryImpl implements TransactionFactory {

    private class TransactionImpl implements TransactionPrivate {
        private final ReadTrackingTransactionObjectSource _source;
-        private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new HashMap<>();

+        private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
+        private long _writeVersion = 0;
+
        private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
        private final List<Runnable> _onCommit = new ArrayList<>();
        private final List<Runnable> _onFlush = new ArrayList<>();

@@ -103,8 +103,16 @@ public class TransactionFactoryImpl implements TransactionFactory {
        }

        @Override
-        public Iterator<Pair<JObjectKey, JData>> getIterator(IteratorStart start, JObjectKey key) {
-            return _source.getIterator(start, key);
+        public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
+            Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
+            return new TombstoneMergingKvIterator<>("tx", start, key,
+                    (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) {
+                        case TxRecord.TxObjectRecordWrite<?> write ->
+                                new TombstoneMergingKvIterator.Data<>(write.data());
+                        case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneMergingKvIterator.Tombstone<>();
+                        case null, default -> null;
+                    }),
+                    (tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), TombstoneMergingKvIterator.Data::new));
        }

        @Override

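
Note: the rewritten transaction iterator layers the tx-local write set over the snapshot: a write shadows the snapshot value for its key, and a deletion becomes a tombstone that hides the key entirely. A simplified, self-contained model of that merge (plain maps, not the repo's iterator classes):

    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.TreeSet;

    class TxOverlayDemo {
        sealed interface Rec permits Write, Delete {}
        record Write(String data) implements Rec {}
        record Delete() implements Rec {}

        public static void main(String[] args) {
            NavigableMap<String, String> snapshot = new TreeMap<>();
            snapshot.put("a", "old-a");
            snapshot.put("b", "old-b");

            NavigableMap<String, Rec> writes = new TreeMap<>();
            writes.put("a", new Write("new-a")); // shadows the snapshot value
            writes.put("b", new Delete());       // tombstone: hides the key
            writes.put("c", new Write("new-c")); // tx-local insert

            var keys = new TreeSet<>(snapshot.keySet());
            keys.addAll(writes.keySet());
            for (var k : keys) {
                String v = switch (writes.get(k)) {
                    case Write w -> w.data();
                    case Delete d -> null;        // suppressed by the tombstone
                    case null -> snapshot.get(k); // fall through to the snapshot
                };
                if (v != null) System.out.println(k + " -> " + v);
            }
            // prints: a -> new-a, c -> new-c (b is hidden)
        }
    }
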
@@ -1,8 +1,10 @@
 package com.usatiuk.dhfs.objects;

+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.pcollections.TreePMap;

 import java.util.Iterator;
 import java.util.List;

@@ -83,7 +85,7 @@ public class MergingKvIteratorTest {
    public void testSimple() {
        var source1 = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6)).iterator();
        var source2 = List.of(Pair.of(2, 3), Pair.of(4, 5), Pair.of(6, 7)).iterator();
-        var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1), new SimpleIteratorWrapper<>(source2));
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1), (a, b) -> new SimpleIteratorWrapper<>(source2));
        var expected = List.of(Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 4), Pair.of(4, 5), Pair.of(5, 6), Pair.of(6, 7));
        for (var pair : expected) {
            Assertions.assertTrue(mergingIterator.hasNext());

@@ -95,7 +97,7 @@ public class MergingKvIteratorTest {
    public void testPriority() {
        var source1 = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
        var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
-        var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()));
        var expected = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
        for (var pair : expected) {
            Assertions.assertTrue(mergingIterator.hasNext());

@@ -103,7 +105,7 @@ public class MergingKvIteratorTest {
        }
        Assertions.assertFalse(mergingIterator.hasNext());

-        var mergingIterator2 = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()));
        var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
        for (var pair : expected2) {
            Assertions.assertTrue(mergingIterator2.hasNext());

@@ -116,7 +118,7 @@ public class MergingKvIteratorTest {
    public void testPriority2() {
        var source1 = List.of(Pair.of(2, 4), Pair.of(5, 6));
        var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5));
-        var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()));
        var expected = List.of(Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6));
        for (var pair : expected) {
            Assertions.assertTrue(mergingIterator.hasNext());

@@ -124,7 +126,7 @@ public class MergingKvIteratorTest {
        }
        Assertions.assertFalse(mergingIterator.hasNext());

-        var mergingIterator2 = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.GE, 0, (a, b) -> new SimpleIteratorWrapper<>(source2.iterator()), (a, b) -> new SimpleIteratorWrapper<>(source1.iterator()));
        var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6));
        for (var pair : expected2) {
            Assertions.assertTrue(mergingIterator2.hasNext());

@@ -132,4 +134,164 @@ public class MergingKvIteratorTest {
        }
        Assertions.assertFalse(mergingIterator2.hasNext());
    }
+
+    @Test
+    public void testPriorityLe() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(5, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(5, 6));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe2() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(5, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe3() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(6, 8);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(5, 6), Pair.of(6, 8));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(5, 6), Pair.of(6, 8));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe4() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(6, 7);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(3, 4), Pair.of(6, 7));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(3, 4), Pair.of(6, 7));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe5() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(1, 2).plus(6, 7);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(3, 4), Pair.of(6, 7));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(3, 4), Pair.of(6, 7));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe6() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5).plus(3, 4);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(4, 6);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(4, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(4, 6));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLe7() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 4).plus(3, 5).plus(4, 6);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LE, 2, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(1, 3), Pair.of(3, 5), Pair.of(4, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LE, 2, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(1, 4), Pair.of(3, 5), Pair.of(4, 6));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
+
+    @Test
+    public void testPriorityLt() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(2, 4).plus(5, 6);
+        var source2 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(2, 5);
+        var mergingIterator = new MergingKvIterator<>("test", IteratorStart.LT, 5, (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK));
+        var expected = List.of(Pair.of(2, 4), Pair.of(5, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(mergingIterator.hasNext());
+            Assertions.assertEquals(pair, mergingIterator.next());
+        }
+        Assertions.assertFalse(mergingIterator.hasNext());
+
+        var mergingIterator2 = new MergingKvIterator<>("test", IteratorStart.LT, 5, (mS, mK) -> new NavigableMapKvIterator<>(source2, mS, mK), (mS, mK) -> new NavigableMapKvIterator<>(source1, mS, mK));
+        var expected2 = List.of(Pair.of(2, 5), Pair.of(5, 6));
+        for (var pair : expected2) {
+            Assertions.assertTrue(mergingIterator2.hasNext());
+            Assertions.assertEquals(pair, mergingIterator2.next());
+        }
+        Assertions.assertFalse(mergingIterator2.hasNext());
+    }
 }

@@ -0,0 +1,37 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.pcollections.TreePMap;
+
+import java.util.List;
+
+public class PredicateKvIteratorTest {
+
+    @Test
+    public void simpleTest() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
+        var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.GT, 1),
+                IteratorStart.GE, 1, v -> (v % 2 == 0) ? v : null);
+        var expected = List.of(Pair.of(4, 6));
+        for (var pair : expected) {
+            Assertions.assertTrue(pit.hasNext());
+            Assertions.assertEquals(pair, pit.next());
+        }
+    }
+
+    @Test
+    public void ltTest() {
+        var source1 = TreePMap.<Integer, Integer>empty().plus(1, 3).plus(3, 5).plus(4, 6);
+        var pit = new PredicateKvIterator<>(new NavigableMapKvIterator<>(source1, IteratorStart.LT, 4),
+                IteratorStart.LT, 4, v -> (v % 2 == 0) ? v : null);
+        var expected = List.of();
+        for (var pair : expected) {
+            Assertions.assertTrue(pit.hasNext());
+            Assertions.assertEquals(pair, pit.next());
+        }
+        Assertions.assertFalse(pit.hasNext());
+    }
+}

@@ -1,47 +1,45 @@
 package com.usatiuk.dhfs.files.objects;

 import com.usatiuk.autoprotomap.runtime.ProtoMirror;
 import com.usatiuk.dhfs.objects.JDataRemote;
 import com.usatiuk.dhfs.objects.JObjectKey;
-import org.pcollections.TreePMap;
+import com.usatiuk.dhfs.objects.jmap.JMapHolder;
+import com.usatiuk.dhfs.objects.jmap.JMapLongKey;

 import java.util.Collection;
 import java.util.Set;

 //@ProtoMirror(ChunkDataP.class)
 public record File(JObjectKey key, long mode, long cTime, long mTime,
-                   TreePMap<Long, JObjectKey> chunks, boolean symlink, long size
-) implements FsNode {
-    public File withChunks(TreePMap<Long, JObjectKey> chunks) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
-    }
-
+                   boolean symlink, long size
+) implements JDataRemote, JMapHolder<JMapLongKey> {
    public File withSymlink(boolean symlink) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink, size);
    }

    public File withSize(long size) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink, size);
    }

    public File withMode(long mode) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink, size);
    }

    public File withCTime(long cTime) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink, size);
    }

    public File withMTime(long mTime) {
-        return new File(key, mode, cTime, mTime, chunks, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink, size);
    }

    @Override
    public Collection<JObjectKey> collectRefsTo() {
-        return Set.copyOf(chunks().values());
+        return Set.of();
+        // return Set.copyOf(chunks().values());
    }

    @Override
    public int estimateSize() {
-        return chunks.size() * 64;
+        return 64;
+        // return chunks.size() * 64;
    }
 }

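
Note: this record change is the core of the commit: the chunk offset-to-key map no longer lives on File as a TreePMap but in the key-value store as per-file JMap entries, so chunk lookups become iterator scans (LE for "chunk covering this offset", LT for "last chunk"). A standalone model of the lookup the file service now performs:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    class ChunkLookupDemo {
        public static void main(String[] args) {
            // Model of the per-file chunk index: start offset -> chunk key.
            NavigableMap<Long, String> chunks = new TreeMap<>();
            chunks.put(0L, "chunk-0");
            chunks.put(4096L, "chunk-1");

            long offset = 5000;
            // An IteratorStart.LE scan at `offset` corresponds to floorEntry:
            // the chunk whose start is the greatest one <= the read offset.
            System.out.println(chunks.floorEntry(offset)); // 4096=chunk-1
        }
    }
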
@@ -1,11 +0,0 @@
-package com.usatiuk.dhfs.files.objects;
-
-import com.usatiuk.dhfs.objects.JDataRemote;
-
-public interface FsNode extends JDataRemote {
-    long mode();
-
-    long cTime();
-
-    long mTime();
-}

@@ -10,6 +10,10 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
 import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
 import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
 import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
+import com.usatiuk.dhfs.objects.jmap.JMapEntry;
+import com.usatiuk.dhfs.objects.jmap.JMapHelper;
+import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
 import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
 import com.usatiuk.dhfs.objects.transaction.Transaction;
 import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;

@@ -23,7 +27,6 @@ import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
-import org.pcollections.TreePMap;

 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;

@@ -69,6 +72,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
    @Inject
    JKleppmannTreeManager jKleppmannTreeManager;

+    @Inject
+    JMapHelper jMapHelper;
+
    private JKleppmannTreeManager.JKleppmannTree getTree() {
        return jKleppmannTreeManager.getTree(new JObjectKey("fs"));
    }

@@ -156,7 +162,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {

        var fuuid = UUID.randomUUID();
        Log.debug("Creating file " + fuuid);
-        File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0);
+        File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), false, 0);
        remoteTx.putData(f);

        try {

@@ -270,31 +276,27 @@ public class DhfsFileServiceImpl implements DhfsFileService {
            return Optional.empty();
        }

        try {
-            var chunksAll = file.chunks();
-            if (chunksAll.isEmpty()) {
+            try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
+                if (!it.hasNext())
                    return Optional.of(ByteString.empty());
            }
-            var chunksList = chunksAll.tailMap(chunksAll.floorKey(offset)).entrySet();
-
-            if (chunksList.isEmpty()) {
-                return Optional.of(ByteString.empty());
-            }
-
-            var chunks = chunksList.iterator();
+            // if (it.peekNextKey().key() != offset) {
+            //     Log.warnv("Read over the end of file: {0} {1} {2}, next chunk: {3}", fileUuid, offset, length, it.peekNextKey());
+            //     return Optional.of(ByteString.empty());
+            // }
+            long curPos = offset;
            ByteString buf = ByteString.empty();

-            long curPos = offset;
-            var chunk = chunks.next();
+            var chunk = it.next();

            while (curPos < offset + length) {
-                var chunkPos = chunk.getKey();
+                var chunkPos = chunk.getKey().key();

                long offInChunk = curPos - chunkPos;

                long toReadInChunk = (offset + length) - curPos;

-                var chunkBytes = readChunk(chunk.getValue());
+                var chunkBytes = readChunk(chunk.getValue().ref());

                long readableLen = chunkBytes.size() - offInChunk;

@@ -309,12 +311,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                if (readableLen > toReadInChunk)
                    break;

-                if (!chunks.hasNext()) break;
+                if (!it.hasNext()) break;

-                chunk = chunks.next();
+                chunk = it.next();
            }

-            // FIXME:
            return Optional.of(buf);
        } catch (Exception e) {
            Log.error("Error reading file: " + fileUuid, e);

@@ -379,41 +380,68 @@ public class DhfsFileServiceImpl implements DhfsFileService {
            file = remoteTx.getData(File.class, fileUuid).orElse(null);
        }

-        var chunksAll = file.chunks();
-        var first = chunksAll.floorEntry(offset);
-        var last = chunksAll.lowerEntry(offset + data.size());
+        Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
+        Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
+        Log.tracev("Getting last");
+        try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(offset + data.size()))) {
+            last = it.hasNext() ? it.next() : null;
+            Log.tracev("Last: {0}", last);
+        }

        NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();

        long start = 0;

-        NavigableMap<Long, JObjectKey> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
-        NavigableMap<Long, JObjectKey> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
-
-        if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
-            beforeFirst = chunksAll;
-            afterLast = Collections.emptyNavigableMap();
-            first = null;
-            last = null;
-            start = offset;
-        } else if (!chunksAll.isEmpty()) {
-            var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
-            removedChunks.putAll(between);
-            start = first.getKey();
+        try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
+            first = it.hasNext() ? it.next() : null;
+            Log.tracev("First: {0}", first);
+            boolean empty = last == null;
+            if (first != null && getChunkSize(first.getValue().ref()) + first.getKey().key() <= offset) {
+                first = null;
+                last = null;
+                start = offset;
+            } else if (!empty) {
+                assert first != null;
+                removedChunks.put(first.getKey().key(), first.getValue().ref());
+                while (it.hasNext() && it.peekNextKey().compareTo(last.getKey()) <= 0) {
+                    var next = it.next();
+                    Log.tracev("Next: {0}", next);
+                    removedChunks.put(next.getKey().key(), next.getValue().ref());
+                }
+                removedChunks.put(last.getKey().key(), last.getValue().ref());
+                start = first.getKey().key();
+            }
        }
+

+        // NavigableMap<Long, JObjectKey> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
+        // NavigableMap<Long, JObjectKey> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
+
+        // if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
+        //     beforeFirst = chunksAll;
+        //     afterLast = Collections.emptyNavigableMap();
+        //     first = null;
+        //     last = null;
+        //     start = offset;
+        // } else if (!chunksAll.isEmpty()) {
+        //     var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
+        //     removedChunks.putAll(between);
+        //     start = first.getKey();
+        // }

        ByteString pendingWrites = ByteString.empty();

-        if (first != null && first.getKey() < offset) {
-            var chunkBytes = readChunk(first.getValue());
-            pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey())));
+        if (first != null && first.getKey().key() < offset) {
+            var chunkBytes = readChunk(first.getValue().ref());
+            pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey().key())));
        }
        pendingWrites = pendingWrites.concat(data);

        if (last != null) {
-            var lchunkBytes = readChunk(last.getValue());
-            if (last.getKey() + lchunkBytes.size() > offset + data.size()) {
+            var lchunkBytes = readChunk(last.getValue().ref());
+            if (last.getKey().key() + lchunkBytes.size() > offset + data.size()) {
                var startInFile = offset + data.size();
-                var startInChunk = startInFile - last.getKey();
+                var startInChunk = startInFile - last.getKey().key();
                pendingWrites = pendingWrites.concat(lchunkBytes.substring((int) startInChunk, lchunkBytes.size()));
            }
        }

@@ -421,57 +449,57 @@ public class DhfsFileServiceImpl implements DhfsFileService {
        int combinedSize = pendingWrites.size();

        if (targetChunkSize > 0) {
-            if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
-                boolean leftDone = false;
-                boolean rightDone = false;
-                while (!leftDone && !rightDone) {
-                    if (beforeFirst.isEmpty()) leftDone = true;
-                    if (!beforeFirst.isEmpty() || !leftDone) {
-                        var takeLeft = beforeFirst.lastEntry();
-
-                        var cuuid = takeLeft.getValue();
-
-                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
-                            leftDone = true;
-                            continue;
-                        }
-
-                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
-                            leftDone = true;
-                            continue;
-                        }
-
-                        // FIXME: (and test this)
-                        beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
-                        start = takeLeft.getKey();
-                        pendingWrites = readChunk(cuuid).concat(pendingWrites);
-                        combinedSize += getChunkSize(cuuid);
-                        removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
-                    }
-                    if (afterLast.isEmpty()) rightDone = true;
-                    if (!afterLast.isEmpty() && !rightDone) {
-                        var takeRight = afterLast.firstEntry();
-
-                        var cuuid = takeRight.getValue();
-
-                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
-                            rightDone = true;
-                            continue;
-                        }
-
-                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
-                            rightDone = true;
-                            continue;
-                        }
-
-                        // FIXME: (and test this)
-                        afterLast = afterLast.tailMap(takeRight.getKey(), false);
-                        pendingWrites = pendingWrites.concat(readChunk(cuuid));
-                        combinedSize += getChunkSize(cuuid);
-                        removedChunks.put(takeRight.getKey(), takeRight.getValue());
-                    }
-                }
-            }
+            // if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
+            //     boolean leftDone = false;
+            //     boolean rightDone = false;
+            //     while (!leftDone && !rightDone) {
+            //         if (beforeFirst.isEmpty()) leftDone = true;
+            //         if (!beforeFirst.isEmpty() || !leftDone) {
+            //             var takeLeft = beforeFirst.lastEntry();
+            //
+            //             var cuuid = takeLeft.getValue();
+            //
+            //             if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
+            //                 leftDone = true;
+            //                 continue;
+            //             }
+            //
+            //             if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
+            //                 leftDone = true;
+            //                 continue;
+            //             }
+            //
+            //             // FIXME: (and test this)
+            //             beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
+            //             start = takeLeft.getKey();
+            //             pendingWrites = readChunk(cuuid).concat(pendingWrites);
+            //             combinedSize += getChunkSize(cuuid);
+            //             removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
+            //         }
+            //         if (afterLast.isEmpty()) rightDone = true;
+            //         if (!afterLast.isEmpty() && !rightDone) {
+            //             var takeRight = afterLast.firstEntry();
+            //
+            //             var cuuid = takeRight.getValue();
+            //
+            //             if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
+            //                 rightDone = true;
+            //                 continue;
+            //             }
+            //
+            //             if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
+            //                 rightDone = true;
+            //                 continue;
+            //             }
+            //
+            //             // FIXME: (and test this)
+            //             afterLast = afterLast.tailMap(takeRight.getKey(), false);
+            //             pendingWrites = pendingWrites.concat(readChunk(cuuid));
+            //             combinedSize += getChunkSize(cuuid);
+            //             removedChunks.put(takeRight.getKey(), takeRight.getValue());
+            //         }
+            //     }
+            // }
        }

        NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();

@@ -501,7 +529,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
            }
        }

-        file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
+        for (var e : removedChunks.entrySet()) {
+            Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
+            jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
+        }
+
+        for (var e : newChunks.entrySet()) {
+            Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
+            jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
+        }
+
        remoteTx.putData(file);
        cleanupChunks(file, removedChunks.values());
        updateFileSize(file);

@@ -523,11 +560,17 @@ public class DhfsFileServiceImpl implements DhfsFileService {
        }

        if (length == 0) {
-            var oldChunks = file.chunks();
-
-            file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
+            try (var it = jMapHelper.getIterator(file, IteratorStart.GE, JMapLongKey.of(0))) {
+                while (it.hasNext()) {
+                    var next = it.next();
+                    jMapHelper.delete(file, next.getKey());
+                }
+            }
+            // var oldChunks = file.chunks();
+            //
+            // file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
            remoteTx.putData(file);
-            cleanupChunks(file, oldChunks.values());
+            // cleanupChunks(file, oldChunks.values());
            updateFileSize(file);
            return true;
        }

@@ -535,7 +578,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
        var curSize = size(fileUuid);
        if (curSize == length) return true;

-        var chunksAll = file.chunks();
        NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
        NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();

@@ -573,20 +615,64 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                }
            }
        } else {
            var tail = chunksAll.lowerEntry(length);
            var afterTail = chunksAll.tailMap(tail.getKey(), false);
//            Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
            Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
            try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(length))) {
                last = it.hasNext() ? it.next() : null;
                while (it.hasNext()) {
                    var next = it.next();
                    removedChunks.put(next.getKey().key(), next.getValue().ref());
                }
            }
            removedChunks.put(last.getKey().key(), last.getValue().ref());
//
//            NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
//
//            long start = 0;
//
//            try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
//                first = it.hasNext() ? it.next() : null;
//                boolean empty = last == null;
//                if (first != null && getChunkSize(first.getValue().ref()) + first.getKey().key() <= offset) {
//                    first = null;
//                    last = null;
//                    start = offset;
//                } else if (!empty) {
//                    assert first != null;
//                    removedChunks.put(first.getKey().key(), first.getValue().ref());
//                    while (it.hasNext() && it.peekNextKey() != last.getKey()) {
//                        var next = it.next();
//                        removedChunks.put(next.getKey().key(), next.getValue().ref());
//                    }
//                    removedChunks.put(last.getKey().key(), last.getValue().ref());
//                }
//            }
//
//            var tail = chunksAll.lowerEntry(length);
//            var afterTail = chunksAll.tailMap(tail.getKey(), false);
//
//            removedChunks.put(tail.getKey(), tail.getValue());
//            removedChunks.putAll(afterTail);

            removedChunks.put(tail.getKey(), tail.getValue());
            removedChunks.putAll(afterTail);

            var tailBytes = readChunk(tail.getValue());
            var newChunk = tailBytes.substring(0, (int) (length - tail.getKey()));
            var tailBytes = readChunk(last.getValue().ref());
            var newChunk = tailBytes.substring(0, (int) (length - last.getKey().key()));

            ChunkData newChunkData = createChunk(newChunk);
            newChunks.put(tail.getKey(), newChunkData.key());
            newChunks.put(last.getKey().key(), newChunkData.key());
        }

//        file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());

        for (var e : removedChunks.entrySet()) {
            Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
            jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
        }

        for (var e : newChunks.entrySet()) {
            Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
            jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
        }

        file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
        remoteTx.putData(file);
        cleanupChunks(file, removedChunks.values());
        updateFileSize(file);
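Truncating to a shorter, nonzero length now walks the entry map instead of file.chunks(): an LT iterator positioned at `length` yields the chunk straddling the new end of file first, then every chunk past it. A condensed sketch of the same technique, assuming this commit's types (truncateTail is a made-up name; readChunk and createChunk are the service's own helpers):

    // Sketch: split the chunk straddling `length` and drop everything after it.
    void truncateTail(long length) {
        Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
        var removed = new TreeMap<Long, JObjectKey>();
        try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(length))) {
            last = it.hasNext() ? it.next() : null;      // greatest chunk start < length
            while (it.hasNext()) {                       // chunks starting at/after length
                var next = it.next();
                removed.put(next.getKey().key(), next.getValue().ref());
            }
        }
        if (last == null) return;                        // nothing to cut; note the diff above
                                                         // dereferences last unconditionally
        removed.put(last.getKey().key(), last.getValue().ref());
        for (var e : removed.entrySet()) {
            jMapHelper.delete(file, JMapLongKey.of(e.getKey()));  // drop stale entries first
        }
        var keep = readChunk(last.getValue().ref()).substring(0, (int) (length - last.getKey().key()));
        jMapHelper.put(file, last.getKey(), createChunk(keep).key()); // reinstall shortened tail
    }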
@@ -623,7 +709,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
        Log.debug("Creating file " + fuuid);

        ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
        File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.<Long, JObjectKey>empty().plus(0L, newChunkData.key()), true, 0);
        File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), true, 0);
        jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());

        updateFileSize(f);

@@ -650,10 +737,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
        jObjectTxManager.executeTx(() -> {
            long realSize = 0;

            if (!file.chunks().isEmpty()) {
                var last = file.chunks().lastEntry();
                var lastSize = getChunkSize(last.getValue());
                realSize = last.getKey() + lastSize;
            Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
            Log.tracev("Getting last");
            try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.max())) {
                last = it.hasNext() ? it.next() : null;
            }

            if (last != null) {
                realSize = last.getKey().key() + getChunkSize(last.getValue().ref());
            }

            if (realSize != file.size()) {

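Since File no longer embeds the chunk map, NavigableMap.lastEntry() is emulated with an LT scan from the JMapLongKey.max() sentinel. A minimal sketch of the pattern:

    // Sketch: emulate lastEntry() over the k-v map. An LT iterator started at
    // the max() sentinel is positioned at the greatest existing key (a key equal
    // to Long.MAX_VALUE itself would be excluded by LT).
    Optional<Pair<JMapLongKey, JMapEntry<JMapLongKey>>> lastEntry(JMapHolder<JMapLongKey> holder) {
        try (var it = jMapHelper.getIterator(holder, IteratorStart.LT, JMapLongKey.max())) {
            return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
        }
    }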
@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects;

import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jmap.JMapEntry;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -11,24 +11,30 @@ public class RefcounterTxHook implements PreCommitTxHook {
    @Inject
    Transaction curTx;

    private JDataRefcounted getRef(JDataRefcounted cur, JObjectKey key) {
    private JDataRefcounted getRef(JObjectKey key) {
        var found = curTx.get(JDataRefcounted.class, key).orElse(null);

        if (found != null) {
            return found;
        }

        if (cur instanceof RemoteObjectDataWrapper<?> || cur instanceof JKleppmannTreeNode) {
            // FIXME:
            return new RemoteObjectMeta(key);
        } else {
            return found;
        }

        return new RemoteObjectMeta(key);
    }

    @Override
    public void onChange(JObjectKey key, JData old, JData cur) {
        if (cur instanceof JMapEntry<?> me) {
            var oldMe = (JMapEntry<?>) old;
            var oldRef = oldMe.ref();
            var curRef = me.ref();
            var referencedOld = getRef(oldRef);
            curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(key)));
            var referencedCur = getRef(curRef);
            curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(key)));
            Log.tracev("Removed ref from {0} to {1}, added ref to {2}", key, oldRef, curRef);
            return;
        }

        if (!(cur instanceof JDataRefcounted refCur)) {
            return;
        }
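The hook now treats every JMapEntry as the owner of a single reference edge: the entry's own key is what appears in the target's refsFrom set, so repointing a chunk is a minus on the old target plus a plus on the new one. A minimal sketch of the two updates it performs (dropEdge and addEdge are illustrative names):

    // Sketch: the edge updates the hook performs; `target` is the referenced
    // object's refcount metadata, `entryKey` the JMapEntry's own key.
    JDataRefcounted dropEdge(JDataRefcounted target, JObjectKey entryKey) {
        return target.withRefsFrom(target.refsFrom().minus(entryKey));
    }

    JDataRefcounted addEdge(JDataRefcounted target, JObjectKey entryKey) {
        return target.withRefsFrom(target.refsFrom().plus(entryKey));
    }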
@@ -39,7 +45,7 @@ public class RefcounterTxHook implements PreCommitTxHook {

        for (var curRef : curRefs) {
            if (!oldRefs.contains(curRef)) {
                var referenced = getRef(refCur, curRef);
                var referenced = getRef(curRef);
                curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
                Log.tracev("Added ref from {0} to {1}", key, curRef);
            }
@@ -47,7 +53,7 @@ public class RefcounterTxHook implements PreCommitTxHook {

        for (var oldRef : oldRefs) {
            if (!curRefs.contains(oldRef)) {
                var referenced = getRef(refCur, oldRef);
                var referenced = getRef(oldRef);
                curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
                Log.tracev("Removed ref from {0} to {1}", key, oldRef);
            }
@@ -56,12 +62,20 @@ public class RefcounterTxHook implements PreCommitTxHook {

    @Override
    public void onCreate(JObjectKey key, JData cur) {
        if (cur instanceof JMapEntry<?> me) {
            var curRef = me.ref();
            var referencedCur = getRef(curRef);
            curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(key)));
            Log.tracev("Added ref from {0} to {1}", key, curRef);
            return;
        }

        if (!(cur instanceof JDataRefcounted refCur)) {
            return;
        }

        for (var newRef : refCur.collectRefsTo()) {
            var referenced = getRef(refCur, newRef);
            var referenced = getRef(newRef);
            curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
            Log.tracev("Added ref from {0} to {1}", key, newRef);
        }
@@ -69,12 +83,20 @@ public class RefcounterTxHook implements PreCommitTxHook {

    @Override
    public void onDelete(JObjectKey key, JData cur) {
        if (cur instanceof JMapEntry<?> me) {
            var oldRef = me.ref();
            var referencedOld = getRef(oldRef);
            curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(key)));
            Log.tracev("Removed ref from {0} to {1}", key, oldRef);
            return;
        }

        if (!(cur instanceof JDataRefcounted refCur)) {
            return;
        }

        for (var removedRef : refCur.collectRefsTo()) {
            var referenced = getRef(refCur, removedRef);
            var referenced = getRef(removedRef);
            curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
            Log.tracev("Removed ref from {0} to {1}", key, removedRef);
        }

@@ -45,7 +45,7 @@ public record RemoteObjectMeta(PCollection<JObjectKey> refsFrom, boolean frozen,
    }

    public static JObjectKey ofDataKey(JObjectKey key) {
        return JObjectKey.of(key.name() + "_data");
        return JObjectKey.of("data_" + key.name());
    }

    public JObjectKey dataKey() {

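The derived data key changes from a name suffix to a name prefix. A plausible reading, not stated in the commit, is that "<name>_data" keys sat close to the new "<name>/..." JMap entry ranges in the flat key space, while "data_<name>" moves all derived data objects into a disjoint region; treat this rationale as an assumption.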
@@ -0,0 +1,12 @@
package com.usatiuk.dhfs.objects.jmap;

import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;

public record JMapEntry<K extends JMapKey & Comparable<K>>(JObjectKey holder, K selfKey,
                                                           JObjectKey ref) implements JData {
    @Override
    public JObjectKey key() {
        return JMapHelper.makeKey(holder, selfKey);
    }
}
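Each map entry is a first-class object whose global key is derived from its holder and map key, so one holder's entries form a contiguous range of the object key space. An illustration with made-up keys ("file-123" and "chunk-abc" are hypothetical), using makeKey and the zero-padded JMapLongKey.toString defined later in this commit:

    // Hypothetical holder and chunk keys, for illustration only.
    var holder = JObjectKey.of("file-123");
    var entry = new JMapEntry<>(holder, JMapLongKey.of(42), JObjectKey.of("chunk-abc"));
    // makeKey(holder, selfKey) = "file-123" + "/" + "0000000000000042"
    assert entry.key().name().equals("file-123/0000000000000042");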
@@ -0,0 +1,49 @@
package com.usatiuk.dhfs.objects.jmap;

import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.Optional;

@ApplicationScoped
public class JMapHelper {
    @Inject
    Transaction curTx;

    static <K extends JMapKey & Comparable<K>> JObjectKey makePrefix(JObjectKey holder) {
        return JObjectKey.of(holder.name() + "/");
    }

    static <K extends JMapKey & Comparable<K>> JObjectKey makeKey(JObjectKey holder, K key) {
        return JObjectKey.of(makePrefix(holder).name() + key.toString());
    }

    public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
        return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
    }

    public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, K key) {
        return getIterator(holder, IteratorStart.GE, key);
    }

    public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start) {
        return new JMapIterator<>(curTx.getIterator(start, makePrefix(holder.key())), holder);
    }

    public <K extends JMapKey & Comparable<K>> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
        curTx.put(new JMapEntry<>(holder.key(), key, ref));
    }

    public <K extends JMapKey & Comparable<K>> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
        // TODO:
        return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
    }

    public <K extends JMapKey & Comparable<K>> void delete(JMapHolder<K> holder, K key) {
        curTx.delete(makeKey(holder.key(), key));
    }
}
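A minimal usage sketch for the helper inside a running transaction; `file` stands for any JMapHolder<JMapLongKey>, and the chunk object keys are made up:

    // Sketch: basic map operations through JMapHelper.
    jMapHelper.put(file, JMapLongKey.of(0), JObjectKey.of("chunk-0"));    // offset 0 -> chunk
    jMapHelper.put(file, JMapLongKey.of(4096), JObjectKey.of("chunk-1")); // offset 4096 -> chunk

    // Point lookup:
    var entry = jMapHelper.get(file, JMapLongKey.of(4096));              // Optional<JMapEntry<...>>

    // Ordered scan from a key:
    try (var it = jMapHelper.getIterator(file, IteratorStart.GE, JMapLongKey.of(0))) {
        while (it.hasNext()) {
            var e = it.next(); // Pair<JMapLongKey, JMapEntry<JMapLongKey>>, ascending by offset
        }
    }

    jMapHelper.delete(file, JMapLongKey.of(0));                          // remove one mapping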
@@ -0,0 +1,6 @@
package com.usatiuk.dhfs.objects.jmap;

import com.usatiuk.dhfs.objects.JData;

public interface JMapHolder<K extends JMapKey & Comparable<K>> extends JData {
}
@@ -0,0 +1,75 @@
package com.usatiuk.dhfs.objects.jmap;

import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import org.apache.commons.lang3.tuple.Pair;

public class JMapIterator<K extends JMapKey & Comparable<K>> implements CloseableKvIterator<K, JMapEntry<K>> {
    private final CloseableKvIterator<JObjectKey, JData> _backing;
    private final JObjectKey _prefix;
    private boolean _hasNext = true;

    public JMapIterator(CloseableKvIterator<JObjectKey, JData> backing, JMapHolder<K> holder) {
        _backing = backing;
        _prefix = JMapHelper.makePrefix(holder.key());
        advance();
    }

    void advance() {
        assert _hasNext;
        if (!_backing.hasNext()) {
            _hasNext = false;
            return;
        }
        if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
            _backing.skip();
            if (!_backing.peekNextKey().name().startsWith(_prefix.name())) {
                _hasNext = false;
            }
        }
    }

    public K keyToKey(JObjectKey key) {
        var keyPart = key.name().substring(_prefix.name().length());
        return (K) JMapLongKey.of(Long.parseLong(keyPart));
    }

    @Override
    public K peekNextKey() {
        if (!_hasNext) {
            throw new IllegalStateException("No next element");
        }

        return keyToKey(_backing.peekNextKey());
    }

    @Override
    public void skip() {
        if (!_hasNext) {
            throw new IllegalStateException("No next element");
        }
        advance();
    }

    @Override
    public void close() {
        _backing.close();
    }

    @Override
    public boolean hasNext() {
        return _hasNext;
    }

    @Override
    public Pair<K, JMapEntry<K>> next() {
        if (!_hasNext) {
            throw new IllegalStateException("No next element");
        }
        var next = _backing.next();
        assert next.getKey().name().startsWith(_prefix.name());
        advance();
        return Pair.of(keyToKey(next.getKey()), (JMapEntry<K>) next.getValue());
    }
}
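JMapIterator narrows a global-key iterator down to a single holder's prefix range (keyToKey currently hardcodes JMapLongKey parsing). The one-time skip in advance() exists because LE/LT starts can position the backing iterator one key before the range:

    // Positioning example with hypothetical keys; holder "f", so prefix "f/":
    //   backing keys: ["e-meta", "f/0000000000000000", "f/0000000000000042", "g"]
    // An LE/LT start may leave the backing iterator on "e-meta", one key before
    // the prefix range, which is why advance() tolerates exactly one
    // non-matching key. Once inside the range, the first non-prefix key ("g")
    // ends iteration. Draft caveats: advance() assumes at most one stray
    // leading key, and the peek after skip() is not guarded by hasNext().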
@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.jmap;

public interface JMapKey {
}
@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.objects.jmap;

import javax.annotation.Nonnull;
import java.io.Serializable;

public record JMapLongKey(long key) implements JMapKey, Comparable<JMapLongKey>, Serializable {
    public static JMapLongKey of(long key) {
        return new JMapLongKey(key);
    }

    @Override
    public String toString() {
        return String.format("%016d", key);
    }

    public static JMapLongKey max() {
        return new JMapLongKey(Long.MAX_VALUE);
    }

    @Override
    public int compareTo(@Nonnull JMapLongKey o) {
        return Long.compare(key, o.key);
    }
}
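The zero-padded toString is what makes the derived string keys sort in numeric offset order, which the prefix iterators above rely on. A small demonstration of the assumption and its limits:

    // String order matches numeric order for non-negative keys below 10^16:
    String a = String.format("%016d", 42L);   // "0000000000000042"
    String b = String.format("%016d", 4096L); // "0000000000004096"
    assert a.compareTo(b) < 0;

    // Caveat: Long.MAX_VALUE has 19 digits, so the max() sentinel formats as
    // "9223372036854775807" and still sorts after every 16-digit key, but keys
    // of 10^16 or more (or negative keys) would break the alignment; the draft
    // presumably assumes file offsets stay well below that.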
@@ -24,4 +24,12 @@ public record PersistentRemoteHostsData(PeerId selfUuid,
    public PersistentRemoteHostsData withInitialSyncDone(PSet<PeerId> initialSyncDone) {
        return new PersistentRemoteHostsData(selfUuid, selfCertificate, selfKeyPair, initialSyncDone);
    }

    @Override
    public String toString() {
        return "PersistentRemoteHostsData{" +
                "selfUuid=" + selfUuid +
                ", initialSyncDone=" + initialSyncDone +
                '}';
    }
}

@@ -16,4 +16,12 @@ public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JD
    public X509Certificate parsedCert() {
        return CertificateTools.certFromBytes(cert.toByteArray());
    }

    @Override
    public String toString() {
        return "PeerInfo{" +
                "key=" + key +
                ", id=" + id +
                '}';
    }
}

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.files;

import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.objects.RemoteTransaction;
@@ -112,6 +111,7 @@ public class DhfsFileServiceSimpleTestImpl {

        fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
        Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
        fileService.write(uuid, 4, new byte[]{10, 11, 12});
        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
        fileService.write(uuid, 10, new byte[]{13, 14});
@@ -154,19 +154,23 @@ public class DhfsFileServiceSimpleTestImpl {
        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
    }

    @Test
    @RepeatedTest(100)
    void truncateTest2() {
        var ret = fileService.create("/truncateTest2", 777);
        Assertions.assertTrue(ret.isPresent());
        try {
            Assertions.assertTrue(ret.isPresent());

        var uuid = ret.get();
            var uuid = ret.get();

        fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
            fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
            Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());

        fileService.truncate(uuid, 20);
        fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20});
        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).get().toByteArray());
            fileService.truncate(uuid, 20);
            fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20});
            Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).get().toByteArray());
        } finally {
            fileService.unlink("/truncateTest2");
        }
    }

    @Test
@@ -217,8 +221,8 @@ public class DhfsFileServiceSimpleTestImpl {

        jObjectTxManager.run(() -> {
            var oldfile = remoteTx.getData(File.class, ret2.get()).orElseThrow(IllegalStateException::new);
            var chunk = oldfile.chunks().get(0L);
            var chunkObj = remoteTx.getData(ChunkData.class, chunk).orElseThrow(IllegalStateException::new);
//            var chunk = oldfile.chunks().get(0L);
//            var chunkObj = remoteTx.getData(ChunkData.class, chunk).orElseThrow(IllegalStateException::new);
        });

        Assertions.assertTrue(fileService.rename("/moveOverTest1", "/moveOverTest2"));