mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
require tls auth for grpc
This commit is contained in:
@@ -23,8 +23,8 @@ echo Downloading...
|
|||||||
|
|
||||||
cd "$SCRIPT_DIR"
|
cd "$SCRIPT_DIR"
|
||||||
|
|
||||||
rm "Run wrapper.zip"
|
rm "Run wrapper.zip" || true
|
||||||
rm -rf "dhfs"
|
rm -rf "dhfs" || true
|
||||||
|
|
||||||
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
|
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
|
||||||
|
|
||||||
|
|||||||
@@ -73,6 +73,14 @@
|
|||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-rest</artifactId>
|
<artifactId>quarkus-rest</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.quarkus</groupId>
|
||||||
|
<artifactId>quarkus-rest-client</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.quarkus</groupId>
|
||||||
|
<artifactId>quarkus-rest-client-jsonb</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.quarkus</groupId>
|
<groupId>io.quarkus</groupId>
|
||||||
<artifactId>quarkus-rest-jsonb</artifactId>
|
<artifactId>quarkus-rest-jsonb</artifactId>
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.objects.repository;
|
package com.usatiuk.dhfs.objects.repository;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.objects.repository.peersync.GetSelfInfoRequest;
|
import com.usatiuk.dhfs.objects.repository.peersync.PeerSyncApiClientDynamic;
|
||||||
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
|
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
|
||||||
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
|
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -16,10 +16,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.cert.CertificateException;
|
import java.security.cert.CertificateException;
|
||||||
import java.util.Collection;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@@ -33,6 +30,8 @@ public class RemoteHostManager {
|
|||||||
SyncHandler syncHandler;
|
SyncHandler syncHandler;
|
||||||
@Inject
|
@Inject
|
||||||
RpcClientFactory rpcClientFactory;
|
RpcClientFactory rpcClientFactory;
|
||||||
|
@Inject
|
||||||
|
PeerSyncApiClientDynamic peerSyncApiClient;
|
||||||
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
|
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
|
||||||
long pingTimeout;
|
long pingTimeout;
|
||||||
boolean _initialized = false;
|
boolean _initialized = false;
|
||||||
@@ -184,13 +183,12 @@ public class RemoteHostManager {
|
|||||||
|
|
||||||
// FIXME: race?
|
// FIXME: race?
|
||||||
|
|
||||||
var info = rpcClientFactory.withPeerSyncClient(state.getAddr(), state.getPort(), 10000L, c -> {
|
var info = peerSyncApiClient.getSelfInfo(state.getAddr(), state.getPort());
|
||||||
return c.getSelfInfo(GetSelfInfoRequest.getDefaultInstance());
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
persistentRemoteHostsService.addHost(
|
persistentRemoteHostsService.addHost(
|
||||||
new PersistentPeerInfo(UUID.fromString(info.getUuid()), CertificateTools.certFromBytes(info.getCert().toByteArray())));
|
new PersistentPeerInfo(UUID.fromString(info.selfUuid()),
|
||||||
|
CertificateTools.certFromBytes(Base64.getDecoder().decode(info.cert()))));
|
||||||
Log.info("Added host: " + host.toString());
|
Log.info("Added host: " + host.toString());
|
||||||
} catch (CertificateException e) {
|
} catch (CertificateException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package com.usatiuk.dhfs.objects.repository;
|
package com.usatiuk.dhfs.objects.repository;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.objects.repository.peersync.DhfsObjectPeerSyncGrpcGrpc;
|
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -32,7 +31,6 @@ public class RpcClientFactory {
|
|||||||
RpcChannelFactory rpcChannelFactory;
|
RpcChannelFactory rpcChannelFactory;
|
||||||
// FIXME: Leaks!
|
// FIXME: Leaks!
|
||||||
private ConcurrentMap<ObjSyncStubKey, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub> _objSyncCache = new ConcurrentHashMap<>();
|
private ConcurrentMap<ObjSyncStubKey, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub> _objSyncCache = new ConcurrentHashMap<>();
|
||||||
private ConcurrentMap<PeerSyncStubKey, DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub> _peerSyncCache = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
public <R> R withObjSyncClient(Collection<UUID> targets, ObjectSyncClientFunction<R> fn) {
|
public <R> R withObjSyncClient(Collection<UUID> targets, ObjectSyncClientFunction<R> fn) {
|
||||||
var shuffledList = new ArrayList<>(targets);
|
var shuffledList = new ArrayList<>(targets);
|
||||||
@@ -85,27 +83,9 @@ public class RpcClientFactory {
|
|||||||
return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R> R withPeerSyncClient(UUID target, PeerSyncClientFunction<R> fn) {
|
|
||||||
var hostinfo = remoteHostManager.getTransientState(target);
|
|
||||||
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
|
|
||||||
return withPeerSyncClient(hostinfo.getAddr(), hostinfo.getPort(), peerSyncTimeout, fn);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <R> R withPeerSyncClient(String addr, int port, long timeout, PeerSyncClientFunction<R> fn) {
|
|
||||||
var key = new PeerSyncStubKey(addr, port);
|
|
||||||
var stub = _peerSyncCache.computeIfAbsent(key, (k) -> {
|
|
||||||
var channel = rpcChannelFactory.getInsecureChannel(addr, port);
|
|
||||||
return DhfsObjectPeerSyncGrpcGrpc.newBlockingStub(channel)
|
|
||||||
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
|
||||||
.withMaxInboundMessageSize(Integer.MAX_VALUE);
|
|
||||||
});
|
|
||||||
return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void dropCache() {
|
public void dropCache() {
|
||||||
rpcChannelFactory.dropCache();
|
rpcChannelFactory.dropCache();
|
||||||
_objSyncCache = new ConcurrentHashMap<>();
|
_objSyncCache = new ConcurrentHashMap<>();
|
||||||
_peerSyncCache = new ConcurrentHashMap<>();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
@@ -113,15 +93,7 @@ public class RpcClientFactory {
|
|||||||
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
|
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface PeerSyncClientFunction<R> {
|
|
||||||
R apply(DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub client);
|
|
||||||
}
|
|
||||||
|
|
||||||
private record ObjSyncStubKey(String host, String address, int port) {
|
private record ObjSyncStubKey(String host, String address, int port) {
|
||||||
}
|
}
|
||||||
|
|
||||||
private record PeerSyncStubKey(String address, int port) {
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,4 @@
|
|||||||
|
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||||
|
|
||||||
|
public record PeerInfo(String selfUuid, String cert) {
|
||||||
|
}
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import jakarta.ws.rs.GET;
|
||||||
|
import jakarta.ws.rs.Path;
|
||||||
|
|
||||||
|
import java.security.cert.CertificateEncodingException;
|
||||||
|
import java.util.Base64;
|
||||||
|
|
||||||
|
@Path("/peer-info")
|
||||||
|
public class PeerSyncApi {
|
||||||
|
@Inject
|
||||||
|
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||||
|
|
||||||
|
@Path("self")
|
||||||
|
@GET
|
||||||
|
public PeerInfo getSelfInfo() {
|
||||||
|
try {
|
||||||
|
return new PeerInfo(persistentRemoteHostsService.getSelfUuid().toString(),
|
||||||
|
Base64.getEncoder().encodeToString(persistentRemoteHostsService.getSelfCertificate().getEncoded()));
|
||||||
|
} catch (CertificateEncodingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||||
|
|
||||||
|
import jakarta.ws.rs.GET;
|
||||||
|
import jakarta.ws.rs.Path;
|
||||||
|
|
||||||
|
@Path("/peer-info")
|
||||||
|
public interface PeerSyncApiClient {
|
||||||
|
@Path("self")
|
||||||
|
@GET
|
||||||
|
PeerInfo getSelfInfo();
|
||||||
|
}
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||||
|
|
||||||
|
import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ApplicationScoped
|
||||||
|
public class PeerSyncApiClientDynamic {
|
||||||
|
public PeerInfo getSelfInfo(String addr, int port) {
|
||||||
|
var client = QuarkusRestClientBuilder.newBuilder()
|
||||||
|
.baseUri(URI.create("http://" + addr + ":" + port))
|
||||||
|
.connectTimeout(5, TimeUnit.SECONDS)
|
||||||
|
.readTimeout(5, TimeUnit.SECONDS)
|
||||||
|
.build(PeerSyncApiClient.class);
|
||||||
|
return client.getSelfInfo();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
package com.usatiuk.dhfs.objects.repository.peersync;
|
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
|
||||||
import io.quarkus.grpc.GrpcService;
|
|
||||||
import io.smallrye.common.annotation.Blocking;
|
|
||||||
import io.smallrye.mutiny.Uni;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
|
|
||||||
import java.security.cert.CertificateEncodingException;
|
|
||||||
|
|
||||||
@GrpcService
|
|
||||||
public class PeerSyncServer implements DhfsObjectPeerSyncGrpc {
|
|
||||||
@Inject
|
|
||||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Blocking
|
|
||||||
public Uni<PeerInfo> getSelfInfo(GetSelfInfoRequest request) {
|
|
||||||
try {
|
|
||||||
return Uni.createFrom().item(
|
|
||||||
PeerInfo.newBuilder()
|
|
||||||
.setUuid(persistentRemoteHostsService.getSelfUuid().toString())
|
|
||||||
.setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
|
|
||||||
.build());
|
|
||||||
} catch (CertificateEncodingException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
syntax = "proto3";
|
|
||||||
|
|
||||||
option java_multiple_files = true;
|
|
||||||
option java_package = "com.usatiuk.dhfs.objects.repository.peersync";
|
|
||||||
option java_outer_classname = "DhfsObjectPeerSyncApi";
|
|
||||||
|
|
||||||
package dhfs.objects.peersync;
|
|
||||||
|
|
||||||
service DhfsObjectPeerSyncGrpc {
|
|
||||||
rpc GetSelfInfo (GetSelfInfoRequest) returns (PeerInfo) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
message GetSelfInfoRequest {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
message PeerInfo {
|
|
||||||
string uuid = 1;
|
|
||||||
bytes cert = 2;
|
|
||||||
}
|
|
||||||
@@ -34,4 +34,4 @@ dhfs.objects.move-processor.threads=2
|
|||||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||||
quarkus.http.insecure-requests=enabled
|
quarkus.http.insecure-requests=enabled
|
||||||
quarkus.http.ssl.client-auth=request
|
quarkus.http.ssl.client-auth=required
|
||||||
|
|||||||
Reference in New Issue
Block a user