Revert "Server: proxying"

This reverts commit 6a8394852e.
2025-03-28 17:22:06 +01:00
parent 8d40019f2c
commit 4a19f69c38
11 changed files with 21 additions and 378 deletions

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
- import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerDiscoveryDirectory;
import com.usatiuk.dhfs.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
@@ -80,12 +79,11 @@ public class PeerManager {
.stream()
.<Callable<Void>>map(host -> () -> {
try {
- boolean wasReachable = isReachable(host);
- var bestAddr = selectBestAddress(host.id()).orElse(null);
- if (wasReachable)
- Log.tracev("Heartbeat: {0} - {1}", host, bestAddr);
+ if (isReachable(host))
+ Log.tracev("Heartbeat: {0}", host);
else
Log.debugv("Trying to connect to {0} - {1}", host, bestAddr);
Log.debugv("Trying to connect to {0}", host);
var bestAddr = selectBestAddress(host.id()).orElse(null);
if (bestAddr != null && pingCheck(host, bestAddr))
handleConnectionSuccess(host, bestAddr);
else
@@ -143,7 +141,7 @@ public class PeerManager {
return true;
});
} catch (Exception ignored) {
Log.debugv("Host {0} via {1} is unreachable: {2}, {3}", host, address, ignored.getMessage(), ignored.getCause());
Log.debugv("Host {0} is unreachable: {1}, {2}", host, ignored.getMessage(), ignored.getCause());
return false;
}
}
@@ -164,13 +162,6 @@ public class PeerManager {
return _states.keySet().stream().toList();
}
- public List<PeerId> getDirectAvailableHosts() {
- return _states.entrySet().stream()
- .filter(p -> !p.getValue().type().equals(PeerAddressType.PROXY))
- .map(Map.Entry::getKey)
- .toList();
- }
// public List<UUID> getUnavailableHosts() {
// return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
// .filter(e -> !e.getValue().isReachable())

View File

@@ -1,12 +0,0 @@
package com.usatiuk.dhfs.repository;
import io.grpc.Context;
import io.grpc.Metadata;
public abstract class ProxyConstants {
static final Metadata.Key<String> PROXY_TO_HEADER_KEY = Metadata.Key.of("proxy_to", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> PROXY_FROM_HEADER_KEY = Metadata.Key.of("proxy_from", Metadata.ASCII_STRING_MARSHALLER);
static final Context.Key<String> PROXY_TO_HEADER_KEY_CTX = Context.key("proxy_to");
static final Context.Key<String> PROXY_FROM_HEADER_KEY_CTX = Context.key("proxy_from");
}

View File

@@ -1,21 +0,0 @@
package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Collection;
@ApplicationScoped
public class ProxyDiscoveryServiceClient {
@Inject
RpcClientFactory rpcClientFactory;
public Collection<PeerId> getAvailablePeers(PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
var reply = client.proxyAvailableGet(ProxyAvailableRequest.getDefaultInstance());
return reply.getAvailableTargetsList().stream().map(ProxyAvailableInfo::getUuid).map(PeerId::of).toList();
});
}
}

View File

@@ -1,36 +0,0 @@
package com.usatiuk.dhfs.repository;
import io.grpc.*;
import jakarta.enterprise.context.ApplicationScoped;
import static com.usatiuk.dhfs.repository.ProxyConstants.*;
@ApplicationScoped
public class ProxyServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
Context context = null;
if (metadata.containsKey(PROXY_TO_HEADER_KEY)
&& metadata.containsKey(PROXY_FROM_HEADER_KEY)) {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Both proxy_to and proxy_from headers are present"));
}
if (metadata.containsKey(PROXY_TO_HEADER_KEY)) {
context = Context.current().withValue(PROXY_TO_HEADER_KEY_CTX, metadata.get(PROXY_TO_HEADER_KEY));
} else if (metadata.containsKey(PROXY_FROM_HEADER_KEY)) {
context = Context.current().withValue(PROXY_FROM_HEADER_KEY_CTX, metadata.get(PROXY_FROM_HEADER_KEY));
}
if (context != null) {
return Contexts.interceptCall(
context,
serverCall,
metadata,
serverCallHandler
);
} else {
return serverCallHandler.startCall(serverCall, metadata);
}
}
}

View File

