more fixes 3

This commit is contained in:
2025-02-22 22:39:04 +01:00
parent b12606f9f4
commit cde5e44e77
2 changed files with 14 additions and 3 deletions

View File

@@ -139,7 +139,7 @@ public class JObjectManager {
Log.trace("Committing transaction start");
// FIXME: Better way?
addDependency.accept(JDataDummy.TX_ID_OBJ_NAME);
tx.put(JDataDummy.getInstance());
writes.put(JDataDummy.TX_ID_OBJ_NAME, new TxRecord.TxObjectRecordWrite<>(JDataDummy.getInstance()));
} finally {
readSet = tx.reads();

View File

@@ -14,6 +14,7 @@ import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class SnapshotManager {
@@ -334,10 +335,19 @@ public class SnapshotManager {
private long _lastRefreshed = -1L;
private Pair<JObjectKey, JDataVersionedWrapper> _next;
private final Function<TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _downstreamTombstoneMapper
= d -> switch (d) {
case TombstoneMergingKvIterator.Tombstone<JDataVersionedWrapper>() -> d;
case TombstoneMergingKvIterator.Data<JDataVersionedWrapper> data ->
data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + d);
};
public AutoRefreshingSnapshotKvIterator(IteratorStart start, JObjectKey key) {
synchronized (SnapshotManager.this) {
long curVersion = _snapshotVersions.get(_id);
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), delegateStore.getIterator(start, key));
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key),
new MappingKvIterator<>(delegateStore.getIterator(start, key), _downstreamTombstoneMapper));
_next = _backing.hasNext() ? _backing.next() : null;
if (_next != null)
assert _next.getValue().version() <= _id;
@@ -355,7 +365,8 @@ public class SnapshotManager {
curVersion = _snapshotVersions.get(_id);
Log.tracev("Refreshing snapshot iterator {0}, last refreshed {1}, current version {2}", _id, _lastRefreshed, curVersion);
_backing.close();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()), delegateStore.getIterator(IteratorStart.GE, _next.getKey()));
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()),
new MappingKvIterator<>(delegateStore.getIterator(IteratorStart.GE, _next.getKey()), _downstreamTombstoneMapper));
var next = _backing.hasNext() ? _backing.next() : null;
boolean fail = false;
if (next == null) {