broken extra checks

This commit is contained in:
2025-02-26 11:58:07 +01:00
parent 5cbf5fcda2
commit 3e84ff1ed6
8 changed files with 109 additions and 27 deletions

View File

@@ -53,6 +53,9 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
}
} else if (_lastReturnedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey));
if (_backing.hasNext() && !(_backing.peekNextKey().compareTo(_lastReturnedKey) > 0)) {
throw new StaleIteratorException();
}
} else {
_backing = _iteratorSupplier.apply(_initialStart);
}

View File

@@ -8,6 +8,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.lang.ref.Cleaner;
@@ -16,6 +17,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class SnapshotManager {
@@ -43,6 +45,9 @@ public class SnapshotManager {
}
}
@ConfigProperty(name = "dhfs.objects.persistence.snapshot-extra-checks")
boolean extraChecks;
private long _lastSnapshotId = 0;
private long _lastAliveSnapshotId = -1;
private final AtomicLong _snapshotVersion = new AtomicLong(0);
@@ -238,7 +243,7 @@ public class SnapshotManager {
public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> {
private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next = null;
public SnapshotKvIterator(IteratorStart start, JObjectKey startKey) {
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L));
@@ -246,13 +251,18 @@ public class SnapshotManager {
if (_next == null) {
return;
}
if (start == IteratorStart.LE) {
if (_next.getKey().compareTo(startKey) > 0) {
_next = null;
switch (start) {
case LT -> {
assert _next.getKey().compareTo(startKey) < 0;
}
} else if (start == IteratorStart.LT) {
if (_next.getKey().compareTo(startKey) >= 0) {
_next = null;
case LE -> {
assert _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next.getKey().compareTo(startKey) > 0;
}
case GE -> {
assert _next.getKey().compareTo(startKey) >= 0;
}
}
}
@@ -338,29 +348,54 @@ public class SnapshotManager {
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
try {
return _backing.peekNextKey();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
}
@Override
public void skip() {
_backing.skip();
try {
_backing.skip();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
}
@Override
public void close() {
_backing.close();
try {
_backing.close();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
}
@Override
public boolean hasNext() {
return _backing.hasNext();
try {
return _backing.hasNext();
} catch (StaleIteratorException e) {
assert false;
throw e;
}
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var ret = _backing.next();
assert ret.getValue().version() <= _id;
return ret;
try {
var ret = _backing.next();
assert ret.getValue().version() <= _id;
return ret;
} catch (StaleIteratorException e) {
assert false;
throw e;
}
}
}
@@ -372,15 +407,21 @@ public class SnapshotManager {
Log.tracev("Getting snapshot {0} iterator for {1} {2}", _id, start, key);
_lock.readLock().lock();
try {
return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>(
p ->
new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(),
SnapshotKvIterator::new,
(tS, tK) -> new MappingKvIterator<>(
writebackStore.getIterator(tS, tK),
d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>())
)
, _snapshotVersion::get, _lock.readLock(), start, key));
Function<Pair<IteratorStart, JObjectKey>, CloseableKvIterator<JObjectKey, JDataVersionedWrapper>> iteratorFactory =
p -> new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(),
SnapshotKvIterator::new,
(tS, tK) -> new MappingKvIterator<>(
writebackStore.getIterator(tS, tK),
d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>())
);
var backing = extraChecks ? new SelfRefreshingKvIterator<>(
iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key
) : new InconsistentSelfRefreshingKvIterator<>(
iteratorFactory, _snapshotVersion::get, _lock.readLock(), start, key
);
return new CheckingSnapshotKvIterator(backing);
} finally {
_lock.readLock().unlock();
}

View File

@@ -5,3 +5,4 @@ dhfs.objects.lru.print-stats=true
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
quarkus.package.jar.decompiler.enabled=true
dhfs.objects.persistence.snapshot-extra-checks=false

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
@QuarkusTest
@TestProfile(Profiles.ObjectsTestProfileExtraChecks.class)
public class ObjectsTestExtraChecks extends ObjectsTestImpl{
}

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
@@ -13,16 +13,33 @@ import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.pcollections.TreePMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@QuarkusTest
public class ObjectsTest {
class Profiles {
public static class ObjectsTestProfileExtraChecks implements QuarkusTestProfile {
@Override
final public Map<String, String> getConfigOverrides() {
return TreePMap.<String, String>empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "true");
}
}
public static class ObjectsTestProfileNoExtraChecks implements QuarkusTestProfile {
@Override
final public Map<String, String> getConfigOverrides() {
return TreePMap.<String, String>empty().plus("dhfs.objects.persistence.snapshot-extra-checks", "false");
}
}
}
public abstract class ObjectsTestImpl {
@Inject
TransactionManager txm;

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
@QuarkusTest
@TestProfile(Profiles.ObjectsTestProfileNoExtraChecks.class)
public class ObjectsTestNoExtraChecks extends ObjectsTestImpl {
}

View File

@@ -3,3 +3,4 @@ quarkus.log.category."com.usatiuk".level=TRACE
quarkus.log.category."com.usatiuk".min-level=TRACE
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.objects.persistence.snapshot-extra-checks=true

View File

@@ -8,4 +8,5 @@ quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.class-loading.parent-first-artifacts=com.usatiuk.dhfs:supportlib
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
dhfs.local-discovery=false
dhfs.local-discovery=false
dhfs.objects.persistence.snapshot-extra-checks=true