2 Commits

Author SHA1 Message Date
a2e75dbdc7 Server: some peer sync fixes 2025-03-16 20:07:45 +01:00
fa64dac9aa Server: peer connected/disconnected event listeners 2025-03-16 12:58:56 +01:00
17 changed files with 223 additions and 78 deletions

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects.repository;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JDataRemotePush {
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
public interface PeerConnectedEventListener {
void handlePeerConnected(PeerId peerId);
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
public interface PeerDisconnectedEventListener {
void handlePeerDisconnected(PeerId peerId);
}

View File

@@ -17,11 +17,15 @@ import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.*;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@@ -29,8 +33,8 @@ import java.util.stream.Collectors;
public class PeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
// FIXME: Ideally not call them on every ping
private final ArrayList<ConnectionEventListener> _connectedListeners = new ArrayList<>();
private final ArrayList<ConnectionEventListener> _disconnectedListeners = new ArrayList<>();
private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
@@ -53,6 +57,11 @@ public class PeerManager {
SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor;
public PeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}
// Note: keep priority updated with below
void init(@Observes @Priority(600) StartupEvent event) throws IOException {
_heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
@@ -63,7 +72,13 @@ public class PeerManager {
public void tryConnectAll() {
if (_heartbeatExecutor == null) return;
try {
_heartbeatExecutor.invokeAll(peerInfoService.getPeersNoSelf()
var peers = peerInfoService.getPeersNoSelf();
var pids = peers.stream().map(PeerInfo::id).toList();
List<PeerId> stale = _states.keySet().stream().filter(p -> !pids.contains(p)).toList();
stale.forEach(_states.keySet()::remove);
_heartbeatExecutor.invokeAll(peers
.stream()
.<Callable<Void>>map(host -> () -> {
try {
@@ -71,13 +86,13 @@ public class PeerManager {
Log.tracev("Heartbeat: {0}", host);
else
Log.debugv("Trying to connect to {0}", host);
var bestAddr = selectBestAddress(host.id());
if (pingCheck(host, bestAddr))
var bestAddr = selectBestAddress(host.id()).orElse(null);
if (bestAddr != null && pingCheck(host, bestAddr))
handleConnectionSuccess(host, bestAddr);
else
handleConnectionError(host);
} catch (Exception e) {
Log.errorv("Failed to connect to {0} because {1}", host, e);
Log.error("Failed to connect to " + host.key(), e);
}
return null;
}).toList(), 30, TimeUnit.SECONDS); //FIXME:
@@ -86,20 +101,6 @@ public class PeerManager {
}
}
// Note: registrations should be completed with Priority < 600
public void registerConnectEventListener(ConnectionEventListener listener) {
synchronized (_connectedListeners) {
_connectedListeners.add(listener);
}
}
// Note: registrations should be completed with Priority < 600
public void registerDisconnectEventListener(ConnectionEventListener listener) {
synchronized (_disconnectedListeners) {
_disconnectedListeners.add(listener);
}
}
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
boolean wasReachable = isReachable(host);
@@ -114,9 +115,9 @@ public class PeerManager {
Log.infov("Connected to {0}", host);
// for (var l : _connectedListeners) {
// l.apply(host);
// }
for (var l : _connectedListeners) {
l.handlePeerConnected(host.id());
}
}
public void handleConnectionError(PeerInfo host) {
@@ -127,9 +128,9 @@ public class PeerManager {
_states.remove(host.id());
// for (var l : _disconnectedListeners) {
// l.apply(host);
// }
for (var l : _disconnectedListeners) {
l.handlePeerDisconnected(host.id());
}
}
// FIXME:
@@ -181,20 +182,17 @@ public class PeerManager {
});
}
private PeerAddress selectBestAddress(PeerId host) {
return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow();
private Optional<PeerAddress> selectBestAddress(PeerId host) {
return peerDiscoveryDirectory.getForPeer(host).stream().findFirst();
}
public void addRemoteHost(PeerId host) {
if (_states.containsKey(host)) {
throw new IllegalStateException("Host " + host + " is already added");
}
transactionManager.run(() -> {
if (peerInfoService.getPeerInfo(host).isPresent())
throw new IllegalStateException("Host " + host + " is already added");
var info = peerSyncApiClient.getSelfInfo(selectBestAddress(host));
var addr = selectBestAddress(host).orElseThrow(() -> new IllegalStateException("Host " + host + " is unreachable"));
var info = peerSyncApiClient.getSelfInfo(addr);
var cert = Base64.getDecoder().decode(info.cert());
peerInfoService.putPeer(host, cert);
@@ -210,11 +208,6 @@ public class PeerManager {
});
}
@FunctionalInterface
public interface ConnectionEventListener {
void apply(UUID host);
}
public record HostStateSnapshot(Collection<PeerId> available, Collection<PeerId> unavailable) {
}

View File

@@ -2,10 +2,10 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -102,20 +102,9 @@ public class PersistentPeerDataService {
return _selfUuid;
}
// private void updateCerts() {
// try {
// peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
// // Fixme:? I don't think it should be needed with custom trust store
// // but it doesn't work?
// rpcClientFactory.dropCache();
// return null;
// });
// } catch (Exception ex) {
// Log.warn("Error when refreshing certificates, will retry: " + ex.getMessage());
// pushPeerUpdates();
// }
// }
public void updateCerts() {
peerTrustManager.reloadTrustManagerHosts(txm.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME:
}
public KeyPair getSelfKeypair() {
return _selfKeyPair;

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerConnectedEventListener;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import io.quarkus.logging.Log;
@@ -20,7 +21,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
@ApplicationScoped
public class DeferredInvalidationQueueService {
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@Inject
PeerManager remoteHostManager;
@@ -37,7 +38,6 @@ public class DeferredInvalidationQueueService {
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// remoteHostManager.registerConnectEventListener(this::returnForHost);
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
@@ -80,4 +80,9 @@ public class DeferredInvalidationQueueService {
_persistentData.deferredInvalidations.put(entry.peer(), entry);
}
}
@Override
public void handlePeerConnected(PeerId peerId) {
returnForHost(peerId);
}
}

View File

@@ -1,14 +1,16 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of(key);

View File

@@ -100,6 +100,7 @@ public class InvalidationQueueService {
long success = 0;
for (var e : data) {
// TODO: Race?
if (peerInfoService.getPeerInfo(e.peer()).isEmpty()) continue;
if (!remoteHostManager.isReachable(e.peer())) {

View File

@@ -3,11 +3,15 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import com.usatiuk.dhfs.objects.repository.JDataRemotePush;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -27,13 +31,24 @@ public class OpPusher {
InvalidationQueueService invalidationQueueService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
DtoMapperService dtoMapperService;
public void doPush(InvalidationQueueEntry entry) {
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
return List.of(new IndexUpdateOp(entry.key(), remote.changelog()));
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());

View File

@@ -17,6 +17,6 @@ public class PushOpHandler {
RemoteTransaction remoteTransaction;
public void handlePush(PeerId peer, IndexUpdateOp obj) {
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null);
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data());
}
}

View File

@@ -8,6 +8,7 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -49,6 +50,9 @@ public class PeerDiscoveryDirectory {
public Collection<PeerAddress> getForPeer(PeerId peer) {
synchronized (_entries) {
long curTime = System.currentTimeMillis();
if (_entries.asMap().get(peer) == null) {
return List.of();
}
var partitioned = _entries.asMap().get(peer).stream()
.collect(Collectors.partitioningBy(e -> e.lastSeen() + timeout < curTime));
for (var entry : partitioned.get(true)) {

View File

@@ -6,9 +6,11 @@ import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.CertificateTools;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import com.usatiuk.dhfs.objects.repository.JDataRemotePush;
import java.security.cert.X509Certificate;
@JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));

View File

@@ -38,9 +38,19 @@ public class PeerInfoService {
return jKleppmannTreeManager.getTree(JObjectKey.of("peers"), LockingStrategy.OPTIMISTIC);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return remoteTx.getData(PeerInfo.class, meta.getPeerId());
});
});
}
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(peer.toString()));
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
if (gotKey == null) {
return Optional.empty();
}
@@ -56,8 +66,12 @@ public class PeerInfoService {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
node -> node.children().keySet().stream()
.map(PeerId::of).map(this::getPeerInfo)
.map(Optional::get).toList())
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
if (o.isEmpty())
Log.warnv("Could not get peer info for peer {0}", o);
return o.isPresent();
}).map(Optional::get).toList())
.orElseThrow();
});
}
@@ -67,7 +81,7 @@ public class PeerInfoService {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
node -> node.children().keySet().stream()
.map(PeerId::of).map(this::getPeerInfo)
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.map(Optional::get).filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
.orElseThrow();
@@ -79,22 +93,23 @@ public class PeerInfoService {
var parent = getTreeW().traverse(List.of());
var newPeerInfo = new PeerInfo(id, cert);
remoteTx.putData(newPeerInfo);
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTreeW().getNewNodeId());
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
});
}
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTreeW().traverse(List.of(id.toString()));
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name()));
if (gotKey == null) {
return;
}
var meta = curTx.get(JKleppmannTreeNode.class, gotKey).map(node -> (JKleppmannTreeNodeMetaPeer) node.meta()).orElse(null);
if (meta == null) {
var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null);
if (node == null) {
Log.warn("Peer " + id + " not found in the tree");
return;
}
getTreeW().trash(meta, id.toJObjectKey());
getTreeW().trash(node.meta(), node.key());
curTx.onCommit(persistentPeerDataService::updateCerts);
});
}
}

