Mirror of https://github.com/usatiuk/dhfs.git
Server: op sender batching
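In short: instead of sending one op per RPC, OpSender now drains up to dhfs.objects.opsender.batch-size (default 100) pending ops per host via getPendingOpsForHost(host, limit), pushes them in a single opPush message (the proto field becomes repeated OpPushPayload msg), and commits each op after a successful send; the server deserializes the whole list and applies it with acceptExternalOps.

A minimal, self-contained sketch of the batching pattern, with illustrative names only (not the DHFS API); the real code is in the OpSender and RemoteObjectServiceClient hunks below:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class BatchingOpSenderSketch {
    private final Queue<String> pendingOps = new ArrayDeque<>(); // stand-in for the per-host op queue
    private final int batchSize = 100;                           // mirrors dhfs.objects.opsender.batch-size

    // Collect up to batchSize pending ops without removing them yet,
    // analogous to getPendingOpsForHost(host, limit).
    List<String> getPendingOps() {
        var collected = new ArrayList<String>();
        for (var op : pendingOps) {
            collected.add(op);
            if (collected.size() >= batchSize) break;
        }
        return collected;
    }

    // Push the whole batch in one call, then acknowledge each op,
    // analogous to pushOps(...) followed by commitOpForHost(...) per op.
    void sendOnce() {
        List<String> collected = getPendingOps();
        if (collected.isEmpty()) return;
        try {
            pushOps(collected);                  // one RPC for the whole batch
            collected.forEach(pendingOps::remove);
            System.out.println("Sent " + collected.size() + " op updates");
        } catch (Exception e) {
            System.err.println("Error sending ops: " + e);
        }
    }

    private void pushOps(List<String> ops) { /* network call elided */ }
}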
@@ -14,6 +14,7 @@ import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -95,13 +96,20 @@ public class JKleppmannTreeManager {
         }
 
         @Override
-        public Op getPendingOpForHost(UUID host) {
+        public List<Op> getPendingOpsForHost(UUID host, int limit) {
             return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
                 if (d.getQueues().containsKey(host)) {
-                    var peeked = d.getQueues().get(host).firstEntry();
-                    return peeked != null ? new JKleppmannTreeOpWrapper(d.getQueues().get(host).firstEntry().getValue()) : null;
+                    var queue = d.getQueues().get(host);
+                    ArrayList<Op> collected = new ArrayList<>();
+
+                    for (var node : queue.entrySet()) {
+                        collected.add(new JKleppmannTreeOpWrapper(node.getValue()));
+                        if (collected.size() >= limit) break;
+                    }
+
+                    return collected;
                 }
-                return null;
+                return List.of();
             });
         }
 
@@ -128,18 +128,20 @@ public class RemoteObjectServiceClient {
         return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send));
     }
 
-    public OpPushReply pushOp(Op op, String queueName, UUID host) {
-        for (var ref : op.getEscapedRefs()) {
-            jObjectTxManager.executeTx(() -> {
-                jObjectManager.get(ref).ifPresent(JObject::markSeen);
-            });
+    public OpPushReply pushOps(List<Op> ops, String queueName, UUID host) {
+        for (Op op : ops) {
+            for (var ref : op.getEscapedRefs()) {
+                jObjectTxManager.executeTx(() -> {
+                    jObjectManager.get(ref).ifPresent(JObject::markSeen);
+                });
+            }
         }
-        var msg = OpPushMsg.newBuilder()
+        var builder = OpPushMsg.newBuilder()
                 .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
-                .setQueueId(queueName)
-                .setMsg(opProtoSerializer.serialize(op))
-                .build();
-        return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(msg));
+                .setQueueId(queueName);
+        for (var op : ops)
+            builder.addMsg(opProtoSerializer.serialize(op));
+        return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(builder.build()));
     }
 
     public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
@@ -163,8 +163,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
             throw new StatusRuntimeException(Status.UNAUTHENTICATED);
 
         try {
+            var objs = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
             jObjectTxManager.executeTxAndFlush(() -> {
-                opObjectRegistry.acceptExternalOp(request.getQueueId(), UUID.fromString(request.getSelfUuid()), opProtoSerializer.deserialize(request.getMsg()));
+                opObjectRegistry.acceptExternalOps(request.getQueueId(), UUID.fromString(request.getSelfUuid()), objs);
             });
         } catch (Exception e) {
             Log.error(e, e);
@@ -1,11 +1,12 @@
 package com.usatiuk.dhfs.objects.repository.opsupport;
 
+import java.util.List;
 import java.util.UUID;
 
 public interface OpObject {
     String getId();
 
-    Op getPendingOpForHost(UUID host);
+    List<Op> getPendingOpsForHost(UUID host, int limit);
 
     void commitOpForHost(UUID host, Op op);
 
@@ -8,6 +8,7 @@ import io.smallrye.common.annotation.Blocking;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -26,12 +27,13 @@ public class OpObjectRegistry {
         });
     }
 
-    public void acceptExternalOp(String objId, UUID from, Op op) {
+    public void acceptExternalOps(String objId, UUID from, List<Op> ops) {
         var got = _objects.get(objId);
         if (got == null)
             throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Queue with id " + objId + " not registered"));
         got.addToTx();
-        got.acceptExternalOp(from, op);
+        for (Op op : ops)
+            got.acceptExternalOp(from, op);
     }
 
     public void pushBootstrapData(UUID host) {
@@ -12,7 +12,9 @@ import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -30,6 +32,8 @@ public class OpSender {
     JObjectTxManager jObjectTxManager;
     private ExecutorService _executor;
    private volatile boolean _shutdown = false;
+    @ConfigProperty(name = "dhfs.objects.opsender.batch-size")
+    int batchSize;
 
     @Startup
     void init() {
@@ -70,33 +74,27 @@ public class OpSender {
         // Must be peeked before getPendingOpForHost
         var periodicPushOp = obj.getPeriodicPushOp();
 
-        long sendCount = 0;
-        Op op;
+        List<Op> collected = obj.getPendingOpsForHost(host, batchSize);
 
-        while ((op = obj.getPendingOpForHost(host)) != null) {
+        if (!collected.isEmpty()) {
             try {
-                remoteObjectServiceClient.pushOp(op, obj.getId(), host);
-                Op finalOp = op;
+                remoteObjectServiceClient.pushOps(collected, obj.getId(), host);
                 jObjectTxManager.executeTx(() -> {
-                    obj.commitOpForHost(host, finalOp);
+                    for (var op : collected)
+                        obj.commitOpForHost(host, op);
                 });
-                sendCount++;
-            } catch (Exception e) {
+                Log.debug("Sent " + collected.size() + " op updates to " + host + "of" + obj.getId());
+            } catch (Throwable e) {
                 Log.warn("Error sending op to " + host, e);
-                break;
             }
         }
 
-        if (sendCount == 0) {
-            if (periodicPushOp == null) return;
-            try {
-                remoteObjectServiceClient.pushOp(periodicPushOp, obj.getId(), host);
-                Log.debug("Sent periodic op update to " + host + "of" + obj.getId());
-            } catch (Exception e) {
-                Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e);
-            }
-        } else {
-            Log.debug("Sent " + sendCount + " op updates to " + host + "of" + obj.getId());
-        }
+        if (periodicPushOp == null) return;
+        try {
+            remoteObjectServiceClient.pushOps(List.of(periodicPushOp), obj.getId(), host);
+            Log.debug("Sent periodic op update to " + host + "of" + obj.getId());
+        } catch (Throwable e) {
+            Log.warn("Error pushing periodic op for " + host + " of " + obj.getId(), e);
+        }
     }
 
@@ -94,7 +94,7 @@ message OpPushPayload {
 message OpPushMsg {
     string selfUuid = 10;
     string queueId = 1;
-    OpPushPayload msg = 2;
+    repeated OpPushPayload msg = 2;
 }
 
 message OpPushReply {
@@ -36,6 +36,7 @@ dhfs.objects.autosync.threads=2
 dhfs.objects.autosync.download-all=false
 dhfs.objects.move-processor.threads=4
 dhfs.objects.ref-processor.threads=4
+dhfs.objects.opsender.batch-size=100
 dhfs.objects.lock_timeout_secs=15
 quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
 quarkus.log.category."com.usatiuk.dhfs".level=TRACE