some moving around

This commit is contained in:
2025-02-26 18:34:13 +01:00
parent 3e84ff1ed6
commit 52c31bc864
11 changed files with 153 additions and 125 deletions

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;

View File

@@ -384,7 +384,7 @@ public class WritebackObjectPersistentStore {
}
@Nonnull
Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = getPendingWrite(name).orElse(null);
return switch (pending) {
case PendingWrite write -> Optional.of(write.data());
@@ -404,7 +404,7 @@ public class WritebackObjectPersistentStore {
}
@Nonnull
VerboseReadResult readObjectVerbose(JObjectKey key) {
public VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
@@ -412,7 +412,7 @@ public class WritebackObjectPersistentStore {
return new VerboseReadResultPersisted(cachedStore.readObject(key));
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
var bundle = createBundle();
try {
for (var action : writes) {

View File

@@ -0,0 +1,5 @@
package com.usatiuk.dhfs.objects.snapshot;
interface SnapshotEntry {
long whenToRemove();
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.snapshot;
record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry {
}

View File

@@ -0,0 +1,6 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
}

View File

@@ -0,0 +1,15 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.JObjectKey;
import javax.annotation.Nonnull;
import java.util.Comparator;
record SnapshotKey(JObjectKey key, long version) implements Comparable<SnapshotKey> {
@Override
public int compareTo(@Nonnull SnapshotKey o) {
return Comparator.comparing(SnapshotKey::key)
.thenComparing(SnapshotKey::version)
.compare(this, o);
}
}

View File

@@ -0,0 +1,111 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> {
private final NavigableMap<SnapshotKey, SnapshotEntry> _objects;
private final long _version;
private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next = null;
public SnapshotKvIterator(NavigableMap<SnapshotKey, SnapshotEntry> objects, long version, IteratorStart start, JObjectKey startKey) {
_objects = objects;
_version = version;
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L));
fillNext();
if (_next == null) {
return;
}
switch (start) {
case LT -> {
assert _next.getKey().compareTo(startKey) < 0;
}
case LE -> {
assert _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next.getKey().compareTo(startKey) > 0;
}
case GE -> {
assert _next.getKey().compareTo(startKey) >= 0;
}
}
}
private void fillNext() {
while (_backing.hasNext() && _next == null) {
var next = _backing.next();
var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
while (nextNextKey != null && nextNextKey.key().equals(next.getKey().key()) && nextNextKey.version() <= _version) {
next = _backing.next();
nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
}
// next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx
if (next.getKey().version() <= _version && next.getValue().whenToRemove() > _version) {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryDeleted(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
};
}
if (_next != null) {
if (_next.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _version;
}
}
}
}
@Override
public JObjectKey peekNextKey() {
if (_next == null)
throw new NoSuchElementException();
return _next.getKey();
}
@Override
public void skip() {
if (_next == null)
throw new NoSuchElementException();
_next = null;
fillNext();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> next() {
if (_next == null)
throw new NoSuchElementException("No more elements");
var ret = _next;
if (ret.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _version;
}
_next = null;
fillNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects;
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
@@ -26,25 +27,6 @@ public class SnapshotManager {
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private interface SnapshotEntry {
long whenToRemove();
}
private record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry {
}
private record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry {
}
private record SnapshotKey(JObjectKey key, long version) implements Comparable<SnapshotKey> {
@Override
public int compareTo(@Nonnull SnapshotKey o) {
return Comparator.comparing(SnapshotKey::key)
.thenComparing(SnapshotKey::version)
.compare(this, o);
}
}
@ConfigProperty(name = "dhfs.objects.persistence.snapshot-extra-checks")
boolean extraChecks;
@@ -62,7 +44,7 @@ public class SnapshotManager {
assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId;
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
_lock.writeLock().lock();
try {
assert id > _lastSnapshotId;
@@ -241,104 +223,6 @@ public class SnapshotManager {
});
}
public class SnapshotKvIterator implements CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> {
private final CloseableKvIterator<SnapshotKey, SnapshotEntry> _backing;
private Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> _next = null;
public SnapshotKvIterator(IteratorStart start, JObjectKey startKey) {
_backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L));
fillNext();
if (_next == null) {
return;
}
switch (start) {
case LT -> {
assert _next.getKey().compareTo(startKey) < 0;
}
case LE -> {
assert _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next.getKey().compareTo(startKey) > 0;
}
case GE -> {
assert _next.getKey().compareTo(startKey) >= 0;
}
}
}
private void fillNext() {
while (_backing.hasNext() && _next == null) {
var next = _backing.next();
var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
while (nextNextKey != null && nextNextKey.key.equals(next.getKey().key()) && nextNextKey.version() <= _id) {
next = _backing.next();
nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null;
}
// next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx
if (next.getKey().version() <= _id && next.getValue().whenToRemove() > _id) {
_next = switch (next.getValue()) {
case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data));
case SnapshotEntryDeleted(long whenToRemove) ->
Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>());
default -> throw new IllegalStateException("Unexpected value: " + next.getValue());
};
}
if (_next != null) {
if (_next.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _id;
}
}
}
}
@Override
public JObjectKey peekNextKey() {
if (_next == null)
throw new NoSuchElementException();
return _next.getKey();
}
@Override
public void skip() {
if (_next == null)
throw new NoSuchElementException();
_next = null;
fillNext();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> next() {
if (_next == null)
throw new NoSuchElementException("No more elements");
var ret = _next;
if (ret.getValue() instanceof TombstoneMergingKvIterator.Data<JDataVersionedWrapper>(
JDataVersionedWrapper value
)) {
assert value.version() <= _id;
}
_next = null;
fillNext();
Log.tracev("Read: {0}, next: {1}", ret, _next);
return ret;
}
}
public class CheckingSnapshotKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
@@ -409,7 +293,7 @@ public class SnapshotManager {
try {
Function<Pair<IteratorStart, JObjectKey>, CloseableKvIterator<JObjectKey, JDataVersionedWrapper>> iteratorFactory =
p -> new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(),
SnapshotKvIterator::new,
(tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK),
(tS, tK) -> new MappingKvIterator<>(
writebackStore.getIterator(tS, tK),
d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>())
@@ -459,7 +343,7 @@ public class SnapshotManager {
}
@Nonnull
Optional<JDataVersionedWrapper> readObjectDirect(JObjectKey name) {
public Optional<JDataVersionedWrapper> readObjectDirect(JObjectKey name) {
return writebackStore.readObject(name);
}
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.SnapshotManager;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import java.util.Collection;