Sync-base: fix leaking non-flushed ops

This commit is contained in:
2025-04-23 14:00:30 +02:00
parent 16da05292f
commit adc7356d4a
2 changed files with 25 additions and 7 deletions

View File

@@ -18,10 +18,7 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -90,18 +87,24 @@ public class RemoteObjectServiceClient {
}
public OpPushReply pushOps(PeerId target, List<Op> ops) {
var barrier = new CountDownLatch(ops.size());
for (Op op : ops) {
txm.run(() -> {
for (var ref : op.getEscapedRefs()) {
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
}
});
}).onFlush(barrier::countDown);
}
var builder = OpPushRequest.newBuilder();
for (Op op : ops) {
builder.addMsg(opProtoSerializer.serialize(op));
}
var built = builder.build();
try {
barrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
return OpPushReply.getDefaultInstance();
}

View File

@@ -8,6 +8,7 @@ import com.usatiuk.dhfs.repository.invalidation.OpHandler;
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionHandle;
import com.usatiuk.objects.transaction.TransactionManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -17,6 +18,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
// Note: RunOnVirtualThread hangs somehow
@ApplicationScoped
public class RemoteObjectServiceServerImpl {
@@ -101,19 +105,30 @@ public class RemoteObjectServiceServerImpl {
}
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
var handles = new ArrayList<TransactionHandle>();
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
txm.run(() -> {
var handle = txm.run(() -> {
opHandler.handleOp(from, op);
});
handles.add(handle);
}
} catch (Exception e) {
Log.error("Error handling ops", e);
throw e;
}
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
return Uni.createFrom().emitter(e -> {
var counter = new AtomicLong(handles.size());
for (var handle : handles) {
handle.onFlush(() -> {
if (counter.decrementAndGet() == 0) {
e.complete(OpPushReply.getDefaultInstance());
}
});
}
});
}
public Uni<PingReply> ping(PeerId from, PingRequest request) {