Objects: hopefully fix the file write deadlock

as some other random files could get into read set,
and a bit of refactoring
This commit is contained in:
2025-03-11 23:36:39 +01:00
parent 66bf4b7d18
commit 5c905379ae
6 changed files with 142 additions and 189 deletions

View File

@@ -71,11 +71,11 @@ public class JObjectManager {
// TODO: check deletions, inserts
try {
try {
Function<JObjectKey, JData> getCurrent =
Function<JObjectKey, JData> getPrev =
key -> switch (writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.readSource().get(JData.class, key).orElse(null);
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
@@ -93,7 +93,7 @@ public class JObjectManager {
for (var entry : currentIteration.entrySet()) {
somethingChanged = true;
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getCurrent.apply(entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {

View File

@@ -123,7 +123,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
private final Exception _allocationStacktrace = new Exception();
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
LmdbKvIterator(IteratorStart start, JObjectKey key) {
_goingForward = true;

View File

@@ -1,138 +0,0 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ApplicationScoped
public class ReadTrackingObjectSourceFactory {
@Inject
LockManager lockManager;
public ReadTrackingTransactionObjectSource create(SnapshotManager.Snapshot snapshot) {
return new ReadTrackingObjectSourceImpl(snapshot);
}
public class ReadTrackingObjectSourceImpl implements ReadTrackingTransactionObjectSource {
private final SnapshotManager.Snapshot _snapshot;
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
public ReadTrackingObjectSourceImpl(SnapshotManager.Snapshot snapshot) {
_snapshot = snapshot;
}
public Map<JObjectKey, TransactionObject<?>> getRead() {
return Collections.unmodifiableMap(_readSet);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
public <T extends JData> Optional<T> getWriteLocked(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
public void close() {
// for (var it : _iterators) {
// it.close();
// }
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _backing;
public ReadTrackingIterator(IteratorStart start, JObjectKey key) {
_backing = _snapshot.getIterator(start, key);
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue())));
return Pair.of(got.getKey(), got.getValue().data());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue())));
return Pair.of(got.getKey(), got.getValue().data());
}
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
return new ReadTrackingIterator(start, key);
}
}
}

View File

@@ -1,26 +0,0 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
public interface ReadTrackingTransactionObjectSource extends AutoCloseableNoThrow {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
<T extends JData> Optional<T> getWriteLocked(Class<T> type, JObjectKey key);
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
Map<JObjectKey, TransactionObject<?>> getRead();
}

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.util.*;
@@ -15,16 +16,41 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Inject
SnapshotManager snapshotManager;
@Inject
ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory;
LockManager lockManager;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private class TransactionImpl implements TransactionPrivate {
private final ReadTrackingTransactionObjectSource _source;
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
@@ -34,7 +60,67 @@ public class TransactionFactoryImpl implements TransactionFactory {
private TransactionImpl() {
_snapshot = snapshotManager.createSnapshot();
_source = readTrackingObjectSourceFactory.create(_snapshot);
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
@Override
@@ -62,6 +148,37 @@ public class TransactionFactoryImpl implements TransactionFactory {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var lock = lockManager.lockObject(key);
try {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectLocked<>(read, lock));
return read.map(JDataVersionedWrapper::data).map(type::cast);
} catch (Exception e) {
lock.close();
throw e;
}
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
@@ -76,8 +193,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
return switch (strategy) {
case OPTIMISTIC -> _source.get(type, key);
case WRITE -> _source.getWriteLocked(type, key);
case OPTIMISTIC -> getFromSource(type, key);
case WRITE -> getWriteLockedFromSource(type, key);
};
}
@@ -104,13 +221,16 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new TombstoneMergingKvIterator<>("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write -> new Data<>(write.data());
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
case null, default -> null;
}),
(tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), Data::new));
return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key,
(tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new Data<>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>();
case null, default -> null;
}),
(tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK),
d -> new Data<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapSource(d)))));
}
@Override
@@ -128,17 +248,11 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public Map<JObjectKey, TransactionObject<?>> reads() {
return _source.getRead();
}
@Override
public ReadTrackingTransactionObjectSource readSource() {
return _source;
return Collections.unmodifiableMap(_readSet);
}
@Override
public void close() {
_source.close();
_snapshot.close();
}
}

View File

@@ -1,11 +1,13 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
@@ -13,7 +15,7 @@ public interface TransactionPrivate extends Transaction, TransactionHandlePrivat
Map<JObjectKey, TransactionObject<?>> reads();
ReadTrackingTransactionObjectSource readSource();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
Collection<Runnable> getOnCommit();