fix stuff not being deleted

This commit is contained in:
2025-02-16 11:03:30 +01:00
parent 00ee6f3135
commit bcd55835ca
13 changed files with 98 additions and 23 deletions

View File

@@ -17,6 +17,16 @@ public class CurrentTransaction implements Transaction {
return transactionManager.current().getId();
}
@Override
public void onCommit(Runnable runnable) {
transactionManager.current().onCommit(runnable);
}
@Override
public void onFlush(Runnable runnable) {
transactionManager.current().onFlush(runnable);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return transactionManager.current().get(type, key, strategy);

View File

@@ -110,7 +110,7 @@ public class JObjectManager {
return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter));
}
public void commit(TransactionPrivate tx) {
public TransactionHandle commit(TransactionPrivate tx) {
verifyReady();
Log.trace("Committing transaction " + tx.getId());
// FIXME: Better way?
@@ -243,7 +243,27 @@ public class JObjectManager {
}
Log.tracef("Committing transaction %d to storage", tx.getId());
writebackObjectPersistentStore.commitTx(current.values(), tx.getId());
var addFlushCallback = writebackObjectPersistentStore.commitTx(current.values(), tx.getId());
for (var callback : tx.getOnCommit()) {
callback.run();
}
for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback);
}
return new TransactionHandle() {
@Override
public long getId() {
return tx.getId();
}
@Override
public void onFlush(Runnable runnable) {
addFlushCallback.accept(runnable);
}
};
} catch (Throwable t) {
Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t);

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionHandle;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
@@ -9,7 +10,7 @@ import java.util.function.Supplier;
public interface TransactionManager {
void begin();
void commit();
TransactionHandle commit();
void rollback();

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionHandle;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -24,14 +25,14 @@ public class TransactionManagerImpl implements TransactionManager {
}
@Override
public void commit() {
public TransactionHandle commit() {
if (_currentTransaction.get() == null) {
throw new IllegalStateException("No transaction started");
}
Log.trace("Committing transaction");
try {
jObjectManager.commit(_currentTransaction.get());
return jObjectManager.commit(_currentTransaction.get());
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;

View File

@@ -17,7 +17,7 @@ public interface TxWriteback {
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, VoidFn callback);
void asyncFence(long bundleId, Runnable callback);
interface PendingWriteEntry {
long bundleId();

View File

@@ -145,14 +145,14 @@ public class TxWritebackImpl implements TxWriteback {
});
}
List<List<VoidFn>> callbacks = new ArrayList<>();
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(VoidFn::apply));
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.calculateTotalSize();
@@ -278,16 +278,16 @@ public class TxWritebackImpl implements TxWriteback {
}
@Override
public void asyncFence(long bundleId, VoidFn fn) {
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
fn.run();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
@@ -296,7 +296,7 @@ public class TxWritebackImpl implements TxWriteback {
private class TxBundleImpl implements TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = -1;
@@ -315,14 +315,14 @@ public class TxWritebackImpl implements TxWriteback {
_ready = true;
}
public void addCallback(VoidFn callback) {
public void addCallback(Runnable callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<VoidFn> setCommitted() {
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);

View File

@@ -9,6 +9,7 @@ import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@@ -33,7 +34,7 @@ public class WritebackObjectPersistentStore {
};
}
void commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
var bundle = txWriteback.createBundle();
try {
for (var action : writes) {
@@ -58,5 +59,9 @@ public class WritebackObjectPersistentStore {
Log.tracef("Committing transaction %d to storage", id);
txWriteback.commitBundle(bundle);
long bundleId = bundle.getId();
return r -> txWriteback.asyncFence(bundleId, r);
}
}

View File

@@ -6,9 +6,11 @@ import com.usatiuk.dhfs.objects.JObjectKey;
import java.util.Optional;
// The transaction interface actually used by user code to retrieve objects
public interface Transaction {
public interface Transaction extends TransactionHandle {
long getId();
void onCommit(Runnable runnable);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void put(JData obj);

View File

@@ -5,10 +5,7 @@ import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@@ -22,6 +19,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final ReadTrackingObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new HashMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
_source = new ReadTrackingObjectSource(source);
@@ -31,6 +31,26 @@ public class TransactionFactoryImpl implements TransactionFactory {
return _id;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.transaction;
public interface TransactionHandle {
long getId();
void onFlush(Runnable runnable);
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.transaction;
import java.util.Collection;
public interface TransactionHandlePrivate extends TransactionHandle {
Collection<Runnable> getOnFlush();
}

View File

@@ -6,10 +6,12 @@ import java.util.Collection;
import java.util.Map;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction {
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, TransactionObject<?>> reads();
ReadTrackingObjectSource readSource();
Collection<Runnable> getOnCommit();
}

View File

@@ -23,7 +23,7 @@ public class DeleterTxHook implements PreCommitTxHook {
}
if (canDelete(refCur)) {
if (refCur instanceof RemoteObjectMeta ro) {
remoteObjectDeleter.putDeletionCandidate(ro);
curTx.onCommit(() -> remoteObjectDeleter.putDeletionCandidate(ro));
return;
}
Log.trace("Deleting object on change: " + key);
@@ -39,7 +39,7 @@ public class DeleterTxHook implements PreCommitTxHook {
if (canDelete(refCur)) {
if (refCur instanceof RemoteObjectMeta ro) {
remoteObjectDeleter.putDeletionCandidate(ro);
curTx.onCommit(() -> remoteObjectDeleter.putDeletionCandidate(ro));
return;
}
Log.warn("Deleting object on creation: " + key);