snapshot leak fix

This commit is contained in:
2025-02-22 20:25:57 +01:00
parent 36bc7eea40
commit 92004a8163

View File

@@ -6,8 +6,6 @@ import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
@@ -70,7 +68,7 @@ public class SnapshotManager {
private final Queue<Long> _snapshotIds = new ArrayDeque<>();
private final ConcurrentSkipListMap<SnapshotKey, SnapshotEntry> _objects = new ConcurrentSkipListMap<>();
private final MultiValuedMap<Long, SnapshotKey> _snapshotBounds = new HashSetValuedHashMap<>();
private final TreeMap<Long, ArrayDeque<SnapshotKey>> _snapshotBounds = new TreeMap<>();
private final HashMap<Long, Long> _snapshotRefCounts = new HashMap<>();
private final ConcurrentSkipListMap<Long, Long> _snapshotVersions = new ConcurrentSkipListMap<>();
@@ -112,14 +110,19 @@ public class SnapshotManager {
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
_objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
_snapshotBounds.put(newSnapshotEntry.getLeft().version(), newSnapshotEntry.getLeft());
var val = _objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
// assert val == null;
_snapshotBounds.merge(newSnapshotEntry.getLeft().version(), new ArrayDeque<>(List.of(newSnapshotEntry.getLeft())),
(a, b) -> {
a.addAll(b);
return a;
});
}
if (hadBackward)
for (var sid : _snapshotIds) {
_snapshotVersions.merge(sid, 1L, Long::sum);
}
// if (hadBackward)
for (var sid : _snapshotIds) {
_snapshotVersions.merge(sid, 1L, Long::sum);
}
}
verify();
@@ -145,21 +148,36 @@ public class SnapshotManager {
_snapshotVersions.remove(curId);
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
for (var key : _snapshotBounds.remove(curId)) {
var keys = _snapshotBounds.headMap(curId, true);
long finalCurId = curId;
long finalNextId = nextId;
keys.values().stream().flatMap(Collection::stream).forEach(key -> {
var entry = _objects.get(key);
if (nextId == -1) {
if (entry == null) {
// Log.warnv("Entry not found for key {0}", key);
return;
}
if (finalNextId == -1) {
Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}",
entry, curId, nextId, entry.whenToRemove(), _snapshotIds);
} else if (nextId < entry.whenToRemove()) {
entry, finalCurId, finalNextId, entry.whenToRemove(), _snapshotIds);
} else if (finalNextId < entry.whenToRemove()) {
if (!(entry instanceof SnapshotEntryRead || entry instanceof SnapshotEntryReadEmpty)) {
Log.errorv("Unexpected entry type: {0}, key: {1}, nextId: {2}, whenToRemove: {3}, snapshotIds: {4}",
entry, key, nextId, entry.whenToRemove(), _snapshotIds);
entry, key, finalNextId, entry.whenToRemove(), _snapshotIds);
assert false;
}
_objects.put(new SnapshotKey(key.key(), nextId), entry);
_objects.put(new SnapshotKey(key.key(), finalNextId), entry);
_snapshotBounds.merge(finalNextId, new ArrayDeque<>(List.of(new SnapshotKey(key.key(), finalNextId))),
(a, b) -> {
a.addAll(b);
return a;
});
}
_objects.remove(key);
}
});
keys.clear();
if (_snapshotIds.isEmpty()) {
_lastAliveSnapshotId = -1;