From 4a19f69c38604d074099906bf1088f26e53fe390 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Fri, 28 Mar 2025 17:22:06 +0100 Subject: [PATCH] Revert "Server: proxying" This reverts commit 6a8394852ea25902614ee23f6551f6ec3bbeffd8. --- .../usatiuk/dhfs/repository/PeerManager.java | 19 +-- .../dhfs/repository/ProxyConstants.java | 12 -- .../ProxyDiscoveryServiceClient.java | 21 --- .../repository/ProxyServerInterceptor.java | 36 ----- .../repository/RemoteObjectServiceServer.java | 63 +------- .../dhfs/repository/RpcClientFactory.java | 49 ++---- .../peerdiscovery/ProxyPeerAddress.java | 7 - .../peerdiscovery/ProxyPeerDiscovery.java | 32 ---- .../src/main/proto/dhfs_objects_sync.proto | 15 -- .../src/main/resources/application.properties | 4 +- .../dhfs/integration/DhfsFusex3ProxyIT.java | 141 ------------------ 11 files changed, 21 insertions(+), 378 deletions(-) delete mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyConstants.java delete mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyDiscoveryServiceClient.java delete mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyServerInterceptor.java delete mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerAddress.java delete mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerDiscovery.java delete mode 100644 dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3ProxyIT.java diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/PeerManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/PeerManager.java index 06154ac6..d1d975ca 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/PeerManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/PeerManager.java @@ -2,7 +2,6 @@ package com.usatiuk.dhfs.repository; import com.usatiuk.dhfs.PeerId; import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress; -import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType; import com.usatiuk.dhfs.repository.peerdiscovery.PeerDiscoveryDirectory; import com.usatiuk.dhfs.repository.peersync.PeerInfo; import com.usatiuk.dhfs.repository.peersync.PeerInfoService; @@ -80,12 +79,11 @@ public class PeerManager { .stream() .>map(host -> () -> { try { - boolean wasReachable = isReachable(host); - var bestAddr = selectBestAddress(host.id()).orElse(null); - if (wasReachable) - Log.tracev("Heartbeat: {0} - {1}", host, bestAddr); + if (isReachable(host)) + Log.tracev("Heartbeat: {0}", host); else - Log.debugv("Trying to connect to {0} - {1}", host, bestAddr); + Log.debugv("Trying to connect to {0}", host); + var bestAddr = selectBestAddress(host.id()).orElse(null); if (bestAddr != null && pingCheck(host, bestAddr)) handleConnectionSuccess(host, bestAddr); else @@ -143,7 +141,7 @@ public class PeerManager { return true; }); } catch (Exception ignored) { - Log.debugv("Host {0} via {1} is unreachable: {2}, {3}", host, address, ignored.getMessage(), ignored.getCause()); + Log.debugv("Host {0} is unreachable: {1}, {2}", host, ignored.getMessage(), ignored.getCause()); return false; } } @@ -164,13 +162,6 @@ public class PeerManager { return _states.keySet().stream().toList(); } - public List getDirectAvailableHosts() { - return _states.entrySet().stream() - .filter(p -> !p.getValue().type().equals(PeerAddressType.PROXY)) - .map(Map.Entry::getKey) - .toList(); - } - // public List getUnavailableHosts() { // return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() // .filter(e -> !e.getValue().isReachable()) diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyConstants.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyConstants.java deleted file mode 100644 index 846bf2f0..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyConstants.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.usatiuk.dhfs.repository; - -import io.grpc.Context; -import io.grpc.Metadata; - -public abstract class ProxyConstants { - static final Metadata.Key PROXY_TO_HEADER_KEY = Metadata.Key.of("proxy_to", Metadata.ASCII_STRING_MARSHALLER); - static final Metadata.Key PROXY_FROM_HEADER_KEY = Metadata.Key.of("proxy_from", Metadata.ASCII_STRING_MARSHALLER); - - static final Context.Key PROXY_TO_HEADER_KEY_CTX = Context.key("proxy_to"); - static final Context.Key PROXY_FROM_HEADER_KEY_CTX = Context.key("proxy_from"); -} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyDiscoveryServiceClient.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyDiscoveryServiceClient.java deleted file mode 100644 index e57b6cc2..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyDiscoveryServiceClient.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.usatiuk.dhfs.repository; - -import com.usatiuk.dhfs.PeerId; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import java.util.Collection; - -@ApplicationScoped -public class ProxyDiscoveryServiceClient { - @Inject - RpcClientFactory rpcClientFactory; - - public Collection getAvailablePeers(PeerId peerId) { - return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> { - var reply = client.proxyAvailableGet(ProxyAvailableRequest.getDefaultInstance()); - return reply.getAvailableTargetsList().stream().map(ProxyAvailableInfo::getUuid).map(PeerId::of).toList(); - }); - } - -} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyServerInterceptor.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyServerInterceptor.java deleted file mode 100644 index df41d208..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/ProxyServerInterceptor.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.usatiuk.dhfs.repository; - -import io.grpc.*; -import jakarta.enterprise.context.ApplicationScoped; - -import static com.usatiuk.dhfs.repository.ProxyConstants.*; - -@ApplicationScoped -public class ProxyServerInterceptor implements ServerInterceptor { - @Override - public ServerCall.Listener interceptCall(ServerCall serverCall, - Metadata metadata, ServerCallHandler serverCallHandler) { - Context context = null; - if (metadata.containsKey(PROXY_TO_HEADER_KEY) - && metadata.containsKey(PROXY_FROM_HEADER_KEY)) { - throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Both proxy_to and proxy_from headers are present")); - } - - if (metadata.containsKey(PROXY_TO_HEADER_KEY)) { - context = Context.current().withValue(PROXY_TO_HEADER_KEY_CTX, metadata.get(PROXY_TO_HEADER_KEY)); - } else if (metadata.containsKey(PROXY_FROM_HEADER_KEY)) { - context = Context.current().withValue(PROXY_FROM_HEADER_KEY_CTX, metadata.get(PROXY_FROM_HEADER_KEY)); - } - - if (context != null) { - return Contexts.interceptCall( - context, - serverCall, - metadata, - serverCallHandler - ); - } else { - return serverCallHandler.startCall(serverCall, metadata); - } - } -} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RemoteObjectServiceServer.java index 24fb9758..ca163616 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RemoteObjectServiceServer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RemoteObjectServiceServer.java @@ -2,101 +2,46 @@ package com.usatiuk.dhfs.repository; import com.usatiuk.dhfs.PeerId; import io.quarkus.grpc.GrpcService; -import io.quarkus.grpc.RegisterInterceptor; -import io.quarkus.logging.Log; import io.quarkus.security.identity.SecurityIdentity; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; -import java.util.Optional; -import java.util.function.Function; - -import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_FROM_HEADER_KEY_CTX; -import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_TO_HEADER_KEY_CTX; - // Note: RunOnVirtualThread hangs somehow @GrpcService @RolesAllowed("cluster-member") -@RegisterInterceptor(ProxyServerInterceptor.class) public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Inject SecurityIdentity identity; @Inject RemoteObjectServiceServerImpl remoteObjectServiceServerImpl; - @Inject - RpcClientFactory rpcClientFactory; - @Inject - PeerManager peerManager; - private PeerId getIdentity() { - if (PROXY_FROM_HEADER_KEY_CTX.get() != null) { - return PeerId.of(PROXY_FROM_HEADER_KEY_CTX.get()); - } - return PeerId.of(identity.getPrincipal().getName().substring(3)); } - private PeerId getProxyTarget() { - if (PROXY_TO_HEADER_KEY_CTX.get() != null) { - return PeerId.of(PROXY_TO_HEADER_KEY_CTX.get()); - } - return null; - } - - private Optional> tryProxy(Function fn) { - var proxyTarget = getProxyTarget(); - if (proxyTarget != null) { - return Optional.of(Uni.createFrom().item(rpcClientFactory.withObjSyncClient( - proxyTarget, - getIdentity(), - null, - (peer, client) -> { - if (!peer.equals(proxyTarget)) { - throw new IllegalStateException("Expected " + proxyTarget + " but got " + peer + " when proxying"); - } - Log.tracev("Proxying to {0}", peer); - return fn.apply(client); - } - ))); - } - return Optional.empty(); - } - @Override @Blocking public Uni getObject(GetObjectRequest request) { - return tryProxy(client -> client.getObject(request)).orElseGet(() -> remoteObjectServiceServerImpl.getObject(getIdentity(), request)); + return remoteObjectServiceServerImpl.getObject(getIdentity(), request); } @Override @Blocking public Uni canDelete(CanDeleteRequest request) { - return tryProxy(client -> client.canDelete(request)).orElseGet(() -> remoteObjectServiceServerImpl.canDelete(getIdentity(), request)); + return remoteObjectServiceServerImpl.canDelete(getIdentity(), request); } @Override @Blocking public Uni opPush(OpPushRequest request) { - return tryProxy(client -> client.opPush(request)).orElseGet(() -> remoteObjectServiceServerImpl.opPush(getIdentity(), request)); + return remoteObjectServiceServerImpl.opPush(getIdentity(), request); } @Override @Blocking public Uni ping(PingRequest request) { - return tryProxy(client -> client.ping(request)).orElseGet(() -> remoteObjectServiceServerImpl.ping(getIdentity(), request)); - } - - @Override - @Blocking - public Uni proxyAvailableGet(ProxyAvailableRequest request) { - var got = peerManager.getDirectAvailableHosts(); - var builder = ProxyAvailableReply.newBuilder(); - for (var host : got) { - builder.addAvailableTargetsBuilder().setUuid(host.toString()); - } - return Uni.createFrom().item(builder.build()); + return remoteObjectServiceServerImpl.ping(getIdentity(), request); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java index 2a53ee44..24f86755 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/RpcClientFactory.java @@ -3,13 +3,9 @@ package com.usatiuk.dhfs.repository; import com.usatiuk.dhfs.PeerId; import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress; import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress; -import com.usatiuk.dhfs.repository.peerdiscovery.ProxyPeerAddress; -import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import io.quarkus.grpc.GrpcClientUtils; import io.quarkus.logging.Log; -import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -22,9 +18,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_FROM_HEADER_KEY; -import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_TO_HEADER_KEY; - // TODO: Dedup this @ApplicationScoped public class RpcClientFactory { @@ -58,52 +51,30 @@ public class RpcClientFactory { throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!")); } - public R withObjSyncClient(PeerId target, PeerId proxyFrom, PeerId proxyTo, ObjectSyncClientFunction fn) { - var hostInfo = peerManager.getAddress(target); + public R withObjSyncClient(PeerId target, ObjectSyncClientFunction fn) { + var hostinfo = peerManager.getAddress(target); - if (hostInfo == null) + if (hostinfo == null) throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target)); - return withObjSyncClient(target, proxyFrom, proxyTo, hostInfo, syncTimeout, fn); + return withObjSyncClient(target, hostinfo, syncTimeout, fn); } - public R withObjSyncClient(PeerId host, PeerId proxyFrom, PeerId proxyTo, PeerAddress address, long timeout, ObjectSyncClientFunction fn) { + public R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction fn) { return switch (address) { case IpPeerAddress ipPeerAddress -> - withObjSyncClient(host, proxyFrom, proxyTo, ipPeerAddress.address(), ipPeerAddress.securePort(), timeout, fn); - case ProxyPeerAddress pp -> withObjSyncClient(pp.proxyThrough(), null, host, fn); // TODO: Timeout + withObjSyncClient(host, ipPeerAddress.address(), ipPeerAddress.securePort(), timeout, fn); default -> throw new IllegalStateException("Unexpected value: " + address); }; } - public R withObjSyncClient(PeerId target, ObjectSyncClientFunction fn) { - return withObjSyncClient(target, null, null, fn); - } - - public R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction fn) { - return withObjSyncClient(host, null, null, address, timeout, fn); - } - - public R withObjSyncClient(PeerId host, @Nullable PeerId proxyFrom, @Nullable PeerId proxyTo, InetAddress addr, int port, long timeout, ObjectSyncClientFunction fn) { - var key = new ObjSyncStubKey(host, proxyFrom, proxyTo, addr, port); + public R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction fn) { + var key = new ObjSyncStubKey(host, addr, port); var stub = _objSyncCache.computeIfAbsent(key, (k) -> { var channel = rpcChannelFactory.getSecureChannel(host, addr.getHostAddress(), port); - - var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) + return DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) .withMaxOutboundMessageSize(Integer.MAX_VALUE) .withMaxInboundMessageSize(Integer.MAX_VALUE); - - if (proxyFrom != null) { - Metadata headers = new Metadata(); - headers.put(PROXY_FROM_HEADER_KEY, proxyFrom.toString()); - return GrpcClientUtils.attachHeaders(client, headers); - } else if (proxyTo != null) { - Metadata headers = new Metadata(); - headers.put(PROXY_TO_HEADER_KEY, proxyTo.toString()); - return GrpcClientUtils.attachHeaders(client, headers); - } else { - return client; - } }); return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)); } @@ -117,7 +88,7 @@ public class RpcClientFactory { R apply(PeerId peer, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); } - private record ObjSyncStubKey(PeerId id, @Nullable PeerId from, @Nullable PeerId to, InetAddress addr, int port) { + private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) { } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerAddress.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerAddress.java deleted file mode 100644 index de6b7cc7..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerAddress.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.usatiuk.dhfs.repository.peerdiscovery; - -import com.usatiuk.dhfs.PeerId; - -public record ProxyPeerAddress(PeerId peer, PeerAddressType type, - PeerId proxyThrough) implements PeerAddress { -} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerDiscovery.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerDiscovery.java deleted file mode 100644 index fa241d67..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/repository/peerdiscovery/ProxyPeerDiscovery.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.usatiuk.dhfs.repository.peerdiscovery; - -import com.usatiuk.dhfs.repository.PeerManager; -import com.usatiuk.dhfs.repository.ProxyDiscoveryServiceClient; -import io.quarkus.logging.Log; -import io.quarkus.scheduler.Scheduled; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -@ApplicationScoped -public class ProxyPeerDiscovery { - @Inject - PeerDiscoveryDirectory peerDiscoveryDirectory; - @Inject - ProxyDiscoveryServiceClient proxyDiscoveryServiceClient; - @Inject - PeerManager peerManager; - - @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - public void discoverPeers() { - Log.tracev("Discovering proxy peers"); - for (var p : peerManager.getDirectAvailableHosts()) { - var got = proxyDiscoveryServiceClient.getAvailablePeers(p); - Log.tracev("Asked {0} for peers, got {1}", p, got); - for (var peer : got) { - peerDiscoveryDirectory.notifyAddr( - new ProxyPeerAddress(peer, PeerAddressType.PROXY, p) - ); - } - } - } -} diff --git a/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto b/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto index 88ba2a67..91cda667 100644 --- a/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto +++ b/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto @@ -15,23 +15,8 @@ service DhfsObjectSyncGrpc { rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {} rpc Ping (PingRequest) returns (PingReply) {} - - // TODO: Probably this would be better in a separate service - // but it will be more boilerplate with the client factories - rpc ProxyAvailableGet (ProxyAvailableRequest) returns (ProxyAvailableReply) {} } -message ProxyAvailableInfo { - string uuid = 1; -} - -message ProxyAvailableRequest { - -} - -message ProxyAvailableReply { - repeated ProxyAvailableInfo availableTargets = 1; -} message PingRequest {} message PingReply {} diff --git a/dhfs-parent/server/src/main/resources/application.properties b/dhfs-parent/server/src/main/resources/application.properties index 85807041..b11a23ee 100644 --- a/dhfs-parent/server/src/main/resources/application.properties +++ b/dhfs-parent/server/src/main/resources/application.properties @@ -1,6 +1,6 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.peerdiscovery.port=42069 -dhfs.objects.peerdiscovery.interval=5s +dhfs.objects.peerdiscovery.interval=4s dhfs.objects.peerdiscovery.broadcast=true dhfs.objects.sync.timeout=30 dhfs.objects.sync.ping.timeout=5 @@ -33,7 +33,7 @@ dhfs.objects.ref-processor.threads=4 dhfs.objects.opsender.batch-size=100 dhfs.objects.lock_timeout_secs=2 dhfs.local-discovery=true -dhfs.peerdiscovery.timeout=5000 +dhfs.peerdiscovery.timeout=10000 quarkus.log.category."com.usatiuk".min-level=TRACE quarkus.log.category."com.usatiuk".level=TRACE quarkus.http.insecure-requests=enabled diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3ProxyIT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3ProxyIT.java deleted file mode 100644 index 57949e01..00000000 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3ProxyIT.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.usatiuk.dhfs.integration; - -import com.github.dockerjava.api.model.Device; -import io.quarkus.logging.Log; -import org.junit.jupiter.api.*; -import org.slf4j.LoggerFactory; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.output.WaitingConsumer; -import org.testcontainers.containers.wait.strategy.Wait; - -import java.io.IOException; -import java.time.Duration; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Stream; - -import static org.awaitility.Awaitility.await; - -public class DhfsFusex3ProxyIT { - GenericContainer container1; - GenericContainer container2; - GenericContainer container3; - - WaitingConsumer waitingConsumer1; - WaitingConsumer waitingConsumer2; - WaitingConsumer waitingConsumer3; - - String c1uuid; - String c2uuid; - String c3uuid; - - // This calculation is somewhat racy, so keep it hardcoded for now - long emptyFileCount = 9; - - @BeforeEach - void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException { - // TODO: Dedup - Network network1 = Network.newNetwork(); - Network network2 = Network.newNetwork(); - - container1 = new GenericContainer<>(DhfsImage.getInstance()) - .withPrivilegedMode(true) - .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) - .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network1); - container2 = new GenericContainer<>(DhfsImage.getInstance()) - .withPrivilegedMode(true) - .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) - .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network1); - container3 = new GenericContainer<>(DhfsImage.getInstance()) - .withPrivilegedMode(true) - .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) - .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network2); - - Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start); - - var client = DockerClientFactory.instance().client(); - client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network2.getId()).exec(); - - c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout(); - c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout(); - c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout(); - - Log.info(container1.getContainerId() + "=" + c1uuid); - Log.info(container2.getContainerId() + "=" + c2uuid); - Log.info(container3.getContainerId() + "=" + c3uuid); - - waitingConsumer1 = new WaitingConsumer(); - var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class)) - .withPrefix(c1uuid.substring(0, 4) + "-1-" + testInfo.getDisplayName()); - container1.followOutput(loggingConsumer1.andThen(waitingConsumer1)); - waitingConsumer2 = new WaitingConsumer(); - var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class)) - .withPrefix(c2uuid.substring(0, 4) + "-2-" + testInfo.getDisplayName()); - container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); - waitingConsumer3 = new WaitingConsumer(); - var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class)) - .withPrefix(c3uuid.substring(0, 4) + "-3-" + testInfo.getDisplayName()); - container3.followOutput(loggingConsumer3.andThen(waitingConsumer3)); - - Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid)); - Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid)); - Assertions.assertDoesNotThrow(() -> UUID.fromString(c3uuid)); - - waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 1); - waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 2); - waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 1); - - Thread.sleep(2000); - - var c1curl = container1.execInContainer("/bin/sh", "-c", - "curl --fail --header \"Content-Type: application/json\" " + - " --request PUT " + - " --data '{\"uuid\":\"" + c2uuid + "\"}' " + - " http://localhost:8080/objects-manage/known-peers"); - - var c2curl1 = container2.execInContainer("/bin/sh", "-c", - "curl --fail --header \"Content-Type: application/json\" " + - " --request PUT " + - " --data '{\"uuid\":\"" + c1uuid + "\"}' " + - " http://localhost:8080/objects-manage/known-peers"); - - var c2curl3 = container2.execInContainer("/bin/sh", "-c", - "curl --fail --header \"Content-Type: application/json\" " + - " --request PUT " + - " --data '{\"uuid\":\"" + c3uuid + "\"}' " + - " http://localhost:8080/objects-manage/known-peers"); - - var c3curl = container3.execInContainer("/bin/sh", "-c", - "curl --fail --header \"Content-Type: application/json\" " + - " --request PUT " + - " --data '{\"uuid\":\"" + c2uuid + "\"}' " + - " http://localhost:8080/objects-manage/known-peers"); - - Assertions.assertEquals(0, c1curl.getExitCode()); - Assertions.assertEquals(0, c2curl1.getExitCode()); - Assertions.assertEquals(0, c2curl3.getExitCode()); - Assertions.assertEquals(0, c3curl.getExitCode()); - - waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); - waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); - waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2); - } - - @AfterEach - void stop() { - Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::stop); - } - - @Test - void readWriteFileTest() throws IOException, InterruptedException, TimeoutException { - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout())); - await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout())); - } - -}