Objects: cleanup transaction put a little

This commit is contained in:
2025-05-02 12:50:43 +02:00
parent 6e9a2b25f6
commit 70fecb389b
4 changed files with 55 additions and 53 deletions

View File

@@ -3,9 +3,11 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -25,7 +27,6 @@ import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
@@ -218,17 +219,18 @@ public class WritebackObjectPersistentStore {
var curPwMap = curPw.pendingWrites();
for (var action : writes) {
var key = action.key();
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
curPwMap = curPwMap.plus(key, new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
curBundle.delete(deleted.key());
curPwMap = curPwMap.plus(key, new PendingDelete(key, oursId));
curBundle.delete(key);
}
}
}

View File

@@ -19,7 +19,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
@@ -75,11 +74,12 @@ public class JObjectManager {
}
for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
hookPut.pendingWrites().put(key, n);
pendingCount++;
}
writes.put(n.key(), n);
writes.put(key, n);
}
@@ -108,19 +108,20 @@ public class JObjectManager {
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
var key = entry.getKey();
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
var oldObj = getPrev.apply(key);
lastCurHookSeen.put(key, entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
hook.onCreate(key, write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
hook.onChange(key, oldObj, write.data());
}
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
hook.onDelete(key, oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
@@ -130,16 +131,17 @@ public class JObjectManager {
curIteration.clear();
for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
lastCurHookSeen.put(key, n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
var before = hookPut.pendingWrites().put(key, n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
writes.put(key, n);
}
}
}

View File

@@ -63,6 +63,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
@@ -99,43 +101,29 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) {
return Optional.empty();
}
var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return read;
});
if (got.isEmpty())
return Optional.empty();
var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> {
return Optional.of(type.cast(write.data()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
return Optional.empty();
}
case null, default -> {
}
}
return getFromSource(type, key);
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var got = _writes.get(key);
if (got != null) {
if (got instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
_writes.put(key, new TxRecord.TxObjectRecordDeleted(key));
_newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
@@ -163,25 +151,35 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
_knownNew.add(obj.key());
var key = obj.key();
_knownNew.add(key);
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();

View File

@@ -1,6 +1,6 @@
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.writeback.limit=67108864
dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs