passing tests

This commit is contained in:
2025-02-22 19:54:05 +01:00
parent dfa2fe78bd
commit 36bc7eea40
5 changed files with 77 additions and 26 deletions

View File

@@ -52,7 +52,9 @@ public class JObjectManager {
verifyReady();
while (true) {
try {
return transactionFactory.createTransaction(_txCounter.get());
var tx = transactionFactory.createTransaction(_txCounter.get());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
} catch (SnapshotManager.IllegalSnapshotIdException ignored) {
}
}

View File

@@ -23,15 +23,37 @@ public class SnapshotManager {
WritebackObjectPersistentStore delegateStore;
private interface SnapshotEntry {
long whenToRemove();
SnapshotEntry withWhenToRemove(long whenToRemove);
}
private record SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryRead(data, whenToRemove);
}
}
private record SnapshotEntryObject(JDataVersionedWrapper data) implements SnapshotEntry {
private record SnapshotEntryReadEmpty(long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryReadEmpty(whenToRemove);
}
}
private record SnapshotEntryDeleted() implements SnapshotEntry {
private record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryObject(data, whenToRemove);
}
}
private record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry {
@Override
public SnapshotEntry withWhenToRemove(long whenToRemove) {
return new SnapshotEntryDeleted(whenToRemove);
}
}
private record SnapshotKey(JObjectKey key, long version) implements Comparable<SnapshotKey> {
@@ -71,20 +93,25 @@ public class SnapshotManager {
) -> {
hadBackward = true;
yield Pair.of(new SnapshotKey(action.key(), _snapshotIds.peek()),
data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryDeleted()));
data.<SnapshotEntry>map(o -> new SnapshotEntryRead(o, id)).orElse(new SnapshotEntryReadEmpty(id)));
}
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
) -> switch (pending) {
case TxWriteback.PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data()));
case TxWriteback.PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted());
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
) -> {
assert pending.bundleId() < id;
yield switch (pending) {
case TxWriteback.PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), write.bundleId()));
case TxWriteback.PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(delete.bundleId()));
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
default -> throw new IllegalStateException("Unexpected value: " + current);
};
Log.tracev("Adding snapshot entry {0}", newSnapshotEntry);
_objects.put(newSnapshotEntry.getLeft(), newSnapshotEntry.getRight());
_snapshotBounds.put(newSnapshotEntry.getLeft().version(), newSnapshotEntry.getLeft());
}
@@ -101,6 +128,7 @@ public class SnapshotManager {
}
private void unrefSnapshot(long id) {
Log.tracev("Unref snapshot {0}", id);
synchronized (this) {
verify();
var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b);
@@ -112,19 +140,23 @@ public class SnapshotManager {
long curId = id;
long nextId;
do {
Log.tracev("Removing snapshot {0}", curId);
_snapshotIds.poll();
_snapshotVersions.remove(curId);
nextId = _snapshotIds.isEmpty() ? -1 : _snapshotIds.peek();
for (var key : _snapshotBounds.remove(curId)) {
var entry = _objects.get(key);
if (entry instanceof SnapshotEntryRead read) {
if (curId != read.whenToRemove() - 1) {
assert nextId != -1;
if (nextId < read.whenToRemove()) {
_objects.put(new SnapshotKey(key.key(), nextId), entry);
}
if (nextId == -1) {
Log.tracev("Could not find place to place entry {0}, curId={1}, nextId={2}, whenToRemove={3}, snapshotIds={4}",
entry, curId, nextId, entry.whenToRemove(), _snapshotIds);
} else if (nextId < entry.whenToRemove()) {
if (!(entry instanceof SnapshotEntryRead || entry instanceof SnapshotEntryReadEmpty)) {
Log.errorv("Unexpected entry type: {0}, key: {1}, nextId: {2}, whenToRemove: {3}, snapshotIds: {4}",
entry, key, nextId, entry.whenToRemove(), _snapshotIds);
assert false;
}
_objects.put(new SnapshotKey(key.key(), nextId), entry);
}
_objects.remove(key);
}
@@ -172,9 +204,10 @@ public class SnapshotManager {
_lastSnapshotId = id;
if (_lastAliveSnapshotId == -1)
_lastAliveSnapshotId = id;
_snapshotIds.add(id);
_snapshotRefCounts.merge(id, 1L, Long::sum);
_snapshotVersions.put(id, 0L);
if (_snapshotRefCounts.merge(id, 1L, Long::sum) == 1) {
_snapshotIds.add(id);
_snapshotVersions.put(id, 0L);
}
verify();
}
var closedRef = _closed;
@@ -201,15 +234,17 @@ public class SnapshotManager {
var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
while (nextNextKey != null && nextNextKey.key.equals(next.getKey().key()) && nextNextKey.version() <= _id) {
next = _backing.next();
nextNextKey = _backing.peekNextKey();
nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
}
if (next.getKey().version() <= _id) {
if (next.getKey().version() <= _id && next.getValue().whenToRemove() > _id) {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data) ->
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryRead(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryDeleted() ->
case SnapshotEntryReadEmpty(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
case SnapshotEntryDeleted(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
};
@@ -241,6 +276,7 @@ public class SnapshotManager {
var ret = _next;
_next = null;
fillNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
@@ -268,6 +304,7 @@ public class SnapshotManager {
if (_next == null) return;
synchronized (SnapshotManager.this) {
curVersion = _snapshotVersions.get(_id);
Log.tracev("Refreshing snapshot iterator {0}, last refreshed {1}, current version {2}", _id, _lastRefreshed, curVersion);
_backing.close();
_backing = new TombstoneMergingKvIterator<>(new SnapshotKvIterator(IteratorStart.GE, _next.getKey()), delegateStore.getIterator(IteratorStart.GE, _next.getKey()));
var next = _backing.hasNext() ? _backing.next() : null;
@@ -309,6 +346,7 @@ public class SnapshotManager {
}
var ret = _next;
prepareNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}

View File

@@ -139,6 +139,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
}
Log.tracev("got: {0}, hasNext: {1}", got, _hasNext);
}
@Override
@@ -163,6 +165,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
var ret = Pair.of(JObjectKey.fromBytes(_cursor.key()), ByteString.copyFrom(_cursor.val()));
_hasNext = _cursor.next();
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
return ret;
}

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.SnapshotManager;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
@@ -20,6 +21,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public TransactionPrivate createTransaction(long snapshotId) {
Log.tracev("Trying to create transaction with snapshotId={0}", snapshotId);
return new TransactionImpl(snapshotId);
}

View File

@@ -10,6 +10,7 @@ import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.util.Map;
@@ -94,13 +95,15 @@ public class DhfsFileServiceSimpleTestImpl {
// }
// }
@Test
@RepeatedTest(100)
void dontMkdirTwiceTest() {
Assertions.assertDoesNotThrow(() -> fileService.mkdir("/dontMkdirTwiceTest", 777));
Assertions.assertThrows(AlreadyExistsException.class, () -> fileService.mkdir("/dontMkdirTwiceTest", 777));
fileService.unlink("/dontMkdirTwiceTest");
Assertions.assertFalse(fileService.open("/dontMkdirTwiceTest").isPresent());
}
@Test
@RepeatedTest(100)
void writeTest() {
var ret = fileService.create("/writeTest", 777);
Assertions.assertTrue(ret.isPresent());
@@ -117,6 +120,9 @@ public class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
fileService.write(uuid, 3, new byte[]{17, 18});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
fileService.unlink("/writeTest");
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
}
@Test