Server: simplify tx bundle creation

no evidence that threads help there
This commit is contained in:
2024-10-12 14:17:32 +02:00
parent 8dc16e672a
commit fc2e214ea4
3 changed files with 30 additions and 37 deletions

View File

@@ -13,8 +13,6 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -137,60 +135,41 @@ public class JObjectTxManager {
}
var bundle = txWriteback.createBundle();
var latch = new CountDownLatch(state._writeObjects.size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
state._writeObjects.forEach((key, value) -> {
try {
key.getMeta().setLastModifiedTx(bundle.getId());
try {
for (var e : state._writeObjects.entrySet()) {
var key = e.getKey();
var value = e.getValue();
if (key.getMeta().isDeleted()) {
bundle.delete(key);
latch.countDown();
return;
continue;
}
if (!value._dataChanged && !value._metaChanged) {
latch.countDown();
return;
continue;
}
if (key.getMeta().isHaveLocalCopy() && value._dataChanged) {
_serializerThreads.execute(() -> {
try {
bundle.commit(key,
value._metaSerialized,
dataProtoSerializer.serialize(key.getData())
);
} catch (Throwable t) {
Log.error("Error serializing " + key.getMeta().getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
bundle.commit(key,
value._metaSerialized,
dataProtoSerializer.serialize(key.getData())
);
} else if (key.getMeta().isHaveLocalCopy() && !value._dataChanged) {
bundle.commitMetaChange(key, value._metaSerialized);
latch.countDown();
} else if (!key.getMeta().isHaveLocalCopy()) {
bundle.commit(key, value._metaSerialized, null);
latch.countDown();
} else {
throw new IllegalStateException("Unexpected object flush combination");
}
} catch (Exception e) {
Log.error("Error committing object " + key.getMeta().getName(), e);
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (Exception ex) {
Log.error("Error creating tx bundle ", ex);
txWriteback.dropBundle(bundle);
throw ex;
}
if (!errors.isEmpty()) {
throw new RuntimeException("Errors when committing!");
}
for (var e : state._writeObjects.entrySet())
e.getKey().getMeta().setLastModifiedTx(bundle.getId());
state._writeObjects.forEach((key, value) -> key.rwUnlock());

View File

@@ -7,6 +7,8 @@ public interface TxWriteback {
void commitBundle(TxBundle bundle);
void dropBundle(TxBundle bundle);
void fence(long bundleId);
// Executes callback after bundle with bundleId id has been persisted

View File

@@ -236,6 +236,18 @@ public class TxWritebackImpl implements TxWriteback {
}
}
@Override
public void dropBundle(com.usatiuk.dhfs.objects.jrepository.TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundle) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
}
}
}
@Override
public void fence(long bundleId) {
var latch = new CountDownLatch(1);