diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java index de603c32..a762c89e 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java @@ -16,6 +16,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; @ApplicationScoped public class OpPusher { @@ -35,59 +36,64 @@ public class OpPusher { DtoMapperService dtoMapperService; public void doPush(InvalidationQueueEntry entry) { - List info = txm.run(() -> { - var obj = curTx.get(JData.class, entry.key()).orElse(null); - switch (obj) { - case RemoteObjectMeta remote -> { - JDataRemoteDto data = - remote.knownType().isAnnotationPresent(JDataRemotePush.class) - ? remoteTransaction.getData(remote.knownType(), entry.key()) - .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null) - : null; + AtomicBoolean doAgain = new AtomicBoolean(false); + do { + // FIXME: + doAgain.set(false); + List info = txm.run(() -> { + var obj = curTx.get(JData.class, entry.key()).orElse(null); + switch (obj) { + case RemoteObjectMeta remote -> { + JDataRemoteDto data = + remote.knownType().isAnnotationPresent(JDataRemotePush.class) + ? remoteTransaction.getData(remote.knownType(), entry.key()) + .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null) + : null; - if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) { - Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType()); + if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) { + Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType()); + } + return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data)); } - return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data)); - } - case JKleppmannTreePersistentData pd -> { - var tree = jKleppmannTreeManager.getTree(pd.key()); + case JKleppmannTreePersistentData pd -> { + var tree = jKleppmannTreeManager.getTree(pd.key()); - if (!tree.hasPendingOpsForHost(entry.peer())) - return List.of(tree.getPeriodicPushOp()); + if (!tree.hasPendingOpsForHost(entry.peer())) + return List.of(tree.getPeriodicPushOp()); - var ops = tree.getPendingOpsForHost(entry.peer(), 1); + var ops = tree.getPendingOpsForHost(entry.peer(), 100); - if (tree.hasPendingOpsForHost(entry.peer())) - invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key()); - - return ops; - } - case null, - default -> { - return null; - } - } - }); - if (info == null) { - return; - } - Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info); - remoteObjectServiceClient.pushOps(entry.peer(), info); - txm.run(() -> { - var obj = curTx.get(JData.class, entry.key()).orElse(null); - switch (obj) { - case JKleppmannTreePersistentData pd: { - var tree = jKleppmannTreeManager.getTree(pd.key()); - for (var op : info) { - tree.commitOpForHost(entry.peer(), op); + if (tree.hasPendingOpsForHost(entry.peer())) { + doAgain.set(true); + invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key()); + } + return ops; + } + case null, + default -> { + return null; } - break; } - case null: - default: + }); + if (info == null) { + return; } - }); - + Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info); + remoteObjectServiceClient.pushOps(entry.peer(), info); + txm.run(() -> { + var obj = curTx.get(JData.class, entry.key()).orElse(null); + switch (obj) { + case JKleppmannTreePersistentData pd: { + var tree = jKleppmannTreeManager.getTree(pd.key()); + for (var op : info) { + tree.commitOpForHost(entry.peer(), op); + } + break; + } + case null: + default: + } + }); + } while (doAgain.get()); } }