Server: 100 ops push

This commit is contained in:
2025-03-24 23:04:44 +01:00
parent e0b4f97349
commit 8f7869d87a

View File

@@ -16,6 +16,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ApplicationScoped
public class OpPusher {
@@ -35,59 +36,64 @@ public class OpPusher {
DtoMapperService dtoMapperService;
public void doPush(InvalidationQueueEntry entry) {
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
AtomicBoolean doAgain = new AtomicBoolean(false);
do {
// FIXME:
doAgain.set(false);
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (!tree.hasPendingOpsForHost(entry.peer()))
return List.of(tree.getPeriodicPushOp());
if (!tree.hasPendingOpsForHost(entry.peer()))
return List.of(tree.getPeriodicPushOp());
var ops = tree.getPendingOpsForHost(entry.peer(), 1);
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
if (tree.hasPendingOpsForHost(entry.peer()))
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
return ops;
}
case null,
default -> {
return null;
}
}
});
if (info == null) {
return;
}
Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info);
remoteObjectServiceClient.pushOps(entry.peer(), info);
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
if (tree.hasPendingOpsForHost(entry.peer())) {
doAgain.set(true);
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
}
return ops;
}
case null,
default -> {
return null;
}
break;
}
case null:
default:
});
if (info == null) {
return;
}
});
Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info);
remoteObjectServiceClient.pushOps(entry.peer(), info);
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
}
break;
}
case null:
default:
}
});
} while (doAgain.get());
}
}