mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Objects: better onflush for no write transactions
This commit is contained in:
@@ -1,29 +0,0 @@
|
||||
package com.usatiuk.objects.snapshot;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import com.usatiuk.objects.transaction.TxRecord;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Singleton
|
||||
public class SnapshotManager {
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackStore;
|
||||
|
||||
public Snapshot<JObjectKey, JDataVersionedWrapper> createSnapshot() {
|
||||
return writebackStore.getSnapshot();
|
||||
}
|
||||
|
||||
// This should not be called for the same objects concurrently
|
||||
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||
return writebackStore.commitTx(writes);
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,11 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -14,10 +13,10 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -31,7 +30,7 @@ public class JObjectManager {
|
||||
}
|
||||
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
TransactionFactory transactionFactory;
|
||||
@Inject
|
||||
@@ -176,25 +175,36 @@ public class JObjectManager {
|
||||
toUnlock.add(lock);
|
||||
}
|
||||
|
||||
commitSnapshot = snapshotManager.createSnapshot();
|
||||
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
|
||||
} else {
|
||||
Log.trace("Committing transaction - no changes");
|
||||
|
||||
long version = 0L;
|
||||
|
||||
for (var read : readSet.values()) {
|
||||
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
|
||||
if (read instanceof TransactionObjectLocked<?> locked) {
|
||||
locked.lock().close();
|
||||
}
|
||||
}
|
||||
|
||||
long finalVersion = version;
|
||||
Consumer<Runnable> fenceFn = r -> {
|
||||
writebackObjectPersistentStore.asyncFence(finalVersion, r);
|
||||
};
|
||||
|
||||
return Pair.of(
|
||||
Stream.concat(
|
||||
tx.getOnCommit().stream(),
|
||||
tx.getOnFlush().stream()
|
||||
Stream.<Runnable>of(() -> {
|
||||
for (var f : tx.getOnFlush())
|
||||
fenceFn.accept(f);
|
||||
})
|
||||
).toList(),
|
||||
new TransactionHandle() {
|
||||
@Override
|
||||
public void onFlush(Runnable runnable) {
|
||||
runnable.run();
|
||||
fenceFn.accept(runnable);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -229,8 +239,9 @@ public class JObjectManager {
|
||||
Log.tracev("Skipped dependency checks: no changes");
|
||||
}
|
||||
|
||||
var addFlushCallback = snapshotManager.commitTx(writes.values());
|
||||
var addFlushCallback = writebackObjectPersistentStore.commitTx(writes.values());
|
||||
|
||||
// TODO: is it ok to possibly run it inside transaction?
|
||||
for (var callback : tx.getOnFlush()) {
|
||||
addFlushCallback.accept(callback);
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -17,7 +17,7 @@ import java.util.*;
|
||||
@Singleton
|
||||
public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Inject
|
||||
SnapshotManager snapshotManager;
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
LockManager lockManager;
|
||||
@ConfigProperty(name = "dhfs.objects.transaction.never-lock")
|
||||
@@ -64,7 +64,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
|
||||
private TransactionImpl() {
|
||||
_snapshot = snapshotManager.createSnapshot();
|
||||
_snapshot = writebackObjectPersistentStore.getSnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user