@@ -2,101 +2,46 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import io.quarkus.grpc.GrpcService;
- import io.quarkus.grpc.RegisterInterceptor;
- import io.quarkus.logging.Log;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
- import java.util.Optional;
- import java.util.function.Function;
- import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_FROM_HEADER_KEY_CTX;
- import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_TO_HEADER_KEY_CTX;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
@RolesAllowed("cluster-member")
- @RegisterInterceptor(ProxyServerInterceptor.class)
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
SecurityIdentity identity;
@Inject
RemoteObjectServiceServerImpl remoteObjectServiceServerImpl;
- @Inject
- RpcClientFactory rpcClientFactory;
- @Inject
- PeerManager peerManager;
private PeerId getIdentity() {
- if (PROXY_FROM_HEADER_KEY_CTX.get() != null) {
- return PeerId.of(PROXY_FROM_HEADER_KEY_CTX.get());
- }
return PeerId.of(identity.getPrincipal().getName().substring(3));
}
- private PeerId getProxyTarget() {
- if (PROXY_TO_HEADER_KEY_CTX.get() != null) {
- return PeerId.of(PROXY_TO_HEADER_KEY_CTX.get());
- }
- return null;
- }
- private <T> Optional<Uni<T>> tryProxy(Function<DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub, T> fn) {
- var proxyTarget = getProxyTarget();
- if (proxyTarget != null) {
- return Optional.of(Uni.createFrom().item(rpcClientFactory.<T>withObjSyncClient(
- proxyTarget,
- getIdentity(),
- null,
- (peer, client) -> {
- if (!peer.equals(proxyTarget)) {
- throw new IllegalStateException("Expected " + proxyTarget + " but got " + peer + " when proxying");
- }
- Log.tracev("Proxying to {0}", peer);
- return fn.apply(client);
- }
- )));
- }
- return Optional.empty();
- }
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
- return tryProxy(client -> client.getObject(request)).orElseGet(() -> remoteObjectServiceServerImpl.getObject(getIdentity(), request));
+ return remoteObjectServiceServerImpl.getObject(getIdentity(), request);
}
@Override
@Blocking
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
- return tryProxy(client -> client.canDelete(request)).orElseGet(() -> remoteObjectServiceServerImpl.canDelete(getIdentity(), request));
+ return remoteObjectServiceServerImpl.canDelete(getIdentity(), request);
}
@Override
@Blocking
public Uni<OpPushReply> opPush(OpPushRequest request) {
- return tryProxy(client -> client.opPush(request)).orElseGet(() -> remoteObjectServiceServerImpl.opPush(getIdentity(), request));
+ return remoteObjectServiceServerImpl.opPush(getIdentity(), request);
}
@Override
@Blocking
public Uni<PingReply> ping(PingRequest request) {
- return tryProxy(client -> client.ping(request)).orElseGet(() -> remoteObjectServiceServerImpl.ping(getIdentity(), request));
- }
- @Override
- @Blocking
- public Uni<ProxyAvailableReply> proxyAvailableGet(ProxyAvailableRequest request) {
- var got = peerManager.getDirectAvailableHosts();
- var builder = ProxyAvailableReply.newBuilder();
- for (var host : got) {
- builder.addAvailableTargetsBuilder().setUuid(host.toString());
- }
- return Uni.createFrom().item(builder.build());
+ return remoteObjectServiceServerImpl.ping(getIdentity(), request);
}
}

View File

