mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
tests fix
Some checks failed
Server / build (push) Failing after 17m18s
Some checks failed
Server / build (push) Failing after 17m18s
This commit is contained in:
@@ -158,9 +158,13 @@ public class RemoteHostManager {
|
||||
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
return;
|
||||
}
|
||||
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
Log.trace("Updating connection info for " + host + ": addr=" + addr + " port=" + port);
|
||||
d.getStates().put(host, state);
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState()); // FIXME:? set reachable here?
|
||||
d.getStates().get(host).setAddr(addr);
|
||||
d.getStates().get(host).setPort(port);
|
||||
d.getStates().get(host).setSecurePort(securePort);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -50,6 +50,9 @@ public class RemoteObjectServiceClient {
|
||||
return persistentRemoteHostsService.getHosts().stream().map(HostInfo::getUuid).toList();
|
||||
});
|
||||
|
||||
if (targets.isEmpty())
|
||||
throw new IllegalStateException("No targets for object " + jObject.getName());
|
||||
|
||||
return rpcClientFactory.withObjSyncClient(targets, client -> {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build());
|
||||
|
||||
|
||||
@@ -17,10 +17,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import java.security.KeyStore;
|
||||
import java.security.cert.Certificate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
// TODO: Dedup this
|
||||
@@ -52,10 +49,14 @@ public class RpcClientFactory {
|
||||
for (UUID target : shuffledList) {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
|
||||
boolean shouldTry = remoteHostManager.isReachable(target)
|
||||
&& hostinfo.getAddr() != null;
|
||||
boolean reachable = remoteHostManager.isReachable(target);
|
||||
var addr = hostinfo.getAddr();
|
||||
boolean shouldTry = reachable && addr != null;
|
||||
|
||||
if (!shouldTry) continue;
|
||||
if (!shouldTry) {
|
||||
Log.trace("Not trying " + target + ": " + "addr=" + Objects.toString(addr) + " reachable=" + reachable);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
@@ -78,8 +79,7 @@ public class RpcClientFactory {
|
||||
|
||||
public <R> R withObjSyncClient(UUID target, ObjectSyncClientFunction<R> fn) {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
if (hostinfo.getAddr() == null)
|
||||
throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
}
|
||||
|
||||
@@ -88,17 +88,12 @@ public class RpcClientFactory {
|
||||
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
ks.load(null, null);
|
||||
|
||||
ks.setKeyEntry("clientkey",
|
||||
persistentRemoteHostsService.getSelfKeypair().getPrivate(), null,
|
||||
new Certificate[]{persistentRemoteHostsService.getSelfCertificate()});
|
||||
ks.setKeyEntry("clientkey", persistentRemoteHostsService.getSelfKeypair().getPrivate(), null, new Certificate[]{persistentRemoteHostsService.getSelfCertificate()});
|
||||
|
||||
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
|
||||
keyManagerFactory.init(ks, null);
|
||||
|
||||
ChannelCredentials creds = TlsChannelCredentials.newBuilder()
|
||||
.trustManager(peerTrustManager)
|
||||
.keyManager(keyManagerFactory.getKeyManagers())
|
||||
.build();
|
||||
ChannelCredentials creds = TlsChannelCredentials.newBuilder().trustManager(peerTrustManager).keyManager(keyManagerFactory.getKeyManagers()).build();
|
||||
return creds;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@@ -107,12 +102,8 @@ public class RpcClientFactory {
|
||||
|
||||
public <R> R withObjSyncClient(String host, String addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
|
||||
var creds = getChannelCredentials();
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port, creds)
|
||||
.overrideAuthority(host).build();
|
||||
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
|
||||
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||
.withMaxInboundMessageSize(Integer.MAX_VALUE)
|
||||
.withDeadlineAfter(timeout, TimeUnit.SECONDS);
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port, creds).overrideAuthority(host).build();
|
||||
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE).withMaxInboundMessageSize(Integer.MAX_VALUE).withDeadlineAfter(timeout, TimeUnit.SECONDS);
|
||||
try {
|
||||
return fn.apply(client);
|
||||
} finally {
|
||||
@@ -127,18 +118,13 @@ public class RpcClientFactory {
|
||||
|
||||
public <R> R withPeerSyncClient(UUID target, PeerSyncClientFunction<R> fn) {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
if (hostinfo.getAddr() == null)
|
||||
throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
return withPeerSyncClient(hostinfo.getAddr(), hostinfo.getPort(), peerSyncTimeout, fn);
|
||||
}
|
||||
|
||||
public <R> R withPeerSyncClient(String addr, int port, long timeout, PeerSyncClientFunction<R> fn) {
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
|
||||
.usePlaintext().build();
|
||||
var client = DhfsObjectPeerSyncGrpcGrpc.newBlockingStub(channel)
|
||||
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||
.withMaxInboundMessageSize(Integer.MAX_VALUE)
|
||||
.withDeadlineAfter(timeout, TimeUnit.SECONDS);
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT).usePlaintext().build();
|
||||
var client = DhfsObjectPeerSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE).withMaxInboundMessageSize(Integer.MAX_VALUE).withDeadlineAfter(timeout, TimeUnit.SECONDS);
|
||||
try {
|
||||
return fn.apply(client);
|
||||
} finally {
|
||||
|
||||
@@ -73,16 +73,20 @@ public class DhfsFuseIT {
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" --data " + c2uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
var c1res = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl http://localhost:8080/objects-manage/known-peers");
|
||||
var c2res = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl http://localhost:8080/objects-manage/known-peers");
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data " + c1uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
@@ -88,15 +88,32 @@ public class DhfsFusex3IT {
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c3uuid));
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS, 4);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS, 4);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS, 4);
|
||||
|
||||
var c1curl = container1.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" --data " + c2uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data " + c1uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data " + c3uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" --data " + c2uuid +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
Reference in New Issue
Block a user