diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index fdd3f421..6aedd59a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -1,5 +1,6 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import com.usatiuk.dhfs.objects.transaction.*; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import io.quarkus.logging.Log; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index 173689b0..56c84aed 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -384,7 +384,7 @@ public class WritebackObjectPersistentStore { } @Nonnull - Optional readObject(JObjectKey name) { + public Optional readObject(JObjectKey name) { var pending = getPendingWrite(name).orElse(null); return switch (pending) { case PendingWrite write -> Optional.of(write.data()); @@ -404,7 +404,7 @@ public class WritebackObjectPersistentStore { } @Nonnull - VerboseReadResult readObjectVerbose(JObjectKey key) { + public VerboseReadResult readObjectVerbose(JObjectKey key) { var pending = getPendingWrite(key).orElse(null); if (pending != null) { return new VerboseReadResultPending(pending); @@ -412,7 +412,7 @@ public class WritebackObjectPersistentStore { return new VerboseReadResultPersisted(cachedStore.readObject(key)); } - Consumer commitTx(Collection> writes, long id) { + public Consumer commitTx(Collection> writes, long id) { var bundle = createBundle(); try { for (var action : writes) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java new file mode 100644 index 00000000..5bb49752 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntry.java @@ -0,0 +1,5 @@ +package com.usatiuk.dhfs.objects.snapshot; + +interface SnapshotEntry { + long whenToRemove(); +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java new file mode 100644 index 00000000..bd838c17 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryDeleted.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.snapshot; + +record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry { +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java new file mode 100644 index 00000000..f4d7e40d --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotEntryObject.java @@ -0,0 +1,6 @@ +package com.usatiuk.dhfs.objects.snapshot; + +import com.usatiuk.dhfs.objects.JDataVersionedWrapper; + +record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry { +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKey.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKey.java new file mode 100644 index 00000000..9fd6e80f --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKey.java @@ -0,0 +1,15 @@ +package com.usatiuk.dhfs.objects.snapshot; + +import com.usatiuk.dhfs.objects.JObjectKey; + +import javax.annotation.Nonnull; +import java.util.Comparator; + +record SnapshotKey(JObjectKey key, long version) implements Comparable { + @Override + public int compareTo(@Nonnull SnapshotKey o) { + return Comparator.comparing(SnapshotKey::key) + .thenComparing(SnapshotKey::version) + .compare(this, o); + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java new file mode 100644 index 00000000..2b52fae4 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotKvIterator.java @@ -0,0 +1,111 @@ +package com.usatiuk.dhfs.objects.snapshot; + +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import io.quarkus.logging.Log; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.NavigableMap; +import java.util.NoSuchElementException; + +public class SnapshotKvIterator implements CloseableKvIterator> { + private final NavigableMap _objects; + private final long _version; + private final CloseableKvIterator _backing; + private Pair> _next = null; + + public SnapshotKvIterator(NavigableMap objects, long version, IteratorStart start, JObjectKey startKey) { + _objects = objects; + _version = version; + _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L)); + fillNext(); + if (_next == null) { + return; + } + switch (start) { + case LT -> { + assert _next.getKey().compareTo(startKey) < 0; + } + case LE -> { + assert _next.getKey().compareTo(startKey) <= 0; + } + case GT -> { + assert _next.getKey().compareTo(startKey) > 0; + } + case GE -> { + assert _next.getKey().compareTo(startKey) >= 0; + } + } + } + + private void fillNext() { + while (_backing.hasNext() && _next == null) { + var next = _backing.next(); + var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; + while (nextNextKey != null && nextNextKey.key().equals(next.getKey().key()) && nextNextKey.version() <= _version) { + next = _backing.next(); + nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; + } + // next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx + if (next.getKey().version() <= _version && next.getValue().whenToRemove() > _version) { + _next = switch (next.getValue()) { + case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> + Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); + case SnapshotEntryDeleted(long whenToRemove) -> + Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>()); + default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); + }; + } + if (_next != null) { + if (_next.getValue() instanceof TombstoneMergingKvIterator.Data( + JDataVersionedWrapper value + )) { + assert value.version() <= _version; + } + } + } + } + + @Override + public JObjectKey peekNextKey() { + if (_next == null) + throw new NoSuchElementException(); + return _next.getKey(); + } + + @Override + public void skip() { + if (_next == null) + throw new NoSuchElementException(); + _next = null; + fillNext(); + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public boolean hasNext() { + return _next != null; + } + + @Override + public Pair> next() { + if (_next == null) + throw new NoSuchElementException("No more elements"); + var ret = _next; + if (ret.getValue() instanceof TombstoneMergingKvIterator.Data( + JDataVersionedWrapper value + )) { + assert value.version() <= _version; + } + + _next = null; + fillNext(); + Log.tracev("Read: {0}, next: {1}", ret, _next); + return ret; + } + +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java similarity index 73% rename from dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java rename to dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java index 5ca22c02..bb7be190 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/snapshot/SnapshotManager.java @@ -1,5 +1,6 @@ -package com.usatiuk.dhfs.objects; +package com.usatiuk.dhfs.objects.snapshot; +import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.persistence.IteratorStart; import com.usatiuk.dhfs.objects.transaction.TxRecord; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; @@ -26,25 +27,6 @@ public class SnapshotManager { private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); - private interface SnapshotEntry { - long whenToRemove(); - } - - private record SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) implements SnapshotEntry { - } - - private record SnapshotEntryDeleted(long whenToRemove) implements SnapshotEntry { - } - - private record SnapshotKey(JObjectKey key, long version) implements Comparable { - @Override - public int compareTo(@Nonnull SnapshotKey o) { - return Comparator.comparing(SnapshotKey::key) - .thenComparing(SnapshotKey::version) - .compare(this, o); - } - } - @ConfigProperty(name = "dhfs.objects.persistence.snapshot-extra-checks") boolean extraChecks; @@ -62,7 +44,7 @@ public class SnapshotManager { assert _snapshotIds.isEmpty() || _snapshotIds.peek() == _lastAliveSnapshotId; } - Consumer commitTx(Collection> writes, long id) { + public Consumer commitTx(Collection> writes, long id) { _lock.writeLock().lock(); try { assert id > _lastSnapshotId; @@ -241,104 +223,6 @@ public class SnapshotManager { }); } - public class SnapshotKvIterator implements CloseableKvIterator> { - private final CloseableKvIterator _backing; - private Pair> _next = null; - - public SnapshotKvIterator(IteratorStart start, JObjectKey startKey) { - _backing = new NavigableMapKvIterator<>(_objects, start, new SnapshotKey(startKey, 0L)); - fillNext(); - if (_next == null) { - return; - } - switch (start) { - case LT -> { - assert _next.getKey().compareTo(startKey) < 0; - } - case LE -> { - assert _next.getKey().compareTo(startKey) <= 0; - } - case GT -> { - assert _next.getKey().compareTo(startKey) > 0; - } - case GE -> { - assert _next.getKey().compareTo(startKey) >= 0; - } - } - } - - private void fillNext() { - while (_backing.hasNext() && _next == null) { - var next = _backing.next(); - var nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; - while (nextNextKey != null && nextNextKey.key.equals(next.getKey().key()) && nextNextKey.version() <= _id) { - next = _backing.next(); - nextNextKey = _backing.hasNext() ? _backing.peekNextKey() : null; - } - // next.getValue().whenToRemove() >=_id, read tx might have same snapshot id as some write tx - if (next.getKey().version() <= _id && next.getValue().whenToRemove() > _id) { - _next = switch (next.getValue()) { - case SnapshotEntryObject(JDataVersionedWrapper data, long whenToRemove) -> - Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Data<>(data)); - case SnapshotEntryDeleted(long whenToRemove) -> - Pair.of(next.getKey().key(), new TombstoneMergingKvIterator.Tombstone<>()); - default -> throw new IllegalStateException("Unexpected value: " + next.getValue()); - }; - } - if (_next != null) { - if (_next.getValue() instanceof TombstoneMergingKvIterator.Data( - JDataVersionedWrapper value - )) { - assert value.version() <= _id; - } - } - } - } - - @Override - public JObjectKey peekNextKey() { - if (_next == null) - throw new NoSuchElementException(); - return _next.getKey(); - } - - @Override - public void skip() { - if (_next == null) - throw new NoSuchElementException(); - _next = null; - fillNext(); - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - return _next != null; - } - - @Override - public Pair> next() { - if (_next == null) - throw new NoSuchElementException("No more elements"); - var ret = _next; - if (ret.getValue() instanceof TombstoneMergingKvIterator.Data( - JDataVersionedWrapper value - )) { - assert value.version() <= _id; - } - - _next = null; - fillNext(); - Log.tracev("Read: {0}, next: {1}", ret, _next); - return ret; - } - - } - public class CheckingSnapshotKvIterator implements CloseableKvIterator { private final CloseableKvIterator _backing; @@ -409,7 +293,7 @@ public class SnapshotManager { try { Function, CloseableKvIterator> iteratorFactory = p -> new TombstoneMergingKvIterator<>("snapshot", p.getKey(), p.getValue(), - SnapshotKvIterator::new, + (tS, tK) -> new SnapshotKvIterator(_objects, _id, tS, tK), (tS, tK) -> new MappingKvIterator<>( writebackStore.getIterator(tS, tK), d -> d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>()) @@ -459,7 +343,7 @@ public class SnapshotManager { } @Nonnull - Optional readObjectDirect(JObjectKey name) { + public Optional readObjectDirect(JObjectKey name) { return writebackStore.readObject(name); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java index 17881c03..eae1d216 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 58939457..29c03c12 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.persistence.IteratorStart; +import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index 7a3c0705..766a3a63 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -1,7 +1,7 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.SnapshotManager; +import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import java.util.Collection;