mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Server: move server impl to a separate bean
This commit is contained in:
@@ -24,165 +24,36 @@ import org.apache.commons.lang3.tuple.Pair;
|
|||||||
@GrpcService
|
@GrpcService
|
||||||
@RolesAllowed("cluster-member")
|
@RolesAllowed("cluster-member")
|
||||||
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||||
// @Inject
|
|
||||||
// SyncHandler syncHandler;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
TransactionManager txm;
|
|
||||||
@Inject
|
|
||||||
PeerManager peerManager;
|
|
||||||
@Inject
|
|
||||||
Transaction curTx;
|
|
||||||
@Inject
|
|
||||||
PersistentPeerDataService persistentPeerDataService;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
InvalidationQueueService invalidationQueueService;
|
|
||||||
@Inject
|
@Inject
|
||||||
SecurityIdentity identity;
|
SecurityIdentity identity;
|
||||||
@Inject
|
@Inject
|
||||||
ProtoSerializer<OpP, Op> opProtoSerializer;
|
RemoteObjectServiceServerImpl remoteObjectServiceServerImpl;
|
||||||
@Inject
|
|
||||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
PeerId getIdentity() {
|
||||||
@Inject
|
return PeerId.of(identity.getPrincipal().getName().substring(3));
|
||||||
RemoteTransaction remoteTx;
|
}
|
||||||
@Inject
|
|
||||||
OpHandler opHandler;
|
|
||||||
@Inject
|
|
||||||
DtoMapperService dtoMapperService;
|
|
||||||
@Inject
|
|
||||||
AutosyncProcessor autosyncProcessor;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||||
Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
|
return remoteObjectServiceServerImpl.getObject(getIdentity(), request);
|
||||||
|
|
||||||
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
|
|
||||||
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
if (meta != null && !meta.seen())
|
|
||||||
curTx.put(meta.withSeen(true));
|
|
||||||
if (obj != null)
|
|
||||||
for (var ref : obj.collectRefsTo()) {
|
|
||||||
var refMeta = remoteTx.getMeta(ref).orElse(null);
|
|
||||||
if (refMeta != null && !refMeta.seen())
|
|
||||||
curTx.put(refMeta.withSeen(true));
|
|
||||||
}
|
|
||||||
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
|
|
||||||
});
|
|
||||||
|
|
||||||
if ((got.getValue() != null) && (got.getKey() == null)) {
|
|
||||||
Log.error("Inconsistent state for object meta: " + request.getName());
|
|
||||||
throw new StatusRuntimeException(Status.INTERNAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (got.getValue() == null) {
|
|
||||||
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
|
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
|
||||||
}
|
|
||||||
|
|
||||||
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
|
|
||||||
return Uni.createFrom().item(serialized);
|
|
||||||
// // Does @Blocking break this?
|
|
||||||
// return Uni.createFrom().emitter(emitter -> {
|
|
||||||
// try {
|
|
||||||
// } catch (Exception e) {
|
|
||||||
// emitter.fail(e);
|
|
||||||
// }
|
|
||||||
// var replyObj = txm.run(() -> {
|
|
||||||
// var cur = curTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
|
||||||
// // Obj.markSeen before markSeen of its children
|
|
||||||
// obj.markSeen();
|
|
||||||
// return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
|
|
||||||
// if (meta.isOnlyLocal())
|
|
||||||
// throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Trying to get local-only object"));
|
|
||||||
// if (data == null) {
|
|
||||||
// Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid());
|
|
||||||
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally"));
|
|
||||||
// }
|
|
||||||
// data.extractRefs().forEach(ref ->
|
|
||||||
// jObjectManager.get(ref)
|
|
||||||
// .orElseThrow(() -> new IllegalStateException("Non-hydrated refs for local object?"))
|
|
||||||
// .markSeen());
|
|
||||||
//
|
|
||||||
// return ApiObject.newBuilder()
|
|
||||||
// .setHeader(obj.getMeta().toRpcHeader())
|
|
||||||
// .setContent(dataProtoSerializer.serialize(obj.getData())).build();
|
|
||||||
// });
|
|
||||||
// });
|
|
||||||
// var ret = GetObjectReply.newBuilder()
|
|
||||||
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
|
|
||||||
// .setObject(replyObj).build();
|
|
||||||
// emitter.complete(ret);
|
|
||||||
// // TODO: Could this cause problems if we wait for too long?
|
|
||||||
//// obj.commitFenceAsync(() -> emitter.complete(ret));
|
|
||||||
// });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
||||||
var peerId = identity.getPrincipal().getName().substring(3);
|
return remoteObjectServiceServerImpl.canDelete(getIdentity(), request);
|
||||||
|
|
||||||
Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
|
|
||||||
|
|
||||||
var builder = CanDeleteReply.newBuilder();
|
|
||||||
|
|
||||||
txm.run(() -> {
|
|
||||||
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
|
|
||||||
if (obj == null) {
|
|
||||||
builder.setDeletionCandidate(true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
|
||||||
|
|
||||||
if (!builder.getDeletionCandidate()) {
|
|
||||||
for (var r : obj.refsFrom()) {
|
|
||||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
|
|
||||||
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return Uni.createFrom().item(builder.build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Override
|
|
||||||
// @Blocking
|
|
||||||
// public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
|
||||||
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
|
||||||
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
|
|
||||||
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
|
|
||||||
//
|
|
||||||
// Log.info("<-- indexUpdate: " + request.getHeader().getName());
|
|
||||||
// return jObjectTxManager.executeTxAndFlush(() -> {
|
|
||||||
// return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<OpPushReply> opPush(OpPushRequest request) {
|
public Uni<OpPushReply> opPush(OpPushRequest request) {
|
||||||
try {
|
return remoteObjectServiceServerImpl.opPush(getIdentity(), request);
|
||||||
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
|
|
||||||
for (var op : ops) {
|
|
||||||
Log.info("<-- op: " + op + " from " + identity.getPrincipal().getName().substring(3));
|
|
||||||
txm.run(() -> {
|
|
||||||
opHandler.handleOp(PeerId.of(identity.getPrincipal().getName().substring(3)), op);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
Log.error(e, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<PingReply> ping(PingRequest request) {
|
public Uni<PingReply> ping(PingRequest request) {
|
||||||
return Uni.createFrom().item(PingReply.getDefaultInstance());
|
return remoteObjectServiceServerImpl.ping(getIdentity(), request);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,123 @@
|
|||||||
|
package com.usatiuk.dhfs.objects.repository;
|
||||||
|
|
||||||
|
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||||
|
import com.usatiuk.dhfs.objects.*;
|
||||||
|
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService;
|
||||||
|
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||||
|
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.StatusRuntimeException;
|
||||||
|
import io.quarkus.grpc.GrpcService;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import io.quarkus.security.identity.SecurityIdentity;
|
||||||
|
import io.smallrye.common.annotation.Blocking;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
|
import jakarta.annotation.security.RolesAllowed;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
// Note: RunOnVirtualThread hangs somehow
|
||||||
|
@ApplicationScoped
|
||||||
|
public class RemoteObjectServiceServerImpl {
|
||||||
|
@Inject
|
||||||
|
TransactionManager txm;
|
||||||
|
@Inject
|
||||||
|
PeerManager peerManager;
|
||||||
|
@Inject
|
||||||
|
Transaction curTx;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ProtoSerializer<OpP, Op> opProtoSerializer;
|
||||||
|
@Inject
|
||||||
|
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||||
|
@Inject
|
||||||
|
RemoteTransaction remoteTx;
|
||||||
|
@Inject
|
||||||
|
OpHandler opHandler;
|
||||||
|
@Inject
|
||||||
|
DtoMapperService dtoMapperService;
|
||||||
|
@Inject
|
||||||
|
AutosyncProcessor autosyncProcessor;
|
||||||
|
|
||||||
|
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
|
||||||
|
Log.info("<-- getObject: " + request.getName() + " from " + from);
|
||||||
|
|
||||||
|
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
|
||||||
|
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
if (meta != null && !meta.seen())
|
||||||
|
curTx.put(meta.withSeen(true));
|
||||||
|
if (obj != null)
|
||||||
|
for (var ref : obj.collectRefsTo()) {
|
||||||
|
var refMeta = remoteTx.getMeta(ref).orElse(null);
|
||||||
|
if (refMeta != null && !refMeta.seen())
|
||||||
|
curTx.put(refMeta.withSeen(true));
|
||||||
|
}
|
||||||
|
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
|
||||||
|
});
|
||||||
|
|
||||||
|
if ((got.getValue() != null) && (got.getKey() == null)) {
|
||||||
|
Log.error("Inconsistent state for object meta: " + request.getName());
|
||||||
|
throw new StatusRuntimeException(Status.INTERNAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (got.getValue() == null) {
|
||||||
|
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from);
|
||||||
|
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
|
||||||
|
return Uni.createFrom().item(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
|
||||||
|
var peerId = from;
|
||||||
|
|
||||||
|
Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
|
||||||
|
|
||||||
|
var builder = CanDeleteReply.newBuilder();
|
||||||
|
|
||||||
|
txm.run(() -> {
|
||||||
|
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
|
||||||
|
if (obj == null) {
|
||||||
|
builder.setDeletionCandidate(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
||||||
|
|
||||||
|
if (!builder.getDeletionCandidate()) {
|
||||||
|
for (var r : obj.refsFrom()) {
|
||||||
|
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
|
||||||
|
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return Uni.createFrom().item(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
|
||||||
|
try {
|
||||||
|
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
|
||||||
|
for (var op : ops) {
|
||||||
|
Log.info("<-- op: " + op + " from " + from);
|
||||||
|
txm.run(() -> {
|
||||||
|
opHandler.handleOp(from, op);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.error(e, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<PingReply> ping(PeerId from, PingRequest request) {
|
||||||
|
return Uni.createFrom().item(PingReply.getDefaultInstance());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user