Objects: interfacify MaybeTombstone Data

This commit is contained in:
2025-04-19 17:25:06 +02:00
parent f3e4d99fcb
commit eaa413e200
15 changed files with 73 additions and 91 deletions

View File

@@ -1,6 +1,13 @@
package com.usatiuk.objects; package com.usatiuk.objects;
public sealed interface JDataVersionedWrapper permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl { import com.usatiuk.objects.iterators.Data;
public sealed interface JDataVersionedWrapper extends Data<JDataVersionedWrapper> permits JDataVersionedWrapperLazy, JDataVersionedWrapperImpl {
@Override
default JDataVersionedWrapper value() {
return this;
}
JData data(); JData data();
long version(); long version();

View File

@@ -2,9 +2,6 @@ package com.usatiuk.objects.iterators;
import java.util.Optional; import java.util.Optional;
public record Data<V>(V value) implements MaybeTombstone<V> { public interface Data<V> extends MaybeTombstone<V> {
@Override V value();
public Optional<V> opt() {
return Optional.of(value);
}
} }

View File

@@ -0,0 +1,6 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public record DataWrapper<V>(V value) implements Data<V> {
}

View File

@@ -3,5 +3,4 @@ package com.usatiuk.objects.iterators;
import java.util.Optional; import java.util.Optional;
public interface MaybeTombstone<T> { public interface MaybeTombstone<T> {
Optional<T> opt();
} }

View File

@@ -9,22 +9,22 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> extends Reversib
private Iterator<Map.Entry<K, V>> _iterator; private Iterator<Map.Entry<K, V>> _iterator;
private Map.Entry<K, V> _next; private Map.Entry<K, V> _next;
public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) { public NavigableMapKvIterator(NavigableMap<K, ? extends V> map, IteratorStart start, K key) {
_map = map; _map = (NavigableMap<K, V>) map;
SortedMap<K, V> _view; SortedMap<K, V> _view;
_goingForward = true; _goingForward = true;
switch (start) { switch (start) {
case GE -> _view = map.tailMap(key, true); case GE -> _view = _map.tailMap(key, true);
case GT -> _view = map.tailMap(key, false); case GT -> _view = _map.tailMap(key, false);
case LE -> { case LE -> {
var floorKey = map.floorKey(key); var floorKey = _map.floorKey(key);
if (floorKey == null) _view = _map; if (floorKey == null) _view = _map;
else _view = map.tailMap(floorKey, true); else _view = _map.tailMap(floorKey, true);
} }
case LT -> { case LT -> {
var lowerKey = map.lowerKey(key); var lowerKey = map.lowerKey(key);
if (lowerKey == null) _view = _map; if (lowerKey == null) _view = _map;
else _view = map.tailMap(lowerKey, true); else _view = _map.tailMap(lowerKey, true);
} }
default -> throw new IllegalArgumentException("Unknown start type"); default -> throw new IllegalArgumentException("Unknown start type");
} }

View File

@@ -2,9 +2,5 @@ package com.usatiuk.objects.iterators;
import java.util.Optional; import java.util.Optional;
public record Tombstone<V>() implements MaybeTombstone<V> { public interface Tombstone<V> extends MaybeTombstone<V> {
@Override
public Optional<V> opt() {
return Optional.empty();
}
} }

View File

@@ -0,0 +1,4 @@
package com.usatiuk.objects.iterators;
public record TombstoneImpl<V>() implements Tombstone<V> {
}

View File

@@ -16,7 +16,7 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
startType, startKey, startType, startKey,
pair -> { pair -> {
Log.tracev("{0} - Processing pair {1}", _name, pair); Log.tracev("{0} - Processing pair {1}", _name, pair);
if (pair instanceof Tombstone) { if (pair instanceof Tombstone<V>) {
return null; return null;
} }
return ((Data<V>) pair).value(); return ((Data<V>) pair).value();

View File

@@ -34,10 +34,9 @@ public class CachingObjectPersistentStore {
long version, long version,
int sizeLimit) { int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) { public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
int objSize = obj.map(JDataVersionedWrapper::estimateSize).orElse(16); var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + objSize; int newSize = size() + entry.size();
var entry = new CacheEntry(obj.<MaybeTombstone<JDataVersionedWrapper>>map(Data::new).orElse(new Tombstone<>()), objSize);
var old = map.get(key); var old = map.get(key);
if (old != null) if (old != null)
@@ -188,15 +187,8 @@ public class CachingObjectPersistentStore {
@Override @Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key, return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK) (mS, mK) -> new NavigableMapKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>(_curCache.map(), mS, mK),
-> new MappingKvIterator<>( (mS, mK) -> new CachingKvIterator(_backing.getIterator(start, key)));
new NavigableMapKvIterator<>(_curCache.map(), mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
} }
@Nonnull @Nonnull
@@ -204,12 +196,12 @@ public class CachingObjectPersistentStore {
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) { public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curCache.map().get(name); var cached = _curCache.map().get(name);
if (cached != null) { if (cached != null) {
return switch (cached.object()) { return switch (cached) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value()); case CacheEntryPresent data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> { case CacheEntryMiss tombstone -> {
yield Optional.empty(); yield Optional.empty();
} }
default -> throw new IllegalStateException("Unexpected value: " + cached.object()); default -> throw new IllegalStateException("Unexpected value: " + cached);
}; };
} }
var read = _backing.readObject(name); var read = _backing.readObject(name);
@@ -228,7 +220,7 @@ public class CachingObjectPersistentStore {
_backing.close(); _backing.close();
} }
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> { private class CachingKvIterator implements CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate; private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) { private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
@@ -261,10 +253,10 @@ public class CachingObjectPersistentStore {
} }
@Override @Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() { public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> prev() {
var prev = _delegate.prev(); var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue())); maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev; return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) prev;
} }
@Override @Override
@@ -278,10 +270,10 @@ public class CachingObjectPersistentStore {
} }
@Override @Override
public Pair<JObjectKey, JDataVersionedWrapper> next() { public Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>> next() {
var next = _delegate.next(); var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue())); maybeCache(next.getKey(), Optional.of(next.getValue()));
return next; return (Pair<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (Pair<JObjectKey, ?>) next;
} }
} }
}; };
@@ -294,6 +286,18 @@ public class CachingObjectPersistentStore {
} }
} }
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, int size) { private interface CacheEntry extends MaybeTombstone<JDataVersionedWrapper> {
int size();
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}
private record CacheEntryMiss() implements CacheEntry, Tombstone<JDataVersionedWrapper> {
@Override
public int size() {
return 64;
}
} }
} }

