Objects: remove TransactionFactory

This commit is contained in:
2025-05-07 14:36:21 +02:00
parent 469a6b9011
commit 9403556220
6 changed files with 255 additions and 277 deletions

View File

@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting");
}
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
_pendingBundleLock.lock();
try {

View File

@@ -1,5 +0,0 @@
package com.usatiuk.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction();
}

View File

@@ -1,262 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -0,0 +1,247 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
_snapshot = snapshot;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}

View File

@@ -13,12 +13,12 @@ import java.util.Stack;
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject
JObjectManager jObjectManager;
TransactionService transactionService;
@Override
public void begin() {
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
var tx = transactionService.createTransaction();
_currentTransaction.get().push(tx);
}
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
ret = jObjectManager.commit(peeked);
ret = transactionService.commit(peeked);
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
var peeked = stack.peek();
try {
jObjectManager.rollback(peeked);
transactionService.rollback(peeked);
} catch (Throwable e) {
Log.error("Transaction rollback failed", e);
throw e;

View File

@@ -22,12 +22,10 @@ import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class JObjectManager {
public class TransactionService {
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
private boolean _ready = false;
private final DataLocker _objLocker = new DataLocker();
@@ -36,7 +34,7 @@ public class JObjectManager {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
@@ -50,7 +48,7 @@ public class JObjectManager {
public TransactionPrivate createTransaction() {
verifyReady();
var tx = transactionFactory.createTransaction();
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
}