slight cleanup

This commit is contained in:
2025-02-23 10:11:34 +01:00
parent cde5e44e77
commit c7104e772e

View File

@@ -13,6 +13,7 @@ import javax.annotation.Nonnull;
import java.lang.ref.Cleaner;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -23,36 +24,12 @@ public class SnapshotManager {
private interface SnapshotEntry {
long whenToRemove();
SnapshotEntry withWhenToRemove(long whenToRemove);
}
private record SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryRead(data, whenToRemove);
}
}
private record SnapshotEntryReadEmpty(long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryReadEmpty(whenToRemove);
}
}
private record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryObject(data, whenToRemove);
}
}
private record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryDeleted(whenToRemove);
}
}
private record SnapshotKey(JObjectKey key, long version) implements Comparable<SnapshotKey> {
@@ -66,12 +43,12 @@ public class SnapshotManager {
private long _lastSnapshotId = 0;
private long _lastAliveSnapshotId = -1;
private final AtomicLong _snapshotVersion = new AtomicLong(0);
private final Queue<Long> _snapshotIds = new ArrayDeque<>();
private final ConcurrentSkipListMap<SnapshotKey, SnapshotEntry> _objects = new ConcurrentSkipListMap<>();
private final TreeMap<Long, ArrayDeque<SnapshotKey>> _snapshotBounds = new TreeMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
private final ConcurrentSkipListMap<Long, Long> _snapshotVersions = new ConcurrentSkipListMap<>();
private void verify() {
assert _snapshotIds.isEmpty() == (_lastAliveSnapshotId == -1);
@@ -83,7 +60,6 @@ public class SnapshotManager {
assert id > _lastSnapshotId;
if (!_snapshotIds.isEmpty()) {
verify();
boolean hadBackward = false;
for (var action : writes) {
var current = delegateStore.readObjectVerbose(action.key());
// Add to snapshot the previous visible version of the replaced object
@@ -93,9 +69,8 @@ public class SnapshotManager {
case WritebackObjectPersistentStore.VerboseReadResultPersisted(
Optional<JDataVersionedWrapper> data
) -> {
hadBackward = true;
yield Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryReadEmpty(id)));
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id)));
}
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
@@ -112,7 +87,7 @@ public class SnapshotManager {
default -> throw new IllegalStateException("Unexpected value: " + current);
};
if (newSnapshotEntry.getValue() instanceof SnapshotEntryRead re) {
if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) {
assert re.data().version() <= newSnapshotEntry.getKey().version();
}
if (newSnapshotEntry.getValue() instanceof SnapshotEntryObject re) {
@@ -130,10 +105,7 @@ public class SnapshotManager {
});
}
// if (hadBackward)
for (var sid : _snapshotIds) {
_snapshotVersions.merge(sid, 1L, Long::sum);
}
_snapshotVersion.incrementAndGet();
}
verify();
@@ -156,7 +128,6 @@ public class SnapshotManager {
do {
Log.tracev("Removing snapshot {0}", curId);
_snapshotIds.poll();
_snapshotVersions.remove(curId);
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
while (nextId == curId) {
_snapshotIds.poll();
@@ -240,7 +211,6 @@ public class SnapshotManager {
_lastAliveSnapshotId = id;
if (_snapshotRefCounts.merge(id, 1L, Long::sum) == 1) {
_snapshotIds.add(id);
_snapshotVersions.put(id, 0L);
}
verify();
}
@@ -275,10 +245,6 @@ public class SnapshotManager {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryReadEmpty(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
case SnapshotEntryDeleted(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
@@ -345,7 +311,7 @@ public class SnapshotManager {
public AutoRefreshingSnapshotKvIterator(IteratorStart start, JObjectKey key) {
synchronized (SnapshotManager.this) {
long curVersion = _snapshotVersions.get(_id);
long curVersion = _snapshotVersion.get();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key),
new MappingKvIterator<>(delegateStore.getIterator(start, key), _downstreamTombstoneMapper));
_next = _backing.hasNext() ? _backing.next() : null;
@@ -356,13 +322,13 @@ public class SnapshotManager {
}
private void doRefresh() {
long curVersion = _snapshotVersions.get(_id);
long curVersion = _snapshotVersion.get();
if (curVersion == _lastRefreshed) {
return;
}
if (_next == null) return;
synchronized (SnapshotManager.this) {
curVersion = _snapshotVersions.get(_id);
curVersion = _snapshotVersion.get();
Log.tracev("Refreshing snapshot iterator {0}, last refreshed {1}, current version {2}", _id, _lastRefreshed, curVersion);
_backing.close();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()),