View File

@@ -1,6 +1,9 @@
package com.usatiuk.objects.stores; package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.Tombstone;
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry { public record PendingDelete(JObjectKey key,
long bundleId) implements PendingWriteEntry, Tombstone<JDataVersionedWrapper> {
} }

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores; package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.Data;
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry { public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
} }

View File

@@ -1,5 +1,8 @@
package com.usatiuk.objects.stores; package com.usatiuk.objects.stores;
public interface PendingWriteEntry { import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.MaybeTombstone;
public interface PendingWriteEntry extends MaybeTombstone<JDataVersionedWrapper> {
long bundleId(); long bundleId();
} }

View File

@@ -350,15 +350,9 @@ public class WritebackObjectPersistentStore {
@Override @Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key, return new TombstoneMergingKvIterator<JObjectKey, JDataVersionedWrapper>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>( (tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
new NavigableMapKvIterator<>(_pendingWrites, tS, tK), (tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
} }
@Nonnull @Nonnull
@@ -367,7 +361,7 @@ public class WritebackObjectPersistentStore {
var cached = _pendingWrites.get(name); var cached = _pendingWrites.get(name);
if (cached != null) { if (cached != null) {
return switch (cached) { return switch (cached) {
case PendingWrite c -> Optional.of(c.data()); case PendingWrite c -> Optional.of(c.value());
case PendingDelete d -> { case PendingDelete d -> {
yield Optional.empty(); yield Optional.empty();
} }

View File

@@ -21,10 +21,6 @@ import java.util.*;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream; import java.util.stream.Stream;
// Manages all access to com.usatiuk.objects.JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage.
// All data goes through it, it is responsible for transaction atomicity
// TODO: persistent tx id
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks; private final List<PreCommitTxHook> _preCommitTxHooks;
@@ -219,7 +215,7 @@ public class JObjectManager {
// TODO: Every write gets a dependency due to hooks // TODO: Every write gets a dependency due to hooks
continue; continue;
// assert false; // assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty()); // throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
} }
if (current.get().version() > snapshotId) { if (current.get().version() > snapshotId) {
@@ -270,31 +266,4 @@ public class JObjectManager {
}); });
tx.close(); tx.close();
} }
// private class TransactionObjectSourceImpl implements TransactionObjectSource {
// private final long _txId;
//
// private TransactionObjectSourceImpl(long txId) {
// _txId = txId;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
// var got = getObj(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
//
// @Override
// public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
// var got = getObjLock(type, key);
// if (got.data().isPresent() && got.data().get().version() > _txId) {
// got.lock().close();
// throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
// }
// return got;
// }
// }
} }

View File

@@ -7,7 +7,6 @@ import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager; import com.usatiuk.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@@ -166,12 +165,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) { t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write -> case TxRecord.TxObjectRecordWrite<?> write ->
new Data<>(new ReadTrackingInternalCrapTx(write.data())); new DataWrapper<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>(); case TxRecord.TxObjectRecordDeleted deleted -> new TombstoneImpl<>();
case null, default -> null; case null, default -> null;
}), }),
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK), (tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d))))); d -> new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
} }
@Override @Override