@@ -3,13 +3,9 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
- import com.usatiuk.dhfs.repository.peerdiscovery.ProxyPeerAddress;
- import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
- import io.quarkus.grpc.GrpcClientUtils;
import io.quarkus.logging.Log;
- import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -22,9 +18,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
- import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_FROM_HEADER_KEY;
- import static com.usatiuk.dhfs.repository.ProxyConstants.PROXY_TO_HEADER_KEY;
// TODO: Dedup this
@ApplicationScoped
public class RpcClientFactory {
@@ -58,52 +51,30 @@ public class RpcClientFactory {
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!"));
}
- public <R> R withObjSyncClient(PeerId target, PeerId proxyFrom, PeerId proxyTo, ObjectSyncClientFunction<R> fn) {
- var hostInfo = peerManager.getAddress(target);
+ public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
+ var hostinfo = peerManager.getAddress(target);
- if (hostInfo == null)
+ if (hostinfo == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
- return withObjSyncClient(target, proxyFrom, proxyTo, hostInfo, syncTimeout, fn);
+ return withObjSyncClient(target, hostinfo, syncTimeout, fn);
}
- public <R> R withObjSyncClient(PeerId host, PeerId proxyFrom, PeerId proxyTo, PeerAddress address, long timeout, ObjectSyncClientFunction<R> fn) {
+ public <R> R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction<R> fn) {
return switch (address) {
case IpPeerAddress ipPeerAddress ->
- withObjSyncClient(host, proxyFrom, proxyTo, ipPeerAddress.address(), ipPeerAddress.securePort(), timeout, fn);
- case ProxyPeerAddress pp -> withObjSyncClient(pp.proxyThrough(), null, host, fn); // TODO: Timeout
+ withObjSyncClient(host, ipPeerAddress.address(), ipPeerAddress.securePort(), timeout, fn);
default -> throw new IllegalStateException("Unexpected value: " + address);
};
}
- public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
- return withObjSyncClient(target, null, null, fn);
- }
- public <R> R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction<R> fn) {
- return withObjSyncClient(host, null, null, address, timeout, fn);
- }
- public <R> R withObjSyncClient(PeerId host, @Nullable PeerId proxyFrom, @Nullable PeerId proxyTo, InetAddress addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
- var key = new ObjSyncStubKey(host, proxyFrom, proxyTo, addr, port);
+ public <R> R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
+ var key = new ObjSyncStubKey(host, addr, port);
var stub = _objSyncCache.computeIfAbsent(key, (k) -> {
var channel = rpcChannelFactory.getSecureChannel(host, addr.getHostAddress(), port);
- var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
+ return DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
- if (proxyFrom != null) {
- Metadata headers = new Metadata();
- headers.put(PROXY_FROM_HEADER_KEY, proxyFrom.toString());
- return GrpcClientUtils.attachHeaders(client, headers);
- } else if (proxyTo != null) {
- Metadata headers = new Metadata();
- headers.put(PROXY_TO_HEADER_KEY, proxyTo.toString());
- return GrpcClientUtils.attachHeaders(client, headers);
- } else {
- return client;
- }
});
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
}
@@ -117,7 +88,7 @@ public class RpcClientFactory {
R apply(PeerId peer, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
- private record ObjSyncStubKey(PeerId id, @Nullable PeerId from, @Nullable PeerId to, InetAddress addr, int port) {
+ private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) {
}
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.dhfs.repository.peerdiscovery;
import com.usatiuk.dhfs.PeerId;
public record ProxyPeerAddress(PeerId peer, PeerAddressType type,
PeerId proxyThrough) implements PeerAddress {
}

View File

@@ -1,32 +0,0 @@
package com.usatiuk.dhfs.repository.peerdiscovery;
import com.usatiuk.dhfs.repository.PeerManager;
import com.usatiuk.dhfs.repository.ProxyDiscoveryServiceClient;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class ProxyPeerDiscovery {
@Inject
PeerDiscoveryDirectory peerDiscoveryDirectory;
@Inject
ProxyDiscoveryServiceClient proxyDiscoveryServiceClient;
@Inject
PeerManager peerManager;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
public void discoverPeers() {
Log.tracev("Discovering proxy peers");
for (var p : peerManager.getDirectAvailableHosts()) {
var got = proxyDiscoveryServiceClient.getAvailablePeers(p);
Log.tracev("Asked {0} for peers, got {1}", p, got);
for (var peer : got) {
peerDiscoveryDirectory.notifyAddr(
new ProxyPeerAddress(peer, PeerAddressType.PROXY, p)
);
}
}
}
}

View File

@@ -15,23 +15,8 @@ service DhfsObjectSyncGrpc {
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc Ping (PingRequest) returns (PingReply) {}
- // TODO: Probably this would be better in a separate service
- // but it will be more boilerplate with the client factories
- rpc ProxyAvailableGet (ProxyAvailableRequest) returns (ProxyAvailableReply) {}
}
- message ProxyAvailableInfo {
- string uuid = 1;
- }
- message ProxyAvailableRequest {
- }
- message ProxyAvailableReply {
- repeated ProxyAvailableInfo availableTargets = 1;
- }
message PingRequest {}
message PingReply {}

View File

@@ -1,6 +1,6 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.peerdiscovery.port=42069
- dhfs.objects.peerdiscovery.interval=5s
+ dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
@@ -33,7 +33,7 @@ dhfs.objects.ref-processor.threads=4
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
- dhfs.peerdiscovery.timeout=5000
+ dhfs.peerdiscovery.timeout=10000
quarkus.log.category."com.usatiuk".min-level=TRACE
quarkus.log.category."com.usatiuk".level=TRACE
quarkus.http.insecure-requests=enabled

View File

@@ -1,141 +0,0 @@
package com.usatiuk.dhfs.integration;
import com.github.dockerjava.api.model.Device;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
public class DhfsFusex3ProxyIT {
GenericContainer<?> container1;
GenericContainer<?> container2;
GenericContainer<?> container3;
WaitingConsumer waitingConsumer1;
WaitingConsumer waitingConsumer2;
WaitingConsumer waitingConsumer3;
String c1uuid;
String c2uuid;
String c3uuid;
// This calculation is somewhat racy, so keep it hardcoded for now
long emptyFileCount = 9;
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
// TODO: Dedup
Network network1 = Network.newNetwork();
Network network2 = Network.newNetwork();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network1);
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network1);
container3 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network2);
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
var client = DockerClientFactory.instance().client();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network2.getId()).exec();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid);
Log.info(container2.getContainerId() + "=" + c2uuid);
Log.info(container3.getContainerId() + "=" + c3uuid);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class))
.withPrefix(c1uuid.substring(0, 4) + "-1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class))
.withPrefix(c2uuid.substring(0, 4) + "-2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3ProxyIT.class))
.withPrefix(c3uuid.substring(0, 4) + "-3-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c3uuid));
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 2);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS, 1);
Thread.sleep(2000);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --fail --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
"curl --fail --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
"curl --fail --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c3uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --fail --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Assertions.assertEquals(0, c1curl.getExitCode());
Assertions.assertEquals(0, c2curl1.getExitCode());
Assertions.assertEquals(0, c2curl3.getExitCode());
Assertions.assertEquals(0, c3curl.getExitCode());
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
}
@AfterEach
void stop() {
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::stop);
}
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
}
}