Sync-base: OpHandler interface

This commit is contained in:
2025-04-25 15:04:07 +02:00
parent 07133a7186
commit b034591091
7 changed files with 129 additions and 54 deletions

View File

@@ -1,34 +1,7 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreePeriodicPushOp;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class OpHandler {
@Inject
PushOpHandler pushOpHandler;
@Inject
Transaction curTx;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
public void handleOp(PeerId from, Op op) {
if (op instanceof IndexUpdateOp iu) {
pushOpHandler.handlePush(from, iu);
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow();
tree.acceptExternalOp(from, jk);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
} else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow();
tree.acceptExternalOp(from, pop);
}
}
public interface OpHandler<T extends Op> {
void handleOp(PeerId from, T op);
}

View File

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpHandlerService {
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
public OpHandlerService(Instance<OpHandler<?>> OpHandlers) {
HashMap<Class<? extends Op>, OpHandler> OpHandlerMap = new HashMap<>();
for (var OpHandler : OpHandlers.handles()) {
for (var type : Arrays.stream(OpHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(OpHandler.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert Op.class.isAssignableFrom((Class<?>) orig);
OpHandlerMap.put((Class<? extends Op>) orig, OpHandler.get());
}
}
_opHandlerMap = Map.copyOf(OpHandlerMap);
}
public void handleOp(PeerId from, Op op) {
var handler = _opHandlerMap.get(op.getClass());
if (handler == null) {
throw new IllegalArgumentException("No handler for op: " + op.getClass());
}
handler.handleOp(from, op);
}
}

View File

@@ -1,22 +0,0 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.remoteobj.SyncHandler;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class PushOpHandler {
@Inject
Transaction curTx;
@Inject
SyncHandler syncHandler;
@Inject
RemoteTransaction remoteTransaction;
public void handlePush(PeerId peer, IndexUpdateOp obj) {
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null);
}
}

View File

@@ -0,0 +1,31 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JKleppmannTreeOpHandler implements OpHandler<JKleppmannTreeOpWrapper> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
@Override
public void handleOp(PeerId from, JKleppmannTreeOpWrapper op) {
txm.run(()->{
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
tree.acceptExternalOp(from, op);
// Push ack op
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, op.treeName()));
});
}
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JKleppmannTreePeriodicOpHandler implements OpHandler<JKleppmannTreePeriodicPushOp> {
@Inject
TransactionManager txm;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Override
public void handleOp(PeerId from, JKleppmannTreePeriodicPushOp op) {
txm.run(() -> {
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
tree.acceptExternalOp(from, op);
});
}
}

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.remoteobj;
import com.usatiuk.dhfs.invalidation.IndexUpdateOp;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
SyncHandler syncHandler;
@Override
public void handleOp(PeerId from, IndexUpdateOp op) {
txm.run(() -> {
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
});
}
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.rpc;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
@@ -42,7 +42,7 @@ public class RemoteObjectServiceServerImpl {
@Inject
RemoteTransaction remoteTx;
@Inject
OpHandler opHandler;
OpHandlerService opHandlerService;
@Inject
DtoMapperService dtoMapperService;
@Inject
@@ -120,7 +120,7 @@ public class RemoteObjectServiceServerImpl {
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
var handle = txm.run(() -> {
opHandler.handleOp(from, op);
opHandlerService.handleOp(from, op);
});
handles.add(handle);
}