commit retry

This commit is contained in:
2025-01-02 11:06:06 +01:00
parent 7b71d405e1
commit 2aa07b205f
3 changed files with 43 additions and 24 deletions

View File

@@ -185,12 +185,12 @@ public class JObjectManager {
case null -> {
var dep = dependenciesLocked.get(key);
if (dep == null) {
throw new IllegalStateException("No dependency for " + key);
throw new TxCommitException("No dependency for " + key);
}
yield dep.data.map(JDataVersionedWrapper::data).orElse(null);
}
default -> {
throw new IllegalStateException("Unexpected value: " + current.get(key));
throw new TxCommitException("Unexpected value: " + current.get(key));
}
};
@@ -227,7 +227,7 @@ public class JObjectManager {
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
default -> throw new TxCommitException("Unexpected value: " + entry);
}
current.put(entry.key(), entry);
}
@@ -250,13 +250,13 @@ public class JObjectManager {
if (dep.getValue().data().get().version() >= tx.getId()) {
Log.trace("Checking dependency " + dep.getKey() + " - newer than");
throw new IllegalStateException("Serialization hazard: " + dep.getValue().data().get().version() + " vs " + tx.getId());
throw new TxCommitException("Serialization hazard: " + dep.getValue().data().get().version() + " vs " + tx.getId());
}
var read = reads.get(dep.getKey());
if (read != null && read.data().orElse(null) != dep.getValue().data().orElse(null)) {
Log.trace("Checking dependency " + dep.getKey() + " - read mismatch");
throw new IllegalStateException("Read mismatch for " + dep.getKey() + ": " + read + " vs " + dep.getValue());
throw new TxCommitException("Read mismatch for " + dep.getKey() + ": " + read + " vs " + dep.getValue());
}
Log.trace("Checking dependency " + dep.getKey() + " - ok");
@@ -265,32 +265,35 @@ public class JObjectManager {
Log.tracef("Flushing transaction %d to storage", tx.getId());
var bundle = txWriteback.createBundle();
for (var action : current.entrySet()) {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
bundle.commit(wrapped);
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
bundle.delete(action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new IllegalStateException("Unexpected value: " + action.getValue());
try {
for (var action : current.entrySet()) {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
bundle.commit(wrapped);
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
bundle.delete(action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.getValue());
}
}
}
} catch (Throwable t) {
txWriteback.dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", tx.getId());
txWriteback.commitBundle(bundle);
} catch (
Throwable t) {
} catch (Throwable t) {
Log.error("Error when committing transaction", t);
throw t;
throw new TxCommitException(t.getMessage(), t);
} finally {
for (var unlock : toUnlock) {
unlock.close();

View File

@@ -22,6 +22,8 @@ public interface TransactionManager {
var ret = supplier.get();
commit();
return ret;
} catch (TxCommitException txCommitException) {
return run(supplier);
} catch (Throwable e) {
rollback();
throw e;
@@ -38,6 +40,9 @@ public interface TransactionManager {
try {
fn.apply();
commit();
} catch (TxCommitException txCommitException) {
run(fn);
return;
} catch (Throwable e) {
rollback();
throw e;

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
public class TxCommitException extends RuntimeException {
public TxCommitException(String message) {
super(message);
}
public TxCommitException(String message, Throwable cause) {
super(message, cause);
}
}