more fixes 2

This commit is contained in:
2025-02-22 22:16:03 +01:00
parent fa76828d04
commit b12606f9f4

View File

@@ -85,12 +85,15 @@ public class SnapshotManager {
boolean hadBackward = false; boolean hadBackward = false;
for (var action : writes) { for (var action : writes) {
var current = delegateStore.readObjectVerbose(action.key()); var current = delegateStore.readObjectVerbose(action.key());
// Add to snapshot the previous visible version of the replaced object
// I.e. should be visible to all transactions with id <= id
// and at least as its corresponding version
Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) { Pair<SnapshotKey, SnapshotEntry> newSnapshotEntry = switch (current) {
case WritebackObjectPersistentStore.VerboseReadResultPersisted( case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data Optional<JDataVersionedWrapper> data
) -> { ) -> {
hadBackward = true; hadBackward = true;
yield Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()), yield Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryReadEmpty(id))); data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryReadEmpty(id)));
} }
case WritebackObjectPersistentStore.VerboseReadResultPending( case WritebackObjectPersistentStore.VerboseReadResultPending(
@@ -99,15 +102,22 @@ public class SnapshotManager {
assert pending.bundleId() < id; assert pending.bundleId() < id;
yield switch (pending) { yield switch (pending) {
case TxWriteback.PendingWrite write -> case TxWriteback.PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), write.bundleId())); Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), id));
case TxWriteback.PendingDelete delete -> case TxWriteback.PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(delete.bundleId())); Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(id));
default -> throw new IllegalStateException("Unexpected value: " + pending); default -> throw new IllegalStateException("Unexpected value: " + pending);
}; };
} }
default -> throw new IllegalStateException("Unexpected value: " + current); default -> throw new IllegalStateException("Unexpected value: " + current);
}; };
if (newSnapshotEntry.getValue() instanceof SnapshotEntryRead re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry); Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
var val = _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight()); var val = _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
@@ -147,11 +157,16 @@ public class SnapshotManager {
_snapshotIds.poll(); _snapshotIds.poll();
_snapshotVersions.remove(curId); _snapshotVersions.remove(curId);
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek(); nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
while (nextId == curId) {
_snapshotIds.poll();
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
}
var keys = _snapshotBounds.headMap(curId, true); var keys = _snapshotBounds.headMap(curId, true);
long finalCurId = curId; long finalCurId = curId;
long finalNextId = nextId; long finalNextId = nextId;
ArrayList<Pair<Long, SnapshotKey>> toReAdd = new ArrayList<>();
keys.values().stream().flatMap(Collection::stream).forEach(key -> { keys.values().stream().flatMap(Collection::stream).forEach(key -> {
var entry = _objects.get(key); var entry = _objects.get(key);
if (entry == null) { if (entry == null) {
@@ -162,21 +177,21 @@ public class SnapshotManager {
Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}", Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}",
entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds); entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds);
} else if (finalNextId < entry.whenToRemove()) { } else if (finalNextId < entry.whenToRemove()) {
if (!(entry instanceof SnapshotEntryRead || entry instanceof SnapshotEntryReadEmpty)) {
Log.errorv("Unexpected entry type: {0}, key: {1}, nextId: {2}, whenToRemove: {3}, snapshotIds: {4}",
entry, key, finalNextId, entry.whenToRemove(), _snapshotIds);
assert false;
}
_objects.put(new SnapshotKey(key.key(), finalNextId), entry); _objects.put(new SnapshotKey(key.key(), finalNextId), entry);
_snapshotBounds.merge(finalNextId, new ArrayDeque<>(List.of(new SnapshotKey(key.key(), finalNextId))), assert finalNextId > finalCurId;
(a, b) -> { toReAdd.add(Pair.of(finalNextId, new SnapshotKey(key.key(), finalNextId)));
a.addAll(b);
return a;
});
} }
_objects.remove(key); _objects.remove(key);
}); });
toReAdd.forEach(p -> {
_snapshotBounds.merge(p.getLeft(), new ArrayDeque<>(List.of(p.getRight())),
(a, b) -> {
a.addAll(b);
return a;
});
});
keys.clear(); keys.clear();
if (_snapshotIds.isEmpty()) { if (_snapshotIds.isEmpty()) {
@@ -255,7 +270,7 @@ public class SnapshotManager {
nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
} }
// next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx // next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx
if (next.getKey().version() <= _id && next.getValue().whenToRemove() >= _id) { if (next.getKey().version() <= _id && next.getValue().whenToRemove() > _id) {
_next = switch (next.getValue()) { _next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
@@ -268,6 +283,13 @@ public class SnapshotManager {
default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
}; };
} }
if (_next != null) {
if (_next.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _id;
}
}
} }
} }
@@ -293,6 +315,12 @@ public class SnapshotManager {
if (_next == null) if (_next == null)
throw new NoSuchElementException("No more elements"); throw new NoSuchElementException("No more elements");
var ret = _next; var ret = _next;
if (ret.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _id;
}
_next = null; _next = null;
fillNext(); fillNext();
Log.tracev("Read: {0}, next: {1}", ret, _next); Log.tracev("Read: {0}, next: {1}", ret, _next);