3 Commits

Author SHA1 Message Date
de211bb2d2 Objects: remove prepareTx 2025-05-07 16:12:47 +02:00
56ab3bad4c Objects: remove TransactionPrivate 2025-05-07 15:00:15 +02:00
9403556220 Objects: remove TransactionFactory 2025-05-07 14:39:55 +02:00
13 changed files with 271 additions and 351 deletions

View File

@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
_cache.set(_cache.get().withVersion(s.id()));
}
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
try {
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());

View File

@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
public Runnable prepareTx(TxManifestRaw names, long txId) {
public void commitTx(TxManifestRaw names, long txId) {
verifyReady();
var txn = _env.txnWrite();
try {
try (var txn = _env.txnWrite()) {
for (var written : names.written()) {
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb);
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
bbData.putLong(txId);
bbData.flip();
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) {
txn.close();
throw t;
txn.commit();
}
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
}
@Override

View File

@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
public Runnable prepareTx(TxManifestRaw names, long txId) {
return () -> {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
@Override
public void commitTx(TxManifestRaw names, long txId) {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
};
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
}
@Override

View File

@@ -13,7 +13,7 @@ import java.util.Optional;
public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId);
void commitTx(TxManifestRaw names, long txId);
long getTotalSpace();

View File

@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
, objs.deleted());
}
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId);
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
delegateStore.commitTx(prepareManifest(objects), txId);
}
}

View File

@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting");
}
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
_pendingBundleLock.lock();
try {

View File

@@ -1,5 +0,0 @@
package com.usatiuk.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction();
}

View File

@@ -1,262 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.objects.transaction;
import java.util.Collection;
public interface TransactionHandlePrivate extends TransactionHandle {
Collection<Runnable> getOnFlush();
}

View File

@@ -0,0 +1,241 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
class TransactionImpl implements Transaction, AutoCloseableNoThrow {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
_snapshot = snapshot;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}

View File

@@ -11,14 +11,14 @@ import java.util.Stack;
@Singleton
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject
JObjectManager jObjectManager;
TransactionService transactionService;
@Override
public void begin() {
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
var tx = transactionService.createTransaction();
_currentTransaction.get().push(tx);
}
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
ret = jObjectManager.commit(peeked);
ret = transactionService.commit(peeked);
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
var peeked = stack.peek();
try {
jObjectManager.rollback(peeked);
transactionService.rollback(peeked);
} catch (Throwable e) {
Log.error("Transaction rollback failed", e);
throw e;

View File

@@ -1,27 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
Collection<Runnable> getOnCommit();
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
}

View File

@@ -22,12 +22,10 @@ import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class JObjectManager {
public class TransactionService {
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
private boolean _ready = false;
private final DataLocker _objLocker = new DataLocker();
@@ -36,7 +34,7 @@ public class JObjectManager {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
@@ -48,14 +46,14 @@ public class JObjectManager {
_ready = true;
}
public TransactionPrivate createTransaction() {
public TransactionImpl createTransaction() {
verifyReady();
var tx = transactionFactory.createTransaction();
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
}
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
@@ -259,7 +257,7 @@ public class JObjectManager {
}
}
public void rollback(TransactionPrivate tx) {
public void rollback(TransactionImpl tx) {
verifyReady();
tx.close();
}