View File

@@ -11,7 +11,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
private final JObjectKey _peerId;
public JKleppmannTreeNodeMetaPeer(PeerId id) {
super(id.toString());
super(peerIdToNodeId(id).name());
_peerId = id.toJObjectKey();
}
@@ -19,9 +19,14 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
return _peerId;
}
public static JObjectKey peerIdToNodeId(PeerId id) {
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
assert false;
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());
assert getName().equals(name);
return this;
}

View File

@@ -0,0 +1,73 @@
package com.usatiuk.dhfs.objects.repository.peertrust;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
if (cur instanceof JKleppmannTreeNode n) {
if (n.key().name().equals("peers_jt_root")) {
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
return;
}
}
if (!(cur instanceof RemoteObjectDataWrapper remote)) {
return;
}
if (!(remote.data() instanceof PeerInfo))
return;
Log.infov("Changed peer info: {0} to {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
}
@Override
public void onCreate(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObjectDataWrapper remote)) {
return;
}
if (!(remote.data() instanceof PeerInfo))
return;
Log.infov("Created peer info: {0}, {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
}
@Override
public void onDelete(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObjectDataWrapper remote)) {
return;
}
if (!(remote.data() instanceof PeerInfo))
return;
Log.infov("Deleted peer info: {0}, {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
}
}

View File

@@ -1,8 +1,12 @@
package com.usatiuk.dhfs.objects.repository.peertrust;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.net.ssl.TrustManager;
@@ -17,6 +21,9 @@ import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class PeerTrustManager implements X509TrustManager {
@Inject
PeerInfoService peerInfoService;
private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();
@Override
@@ -44,6 +51,13 @@ public class PeerTrustManager implements X509TrustManager {
}
}
// FIXME:
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void hackRefresh() {
reloadTrustManagerHosts(peerInfoService.getPeers());
}
private synchronized void reloadTrustManager(Collection<Pair<String, X509Certificate>> certs) throws Exception {
KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
ts.load(null, null);

View File

@@ -142,17 +142,17 @@ public class DhfsFusex3IT {
@Test
void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@Test
void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
}
@@ -190,6 +190,8 @@ public class DhfsFusex3IT {
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Thread.sleep(10000);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));