mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
fix deadlock and possible inconsistency in transactions
This commit is contained in:
@@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
// Manages all access to com.usatiuk.dhfs.objects.JData objects.
|
||||
// In particular, it serves as a source of truth for what is committed to the backing storage.
|
||||
@@ -130,27 +131,22 @@ public class JObjectManager {
|
||||
});
|
||||
};
|
||||
|
||||
Function<JObjectKey, JData> getCurrent =
|
||||
key -> switch (current.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null -> {
|
||||
var dep = dependenciesLocked.get(key);
|
||||
if (dep == null) {
|
||||
throw new TxCommitException("No dependency for " + key);
|
||||
}
|
||||
yield dep.data.map(JDataVersionedWrapper::data).orElse(null);
|
||||
}
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + current.get(key));
|
||||
}
|
||||
};
|
||||
|
||||
// For existing objects:
|
||||
// Check that their version is not higher than the version of transaction being committed
|
||||
// TODO: check deletions, inserts
|
||||
try {
|
||||
try {
|
||||
Function<JObjectKey, JData> getCurrent =
|
||||
key -> switch (current.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> null;
|
||||
case null ->
|
||||
tx.readSource().get(JData.class, key).data().map(JDataVersionedWrapper::data).orElse(null);
|
||||
default -> {
|
||||
throw new TxCommitException("Unexpected value: " + current.get(key));
|
||||
}
|
||||
};
|
||||
|
||||
boolean somethingChanged;
|
||||
do {
|
||||
somethingChanged = false;
|
||||
@@ -160,10 +156,6 @@ public class JObjectManager {
|
||||
currentIteration.put(n.key(), n);
|
||||
Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass());
|
||||
|
||||
currentIteration.keySet().stream()
|
||||
.sorted(Comparator.comparing(JObjectKey::toString))
|
||||
.forEach(addDependency);
|
||||
|
||||
for (var entry : currentIteration.entrySet()) {
|
||||
// FIXME: Kinda hack?
|
||||
if (entry.getKey().equals(JDataDummy.TX_ID_OBJ_NAME)) {
|
||||
@@ -191,8 +183,12 @@ public class JObjectManager {
|
||||
} while (somethingChanged);
|
||||
} finally {
|
||||
reads = tx.reads();
|
||||
|
||||
Stream.concat(reads.keySet().stream(), current.keySet().stream())
|
||||
.sorted(Comparator.comparing(JObjectKey::toString))
|
||||
.forEach(addDependency);
|
||||
|
||||
for (var read : reads.entrySet()) {
|
||||
addDependency.accept(read.getKey());
|
||||
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
|
||||
toUnlock.add(locked.lock);
|
||||
}
|
||||
@@ -207,7 +203,13 @@ public class JObjectManager {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (dep.data().orElse(null) != read.getValue().data().orElse(null)) {
|
||||
Log.trace("Checking dependency " + read.getKey() + " - changed already");
|
||||
throw new TxCommitException("Serialization hazard: " + dep.data().get().version() + " vs " + tx.getId());
|
||||
}
|
||||
|
||||
if (dep.data().get().version() >= tx.getId()) {
|
||||
assert false;
|
||||
Log.trace("Checking dependency " + read.getKey() + " - newer than");
|
||||
throw new TxCommitException("Serialization hazard: " + dep.data().get().version() + " vs " + tx.getId());
|
||||
}
|
||||
|
||||
@@ -97,6 +97,11 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
public Map<JObjectKey, TransactionObject<?>> reads() {
|
||||
return _source.getRead();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadTrackingObjectSource readSource() {
|
||||
return _source;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,4 +10,6 @@ public interface TransactionPrivate extends Transaction {
|
||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
|
||||
|
||||
Map<JObjectKey, TransactionObject<?>> reads();
|
||||
|
||||
ReadTrackingObjectSource readSource();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user