From 5c905379ae6a6427a92bb58aafeca3df92d0d1d4 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 11 Mar 2025 23:36:39 +0100 Subject: [PATCH] Objects: hopefully fix the file write deadlock as some other random files could get into read set, and a bit of refactoring --- .../usatiuk/dhfs/objects/JObjectManager.java | 6 +- .../LmdbObjectPersistentStore.java | 3 +- .../ReadTrackingObjectSourceFactory.java | 138 ---------------- .../ReadTrackingTransactionObjectSource.java | 26 --- .../transaction/TransactionFactoryImpl.java | 154 +++++++++++++++--- .../transaction/TransactionPrivate.java | 4 +- 6 files changed, 142 insertions(+), 189 deletions(-) delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java delete mode 100644 dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingTransactionObjectSource.java diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index e1dc0e5f..9fbdd63c 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -71,11 +71,11 @@ public class JObjectManager { // TODO: check deletions, inserts try { try { - Function getCurrent = + Function getPrev = key -> switch (writes.get(key)) { case TxRecord.TxObjectRecordWrite write -> write.data(); case TxRecord.TxObjectRecordDeleted deleted -> null; - case null -> tx.readSource().get(JData.class, key).orElse(null); + case null -> tx.getFromSource(JData.class, key).orElse(null); default -> { throw new TxCommitException("Unexpected value: " + writes.get(key)); } @@ -93,7 +93,7 @@ public class JObjectManager { for (var entry : currentIteration.entrySet()) { somethingChanged = true; Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); - var oldObj = getCurrent.apply(entry.getKey()); + var oldObj = getPrev.apply(entry.getKey()); switch (entry.getValue()) { case TxRecord.TxObjectRecordWrite write -> { if (oldObj == null) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java index f14d07ae..17ee7360 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/LmdbObjectPersistentStore.java @@ -123,7 +123,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore { private static final Cleaner CLEANER = Cleaner.create(); private final MutableObject _closed = new MutableObject<>(false); - private final Exception _allocationStacktrace = new Exception(); + // private final Exception _allocationStacktrace = new Exception(); + private final Exception _allocationStacktrace = null; LmdbKvIterator(IteratorStart start, JObjectKey key) { _goingForward = true; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java deleted file mode 100644 index e609081b..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingObjectSourceFactory.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.usatiuk.dhfs.objects.transaction; - -import com.usatiuk.dhfs.objects.*; -import com.usatiuk.dhfs.objects.persistence.IteratorStart; -import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -@ApplicationScoped -public class ReadTrackingObjectSourceFactory { - @Inject - LockManager lockManager; - - public ReadTrackingTransactionObjectSource create(SnapshotManager.Snapshot snapshot) { - return new ReadTrackingObjectSourceImpl(snapshot); - } - - public class ReadTrackingObjectSourceImpl implements ReadTrackingTransactionObjectSource { - private final SnapshotManager.Snapshot _snapshot; - - private final Map> _readSet = new HashMap<>(); - - public ReadTrackingObjectSourceImpl(SnapshotManager.Snapshot snapshot) { - _snapshot = snapshot; - } - - public Map> getRead() { - return Collections.unmodifiableMap(_readSet); - } - - @Override - public Optional get(Class type, JObjectKey key) { - var got = _readSet.get(key); - - if (got == null) { - var read = _snapshot.readObject(key); - _readSet.put(key, new TransactionObjectNoLock<>(read)); - return read.map(JDataVersionedWrapper::data).map(type::cast); - } - - return got.data().map(JDataVersionedWrapper::data).map(type::cast); - } - - @Override - public Optional getWriteLocked(Class type, JObjectKey key) { - var got = _readSet.get(key); - - if (got == null) { - var lock = lockManager.lockObject(key); - try { - var read = _snapshot.readObject(key); - _readSet.put(key, new TransactionObjectLocked<>(read, lock)); - return read.map(JDataVersionedWrapper::data).map(type::cast); - } catch (Exception e) { - lock.close(); - throw e; - } - } - - return got.data().map(JDataVersionedWrapper::data).map(type::cast); - } - - @Override - public void close() { -// for (var it : _iterators) { -// it.close(); -// } - } - - private class ReadTrackingIterator implements CloseableKvIterator { - private final CloseableKvIterator _backing; - - public ReadTrackingIterator(IteratorStart start, JObjectKey key) { - _backing = _snapshot.getIterator(start, key); - } - - @Override - public JObjectKey peekNextKey() { - return _backing.peekNextKey(); - } - - @Override - public void skip() { - _backing.skip(); - } - - @Override - public JObjectKey peekPrevKey() { - return _backing.peekPrevKey(); - } - - @Override - public Pair prev() { - var got = _backing.prev(); - _readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue()))); - return Pair.of(got.getKey(), got.getValue().data()); - } - - @Override - public boolean hasPrev() { - return _backing.hasPrev(); - } - - @Override - public void skipPrev() { - _backing.skipPrev(); - } - - @Override - public void close() { - _backing.close(); - } - - @Override - public boolean hasNext() { - return _backing.hasNext(); - } - - @Override - public Pair next() { - var got = _backing.next(); - _readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(got.getValue()))); - return Pair.of(got.getKey(), got.getValue().data()); - } - } - - @Override - public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new ReadTrackingIterator(start, key); - } - } -} \ No newline at end of file diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingTransactionObjectSource.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingTransactionObjectSource.java deleted file mode 100644 index 171ea1d4..00000000 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/ReadTrackingTransactionObjectSource.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.usatiuk.dhfs.objects.transaction; - -import com.usatiuk.dhfs.objects.CloseableKvIterator; -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.persistence.IteratorStart; -import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; - -public interface ReadTrackingTransactionObjectSource extends AutoCloseableNoThrow { - Optional get(Class type, JObjectKey key); - - Optional getWriteLocked(Class type, JObjectKey key); - - CloseableKvIterator getIterator(IteratorStart start, JObjectKey key); - - default CloseableKvIterator getIterator(JObjectKey key) { - return getIterator(IteratorStart.GE, key); - } - - Map> getRead(); -} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 331fb033..9288fb5a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; import javax.annotation.Nonnull; import java.util.*; @@ -15,16 +16,41 @@ public class TransactionFactoryImpl implements TransactionFactory { @Inject SnapshotManager snapshotManager; @Inject - ReadTrackingObjectSourceFactory readTrackingObjectSourceFactory; + LockManager lockManager; @Override public TransactionPrivate createTransaction() { return new TransactionImpl(); } - private class TransactionImpl implements TransactionPrivate { - private final ReadTrackingTransactionObjectSource _source; + private interface ReadTrackingInternalCrap { + boolean fromSource(); + JData obj(); + } + + // FIXME: + private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap { + @Override + public boolean fromSource() { + return true; + } + + @Override + public JData obj() { + return wrapped.data(); + } + } + + private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap { + @Override + public boolean fromSource() { + return false; + } + } + + private class TransactionImpl implements TransactionPrivate { + private final Map> _readSet = new HashMap<>(); private final NavigableMap> _writes = new TreeMap<>(); private Map> _newWrites = new HashMap<>(); @@ -34,7 +60,67 @@ public class TransactionFactoryImpl implements TransactionFactory { private TransactionImpl() { _snapshot = snapshotManager.createSnapshot(); - _source = readTrackingObjectSourceFactory.create(_snapshot); + } + + private class ReadTrackingIterator implements CloseableKvIterator { + private final CloseableKvIterator _backing; + + public ReadTrackingIterator(CloseableKvIterator backing) { + _backing = backing; + } + + @Override + public JObjectKey peekNextKey() { + return _backing.peekNextKey(); + } + + @Override + public void skip() { + _backing.skip(); + } + + @Override + public JObjectKey peekPrevKey() { + return _backing.peekPrevKey(); + } + + @Override + public Pair prev() { + var got = _backing.prev(); + if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { + _readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped))); + } + return Pair.of(got.getKey(), got.getValue().obj()); + } + + @Override + public boolean hasPrev() { + return _backing.hasPrev(); + } + + @Override + public void skipPrev() { + _backing.skipPrev(); + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public boolean hasNext() { + return _backing.hasNext(); + } + + @Override + public Pair next() { + var got = _backing.next(); + if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { + _readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped))); + } + return Pair.of(got.getKey(), got.getValue().obj()); + } } @Override @@ -62,6 +148,37 @@ public class TransactionFactoryImpl implements TransactionFactory { return Collections.unmodifiableCollection(_onFlush); } + @Override + public Optional getFromSource(Class type, JObjectKey key) { + var got = _readSet.get(key); + + if (got == null) { + var read = _snapshot.readObject(key); + _readSet.put(key, new TransactionObjectNoLock<>(read)); + return read.map(JDataVersionedWrapper::data).map(type::cast); + } + + return got.data().map(JDataVersionedWrapper::data).map(type::cast); + } + + public Optional getWriteLockedFromSource(Class type, JObjectKey key) { + var got = _readSet.get(key); + + if (got == null) { + var lock = lockManager.lockObject(key); + try { + var read = _snapshot.readObject(key); + _readSet.put(key, new TransactionObjectLocked<>(read, lock)); + return read.map(JDataVersionedWrapper::data).map(type::cast); + } catch (Exception e) { + lock.close(); + throw e; + } + } + + return got.data().map(JDataVersionedWrapper::data).map(type::cast); + } + @Override public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { switch (_writes.get(key)) { @@ -76,8 +193,8 @@ public class TransactionFactoryImpl implements TransactionFactory { } return switch (strategy) { - case OPTIMISTIC -> _source.get(type, key); - case WRITE -> _source.getWriteLocked(type, key); + case OPTIMISTIC -> getFromSource(type, key); + case WRITE -> getWriteLockedFromSource(type, key); }; } @@ -104,13 +221,16 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { Log.tracev("Getting tx iterator with start={0}, key={1}", start, key); - return new TombstoneMergingKvIterator<>("tx", start, key, - (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), t -> switch (t) { - case TxRecord.TxObjectRecordWrite write -> new Data<>(write.data()); - case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>(); - case null, default -> null; - }), - (tS, tK) -> new MappingKvIterator<>(_source.getIterator(tS, tK), Data::new)); + return new ReadTrackingIterator(new TombstoneMergingKvIterator<>("tx", start, key, + (tS, tK) -> new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, tS, tK), + t -> switch (t) { + case TxRecord.TxObjectRecordWrite write -> + new Data<>(new ReadTrackingInternalCrapTx(write.data())); + case TxRecord.TxObjectRecordDeleted deleted -> new Tombstone<>(); + case null, default -> null; + }), + (tS, tK) -> new MappingKvIterator<>(_snapshot.getIterator(tS, tK), + d -> new Data(new ReadTrackingInternalCrapSource(d))))); } @Override @@ -128,17 +248,11 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public Map> reads() { - return _source.getRead(); - } - - @Override - public ReadTrackingTransactionObjectSource readSource() { - return _source; + return Collections.unmodifiableMap(_readSet); } @Override public void close() { - _source.close(); _snapshot.close(); } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java index 766a3a63..be59ad3f 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionPrivate.java @@ -1,11 +1,13 @@ package com.usatiuk.dhfs.objects.transaction; +import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.snapshot.SnapshotManager; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import java.util.Collection; import java.util.Map; +import java.util.Optional; // The transaction interface actually used by user code to retrieve objects public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow { @@ -13,7 +15,7 @@ public interface TransactionPrivate extends Transaction, TransactionHandlePrivat Map> reads(); - ReadTrackingTransactionObjectSource readSource(); + Optional getFromSource(Class type, JObjectKey key); Collection getOnCommit();