From 18d5a7f90e1a42b792eb3aec382e1430bb91bcce Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Thu, 27 Mar 2025 13:56:45 +0100 Subject: [PATCH] Server: move server impl to a separate bean --- .../repository/RemoteObjectServiceServer.java | 147 ++---------------- .../RemoteObjectServiceServerImpl.java | 123 +++++++++++++++ 2 files changed, 132 insertions(+), 138 deletions(-) create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServerImpl.java diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java index cfb59805..bb388d4b 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java @@ -24,165 +24,36 @@ import org.apache.commons.lang3.tuple.Pair; @GrpcService @RolesAllowed("cluster-member") public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { -// @Inject -// SyncHandler syncHandler; - - @Inject - TransactionManager txm; - @Inject - PeerManager peerManager; - @Inject - Transaction curTx; - @Inject - PersistentPeerDataService persistentPeerDataService; - - @Inject - InvalidationQueueService invalidationQueueService; @Inject SecurityIdentity identity; @Inject - ProtoSerializer opProtoSerializer; - @Inject - ProtoSerializer receivedObjectProtoSerializer; - @Inject - RemoteTransaction remoteTx; - @Inject - OpHandler opHandler; - @Inject - DtoMapperService dtoMapperService; - @Inject - AutosyncProcessor autosyncProcessor; + RemoteObjectServiceServerImpl remoteObjectServiceServerImpl; + + PeerId getIdentity() { + return PeerId.of(identity.getPrincipal().getName().substring(3)); + } @Override @Blocking public Uni getObject(GetObjectRequest request) { - Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); - - Pair got = txm.run(() -> { - var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null); - var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null); - if (meta != null && !meta.seen()) - curTx.put(meta.withSeen(true)); - if (obj != null) - for (var ref : obj.collectRefsTo()) { - var refMeta = remoteTx.getMeta(ref).orElse(null); - if (refMeta != null && !refMeta.seen()) - curTx.put(refMeta.withSeen(true)); - } - return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass())); - }); - - if ((got.getValue() != null) && (got.getKey() == null)) { - Log.error("Inconsistent state for object meta: " + request.getName()); - throw new StatusRuntimeException(Status.INTERNAL); - } - - if (got.getValue() == null) { - Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); - throw new StatusRuntimeException(Status.NOT_FOUND); - } - - var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight())); - return Uni.createFrom().item(serialized); -// // Does @Blocking break this? -// return Uni.createFrom().emitter(emitter -> { -// try { -// } catch (Exception e) { -// emitter.fail(e); -// } -// var replyObj = txm.run(() -> { -// var cur = curTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); -// // Obj.markSeen before markSeen of its children -// obj.markSeen(); -// return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> { -// if (meta.isOnlyLocal()) -// throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Trying to get local-only object")); -// if (data == null) { -// Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid()); -// throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally")); -// } -// data.extractRefs().forEach(ref -> -// jObjectManager.get(ref) -// .orElseThrow(() -> new IllegalStateException("Non-hydrated refs for local object?")) -// .markSeen()); -// -// return ApiObject.newBuilder() -// .setHeader(obj.getMeta().toRpcHeader()) -// .setContent(dataProtoSerializer.serialize(obj.getData())).build(); -// }); -// }); -// var ret = GetObjectReply.newBuilder() -// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString()) -// .setObject(replyObj).build(); -// emitter.complete(ret); -// // TODO: Could this cause problems if we wait for too long? -//// obj.commitFenceAsync(() -> emitter.complete(ret)); -// }); + return remoteObjectServiceServerImpl.getObject(getIdentity(), request); } @Override @Blocking public Uni canDelete(CanDeleteRequest request) { - var peerId = identity.getPrincipal().getName().substring(3); - - Log.info("<-- canDelete: " + request.getName() + " from " + peerId); - - var builder = CanDeleteReply.newBuilder(); - - txm.run(() -> { - var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null); - - if (obj == null) { - builder.setDeletionCandidate(true); - return; - } - - builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty()); - - if (!builder.getDeletionCandidate()) { - for (var r : obj.refsFrom()) { - builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build()); - curTx.onCommit(() -> autosyncProcessor.add(r.obj())); - } - } - }); - return Uni.createFrom().item(builder.build()); + return remoteObjectServiceServerImpl.canDelete(getIdentity(), request); } - // @Override -// @Blocking -// public Uni indexUpdate(IndexUpdatePush request) { -// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); -// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) -// throw new StatusRuntimeException(Status.UNAUTHENTICATED); -// -// Log.info("<-- indexUpdate: " + request.getHeader().getName()); -// return jObjectTxManager.executeTxAndFlush(() -> { -// return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); -// }); -// } @Override @Blocking public Uni opPush(OpPushRequest request) { - try { - var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); - for (var op : ops) { - Log.info("<-- op: " + op + " from " + identity.getPrincipal().getName().substring(3)); - txm.run(() -> { - opHandler.handleOp(PeerId.of(identity.getPrincipal().getName().substring(3)), op); - }); - } - } catch (Exception e) { - Log.error(e, e); - throw e; - } - return Uni.createFrom().item(OpPushReply.getDefaultInstance()); + return remoteObjectServiceServerImpl.opPush(getIdentity(), request); } - @Override @Blocking public Uni ping(PingRequest request) { - return Uni.createFrom().item(PingReply.getDefaultInstance()); + return remoteObjectServiceServerImpl.ping(getIdentity(), request); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServerImpl.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServerImpl.java new file mode 100644 index 00000000..215cbbad --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServerImpl.java @@ -0,0 +1,123 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.persistence.JObjectKeyP; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; +import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler; +import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.objects.transaction.TransactionManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.quarkus.grpc.GrpcService; +import io.quarkus.logging.Log; +import io.quarkus.security.identity.SecurityIdentity; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; +import jakarta.annotation.security.RolesAllowed; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; + +// Note: RunOnVirtualThread hangs somehow +@ApplicationScoped +public class RemoteObjectServiceServerImpl { + @Inject + TransactionManager txm; + @Inject + PeerManager peerManager; + @Inject + Transaction curTx; + + @Inject + ProtoSerializer opProtoSerializer; + @Inject + ProtoSerializer receivedObjectProtoSerializer; + @Inject + RemoteTransaction remoteTx; + @Inject + OpHandler opHandler; + @Inject + DtoMapperService dtoMapperService; + @Inject + AutosyncProcessor autosyncProcessor; + + public Uni getObject(PeerId from, GetObjectRequest request) { + Log.info("<-- getObject: " + request.getName() + " from " + from); + + Pair got = txm.run(() -> { + var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null); + var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null); + if (meta != null && !meta.seen()) + curTx.put(meta.withSeen(true)); + if (obj != null) + for (var ref : obj.collectRefsTo()) { + var refMeta = remoteTx.getMeta(ref).orElse(null); + if (refMeta != null && !refMeta.seen()) + curTx.put(refMeta.withSeen(true)); + } + return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass())); + }); + + if ((got.getValue() != null) && (got.getKey() == null)) { + Log.error("Inconsistent state for object meta: " + request.getName()); + throw new StatusRuntimeException(Status.INTERNAL); + } + + if (got.getValue() == null) { + Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + + var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight())); + return Uni.createFrom().item(serialized); + } + + public Uni canDelete(PeerId from, CanDeleteRequest request) { + var peerId = from; + + Log.info("<-- canDelete: " + request.getName() + " from " + peerId); + + var builder = CanDeleteReply.newBuilder(); + + txm.run(() -> { + var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null); + + if (obj == null) { + builder.setDeletionCandidate(true); + return; + } + + builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty()); + + if (!builder.getDeletionCandidate()) { + for (var r : obj.refsFrom()) { + builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build()); + curTx.onCommit(() -> autosyncProcessor.add(r.obj())); + } + } + }); + return Uni.createFrom().item(builder.build()); + } + + public Uni opPush(PeerId from, OpPushRequest request) { + try { + var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); + for (var op : ops) { + Log.info("<-- op: " + op + " from " + from); + txm.run(() -> { + opHandler.handleOp(from, op); + }); + } + } catch (Exception e) { + Log.error(e, e); + throw e; + } + return Uni.createFrom().item(OpPushReply.getDefaultInstance()); + } + + public Uni ping(PeerId from, PingRequest request) { + return Uni.createFrom().item(PingReply.getDefaultInstance()); + } +}