From fc2e214ea40e981c129ad2163dc57e240f5f1f70 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sat, 12 Oct 2024 14:17:32 +0200 Subject: [PATCH] Server: simplify tx bundle creation no evidence that threads help there --- .../objects/jrepository/JObjectTxManager.java | 53 ++++++------------- .../dhfs/objects/jrepository/TxWriteback.java | 2 + .../objects/jrepository/TxWritebackImpl.java | 12 +++++ 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectTxManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectTxManager.java index f011ef8b..e0a70df7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectTxManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/JObjectTxManager.java @@ -13,8 +13,6 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; @@ -137,60 +135,41 @@ public class JObjectTxManager { } var bundle = txWriteback.createBundle(); - var latch = new CountDownLatch(state._writeObjects.size()); - ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>(); - state._writeObjects.forEach((key, value) -> { - try { - key.getMeta().setLastModifiedTx(bundle.getId()); + try { + for (var e : state._writeObjects.entrySet()) { + var key = e.getKey(); + var value = e.getValue(); if (key.getMeta().isDeleted()) { bundle.delete(key); - latch.countDown(); - return; + continue; } if (!value._dataChanged && !value._metaChanged) { - latch.countDown(); - return; + continue; } if (key.getMeta().isHaveLocalCopy() && value._dataChanged) { - _serializerThreads.execute(() -> { - try { - bundle.commit(key, - value._metaSerialized, - dataProtoSerializer.serialize(key.getData()) - ); - } catch (Throwable t) { - Log.error("Error serializing " + key.getMeta().getName(), t); - errors.add(t); - } finally { - latch.countDown(); - } - }); + bundle.commit(key, + value._metaSerialized, + dataProtoSerializer.serialize(key.getData()) + ); } else if (key.getMeta().isHaveLocalCopy() && !value._dataChanged) { bundle.commitMetaChange(key, value._metaSerialized); - latch.countDown(); } else if (!key.getMeta().isHaveLocalCopy()) { bundle.commit(key, value._metaSerialized, null); - latch.countDown(); } else { throw new IllegalStateException("Unexpected object flush combination"); } - } catch (Exception e) { - Log.error("Error committing object " + key.getMeta().getName(), e); } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + } catch (Exception ex) { + Log.error("Error creating tx bundle ", ex); + txWriteback.dropBundle(bundle); + throw ex; } - if (!errors.isEmpty()) { - throw new RuntimeException("Errors when committing!"); - } + for (var e : state._writeObjects.entrySet()) + e.getKey().getMeta().setLastModifiedTx(bundle.getId()); state._writeObjects.forEach((key, value) -> key.rwUnlock()); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWriteback.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWriteback.java index 89e4bcbb..14c6146f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWriteback.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWriteback.java @@ -7,6 +7,8 @@ public interface TxWriteback { void commitBundle(TxBundle bundle); + void dropBundle(TxBundle bundle); + void fence(long bundleId); // Executes callback after bundle with bundleId id has been persisted diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWritebackImpl.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWritebackImpl.java index c2df09e1..50975235 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWritebackImpl.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jrepository/TxWritebackImpl.java @@ -236,6 +236,18 @@ public class TxWritebackImpl implements TxWriteback { } } + @Override + public void dropBundle(com.usatiuk.dhfs.objects.jrepository.TxBundle bundle) { + verifyReady(); + synchronized (_pendingBundles) { + Log.warn("Dropped bundle: " + bundle); + _pendingBundles.remove((TxBundle) bundle); + synchronized (_flushWaitSynchronizer) { + currentSize -= ((TxBundle) bundle).calculateTotalSize(); + } + } + } + @Override public void fence(long bundleId) { var latch = new CountDownLatch(1);