mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Revert "Server: hopefully working reverse proxy connection"
This reverts commit 312cf18b27.
This commit is contained in:
@@ -3,13 +3,10 @@ package com.usatiuk.dhfs.repository;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Metadata;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public abstract class ProxyConstants {
|
||||
static final Metadata.Key<String> PROXY_TO_HEADER_KEY = Metadata.Key.of("proxy_to", Metadata.ASCII_STRING_MARSHALLER);
|
||||
static final Metadata.Key<String> PROXY_FROM_HEADER_KEY = Metadata.Key.of("proxy_from", Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
||||
static final Context.Key<String> PROXY_TO_HEADER_KEY_CTX = Context.key("proxy_to");
|
||||
static final Context.Key<SocketAddress> PROXY_TO_FROM_ADDR_KEY_CTX = Context.key("proxy_to_from_addr");
|
||||
static final Context.Key<String> PROXY_FROM_HEADER_KEY_CTX = Context.key("proxy_from");
|
||||
}
|
||||
|
||||
@@ -17,14 +17,11 @@ public class ProxyServerInterceptor implements ServerInterceptor {
|
||||
}
|
||||
|
||||
if (metadata.containsKey(PROXY_TO_HEADER_KEY)) {
|
||||
var socketAddress = serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
|
||||
context = Context.current().withValue(PROXY_TO_HEADER_KEY_CTX, metadata.get(PROXY_TO_HEADER_KEY))
|
||||
.withValue(PROXY_TO_FROM_ADDR_KEY_CTX, socketAddress);
|
||||
context = Context.current().withValue(PROXY_TO_HEADER_KEY_CTX, metadata.get(PROXY_TO_HEADER_KEY));
|
||||
} else if (metadata.containsKey(PROXY_FROM_HEADER_KEY)) {
|
||||
context = Context.current().withValue(PROXY_FROM_HEADER_KEY_CTX, metadata.get(PROXY_FROM_HEADER_KEY));
|
||||
}
|
||||
|
||||
|
||||
if (context != null) {
|
||||
return Contexts.interceptCall(
|
||||
context,
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
package com.usatiuk.dhfs.repository;
|
||||
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType;
|
||||
import com.usatiuk.dhfs.repository.peerdiscovery.PeerDiscoveryDirectory;
|
||||
import io.quarkus.grpc.GrpcService;
|
||||
import io.quarkus.grpc.RegisterInterceptor;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -13,14 +10,13 @@ import io.smallrye.mutiny.Uni;
|
||||
import jakarta.annotation.security.RolesAllowed;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.usatiuk.dhfs.repository.ProxyConstants.*;
|
||||
import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_FROM_HEADER_KEY_CTX;
|
||||
import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_TO_HEADER_KEY_CTX;
|
||||
|
||||
// Note: RunOnVirtualThread hangs somehow
|
||||
|
||||
@GrpcService
|
||||
@RolesAllowed("cluster-member")
|
||||
@RegisterInterceptor(ProxyServerInterceptor.class)
|
||||
@@ -33,8 +29,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
RpcClientFactory rpcClientFactory;
|
||||
@Inject
|
||||
PeerManager peerManager;
|
||||
@Inject
|
||||
PeerDiscoveryDirectory peerDiscoveryDirectory;
|
||||
|
||||
|
||||
private PeerId getIdentity() {
|
||||
if (PROXY_FROM_HEADER_KEY_CTX.get() != null) {
|
||||
@@ -54,21 +49,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
private <T> Optional<Uni<T>> tryProxy(Function<DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub, T> fn) {
|
||||
var proxyTarget = getProxyTarget();
|
||||
if (proxyTarget != null) {
|
||||
var fromAddr = PROXY_TO_FROM_ADDR_KEY_CTX.get();
|
||||
if (fromAddr instanceof InetSocketAddress inetAddr) {
|
||||
peerDiscoveryDirectory.notifyAddr(
|
||||
new IpPeerAddress(
|
||||
PeerId.of(identity.getPrincipal().getName().substring(3)),
|
||||
PeerAddressType.WAN,
|
||||
inetAddr.getAddress(),
|
||||
-1,
|
||||
inetAddr.getPort()
|
||||
)
|
||||
);
|
||||
} else {
|
||||
Log.warnv("Expected InetSocketAddress but got {0}", fromAddr);
|
||||
}
|
||||
|
||||
return Optional.of(Uni.createFrom().item(rpcClientFactory.<T>withObjSyncClient(
|
||||
proxyTarget,
|
||||
getIdentity(),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
quarkus.grpc.server.use-separate-server=false
|
||||
dhfs.objects.peerdiscovery.port=42069
|
||||
dhfs.objects.peerdiscovery.interval=4s
|
||||
dhfs.objects.peerdiscovery.interval=5s
|
||||
dhfs.objects.peerdiscovery.broadcast=true
|
||||
dhfs.objects.sync.timeout=30
|
||||
dhfs.objects.sync.ping.timeout=5
|
||||
@@ -33,7 +33,7 @@ dhfs.objects.ref-processor.threads=4
|
||||
dhfs.objects.opsender.batch-size=100
|
||||
dhfs.objects.lock_timeout_secs=2
|
||||
dhfs.local-discovery=true
|
||||
dhfs.peerdiscovery.timeout=10000
|
||||
dhfs.peerdiscovery.timeout=5000
|
||||
quarkus.log.category."com.usatiuk".min-level=TRACE
|
||||
quarkus.log.category."com.usatiuk".level=TRACE
|
||||
quarkus.http.insecure-requests=enabled
|
||||
|
||||
Reference in New Issue
Block a user