Objects: remove prepareTx

This commit is contained in:
2025-05-07 16:12:47 +02:00
parent 56ab3bad4c
commit de211bb2d2
5 changed files with 18 additions and 36 deletions

View File

@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
SerializingObjectPersistentStore delegate; SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats") @ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats; boolean printStats;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor; private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong(); private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong(); private AtomicLong _cacheTries = new AtomicLong();
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
_cache.set(_cache.get().withVersion(s.id())); _cache.set(_cache.get().withVersion(s.id()));
} }
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) { if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor(); _statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> { _statusExecutor.submit(() -> {
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get(); var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) { for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight())); cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
} }
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
cache = cache.withPut(del, Optional.empty()); cache = cache.withPut(del, Optional.empty());
} }
cache = cache.withVersion(txId); cache = cache.withVersion(txId);
try { delegate.commitTx(objs, txId);
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
_cache.set(cache); _cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());

View File

@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
@Override @Override
public Runnable prepareTx(TxManifestRaw names, long txId) { public void commitTx(TxManifestRaw names, long txId) {
verifyReady(); verifyReady();
var txn = _env.txnWrite(); try (var txn = _env.txnWrite()) {
try {
for (var written : names.written()) { for (var written : names.written()) {
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size()); var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb); written.getValue().copyTo(putBb);
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
bbData.putLong(txId); bbData.putLong(txId);
bbData.flip(); bbData.flip();
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData); _db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) { txn.commit();
txn.close();
throw t;
} }
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
} }
@Override @Override

View File

@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
} }
} }
public Runnable prepareTx(TxManifestRaw names, long txId) { @Override
return () -> { public void commitTx(TxManifestRaw names, long txId) {
synchronized (this) { synchronized (this) {
for (var written : names.written()) { for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue()); _objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
} }
}; for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
} }
@Override @Override

View File

@@ -13,7 +13,7 @@ import java.util.Optional;
public interface ObjectPersistentStore { public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteBuffer> getSnapshot(); Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId); void commitTx(TxManifestRaw names, long txId);
long getTotalSpace(); long getTotalSpace();

View File

@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
, objs.deleted()); , objs.deleted());
} }
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) { void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId); delegateStore.commitTx(prepareManifest(objects), txId);
} }
} }