Objects: simplify TombstoneMergingKvIterator

This commit is contained in:
2025-04-22 23:53:41 +02:00
parent 0e62a29ce0
commit 34db870fc6
5 changed files with 18 additions and 70 deletions

View File

@@ -24,6 +24,8 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
public JDataVersionedWrapper deserialize(ByteString data) {
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
var rawData = data.substring(Long.BYTES);
return new JDataVersionedWrapperLazy(version, rawData.size(), () -> dataSerializer.deserialize(rawData));
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
);
}
}

View File

@@ -1,21 +1,16 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final String _name;
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
_name = name;
_backing = new PredicateKvIterator<>(
new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
public abstract class TombstoneMergingKvIterator {
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
return new PredicateKvIterator<K, MaybeTombstone<V>, V>(
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
startType, startKey,
pair -> {
Log.tracev("{0} - Processing pair {1}", _name, pair);
Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}
@@ -23,61 +18,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
});
}
@SafeVarargs
public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
@Override
public K peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public K peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<K, V> prev() {
return _backing.prev();
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<K, V> next() {
return _backing.next();
}
@Override
public String toString() {
return "TombstoneMergingKvIterator{" +
"_backing=" + _backing +
", _name='" + _name + '\'' +
'}';
public static <K extends Comparable<K>, V> CloseableKvIterator<K, V> of(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
return of(name, startType, startKey, List.of(iterators));
}
}

View File

@@ -186,7 +186,7 @@ public class CachingObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("cache", start, key,
(mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
(mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
}

View File

@@ -350,7 +350,7 @@ public class WritebackObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<JObjectKey, JDataVersionedWrapper>("writeback-ps", start, key,
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
}

View File

@@ -161,7 +161,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
return new ReadTrackingIterator(TombstoneMergingKvIterator.<JObjectKey, ReadTrackingInternalCrap>of("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
@@ -175,6 +175,11 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
if (read != null && (read.data().map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
}