From b0345910919a7e86584710b1bc4b6e15e21c0ad2 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Fri, 25 Apr 2025 15:04:07 +0200 Subject: [PATCH] Sync-base: OpHandler interface --- .../usatiuk/dhfs/invalidation/OpHandler.java | 31 +------------ .../dhfs/invalidation/OpHandlerService.java | 44 +++++++++++++++++++ .../dhfs/invalidation/PushOpHandler.java | 22 ---------- .../JKleppmannTreeOpHandler.java | 31 +++++++++++++ .../JKleppmannTreePeriodicOpHandler.java | 23 ++++++++++ .../dhfs/remoteobj/IndexUpdateOpHandler.java | 26 +++++++++++ .../rpc/RemoteObjectServiceServerImpl.java | 6 +-- 7 files changed, 129 insertions(+), 54 deletions(-) create mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java delete mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/PushOpHandler.java create mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeOpHandler.java create mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreePeriodicOpHandler.java create mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/IndexUpdateOpHandler.java diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java index ecc0f13e..ddada994 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandler.java @@ -1,34 +1,7 @@ package com.usatiuk.dhfs.invalidation; -import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager; -import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeOpWrapper; -import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreePeriodicPushOp; import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.objects.transaction.Transaction; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -@ApplicationScoped -public class OpHandler { - @Inject - PushOpHandler pushOpHandler; - @Inject - Transaction curTx; - @Inject - JKleppmannTreeManager jKleppmannTreeManager; - @Inject - InvalidationQueueService invalidationQueueService; - - public void handleOp(PeerId from, Op op) { - if (op instanceof IndexUpdateOp iu) { - pushOpHandler.handlePush(from, iu); - } else if (op instanceof JKleppmannTreeOpWrapper jk) { - var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow(); - tree.acceptExternalOp(from, jk); - curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName())); - } else if (op instanceof JKleppmannTreePeriodicPushOp pop) { - var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow(); - tree.acceptExternalOp(from, pop); - } - } +public interface OpHandler { + void handleOp(PeerId from, T op); } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java new file mode 100644 index 00000000..e6ee8d59 --- /dev/null +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/OpHandlerService.java @@ -0,0 +1,44 @@ +package com.usatiuk.dhfs.invalidation; + +import com.usatiuk.dhfs.peersync.PeerId; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; + +import java.lang.reflect.ParameterizedType; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +@ApplicationScoped +public class OpHandlerService { + private final Map, OpHandler> _opHandlerMap; + + public OpHandlerService(Instance> OpHandlers) { + HashMap, OpHandler> OpHandlerMap = new HashMap<>(); + + for (var OpHandler : OpHandlers.handles()) { + for (var type : Arrays.stream(OpHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap( + t -> { + if (!(t instanceof ParameterizedType pm)) return Stream.empty(); + if (pm.getRawType().equals(OpHandler.class)) return Stream.of(pm); + return Stream.empty(); + } + ).toList()) { + var orig = type.getActualTypeArguments()[0]; + assert Op.class.isAssignableFrom((Class) orig); + OpHandlerMap.put((Class) orig, OpHandler.get()); + } + } + + _opHandlerMap = Map.copyOf(OpHandlerMap); + } + + public void handleOp(PeerId from, Op op) { + var handler = _opHandlerMap.get(op.getClass()); + if (handler == null) { + throw new IllegalArgumentException("No handler for op: " + op.getClass()); + } + handler.handleOp(from, op); + } +} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/PushOpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/PushOpHandler.java deleted file mode 100644 index fe160fad..00000000 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/invalidation/PushOpHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.usatiuk.dhfs.invalidation; - -import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.remoteobj.RemoteTransaction; -import com.usatiuk.dhfs.remoteobj.SyncHandler; -import com.usatiuk.objects.transaction.Transaction; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -@ApplicationScoped -public class PushOpHandler { - @Inject - Transaction curTx; - @Inject - SyncHandler syncHandler; - @Inject - RemoteTransaction remoteTransaction; - - public void handlePush(PeerId peer, IndexUpdateOp obj) { - syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null); - } -} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeOpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeOpHandler.java new file mode 100644 index 00000000..2dacca29 --- /dev/null +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeOpHandler.java @@ -0,0 +1,31 @@ +package com.usatiuk.dhfs.jkleppmanntree; + +import com.usatiuk.dhfs.invalidation.InvalidationQueueService; +import com.usatiuk.dhfs.invalidation.OpHandler; +import com.usatiuk.dhfs.peersync.PeerId; +import com.usatiuk.objects.transaction.Transaction; +import com.usatiuk.objects.transaction.TransactionManager; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class JKleppmannTreeOpHandler implements OpHandler { +@Inject + TransactionManager txm; + @Inject + Transaction curTx; + @Inject + JKleppmannTreeManager jKleppmannTreeManager; + @Inject + InvalidationQueueService invalidationQueueService; + + @Override + public void handleOp(PeerId from, JKleppmannTreeOpWrapper op) { + txm.run(()->{ + var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow(); + tree.acceptExternalOp(from, op); + // Push ack op + curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, op.treeName())); + }); + } +} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreePeriodicOpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreePeriodicOpHandler.java new file mode 100644 index 00000000..e3b0585b --- /dev/null +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreePeriodicOpHandler.java @@ -0,0 +1,23 @@ +package com.usatiuk.dhfs.jkleppmanntree; + +import com.usatiuk.dhfs.invalidation.OpHandler; +import com.usatiuk.dhfs.peersync.PeerId; +import com.usatiuk.objects.transaction.TransactionManager; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class JKleppmannTreePeriodicOpHandler implements OpHandler { + @Inject + TransactionManager txm; + @Inject + JKleppmannTreeManager jKleppmannTreeManager; + + @Override + public void handleOp(PeerId from, JKleppmannTreePeriodicPushOp op) { + txm.run(() -> { + var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow(); + tree.acceptExternalOp(from, op); + }); + } +} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/IndexUpdateOpHandler.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/IndexUpdateOpHandler.java new file mode 100644 index 00000000..4b6a506e --- /dev/null +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/remoteobj/IndexUpdateOpHandler.java @@ -0,0 +1,26 @@ +package com.usatiuk.dhfs.remoteobj; + +import com.usatiuk.dhfs.invalidation.IndexUpdateOp; +import com.usatiuk.dhfs.invalidation.OpHandler; +import com.usatiuk.dhfs.peersync.PeerId; +import com.usatiuk.objects.transaction.Transaction; +import com.usatiuk.objects.transaction.TransactionManager; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class IndexUpdateOpHandler implements OpHandler { + @Inject + TransactionManager txm; + @Inject + Transaction curTx; + @Inject + SyncHandler syncHandler; + + @Override + public void handleOp(PeerId from, IndexUpdateOp op) { + txm.run(() -> { + syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null); + }); + } +} diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java index 9d14455c..048522a1 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.rpc; import com.usatiuk.dhfs.ProtoSerializer; import com.usatiuk.dhfs.autosync.AutosyncProcessor; import com.usatiuk.dhfs.invalidation.Op; -import com.usatiuk.dhfs.invalidation.OpHandler; +import com.usatiuk.dhfs.invalidation.OpHandlerService; import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerManager; import com.usatiuk.dhfs.persistence.JObjectKeyP; @@ -42,7 +42,7 @@ public class RemoteObjectServiceServerImpl { @Inject RemoteTransaction remoteTx; @Inject - OpHandler opHandler; + OpHandlerService opHandlerService; @Inject DtoMapperService dtoMapperService; @Inject @@ -120,7 +120,7 @@ public class RemoteObjectServiceServerImpl { for (var op : ops) { Log.infov("<-- opPush: {0} from {1}", op, from); var handle = txm.run(() -> { - opHandler.handleOp(from, op); + opHandlerService.handleOp(from, op); }); handles.add(handle); }