From a2e75dbdc7a51cf1c04c1f93e6510190a82f9129 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 16 Mar 2025 20:07:45 +0100 Subject: [PATCH] Server: some peer sync fixes --- .../objects/repository/JDataRemotePush.java | 11 +++ .../dhfs/objects/repository/PeerManager.java | 26 ++++--- .../repository/PersistentPeerDataService.java | 17 +---- .../invalidation/IndexUpdateOp.java | 6 +- .../InvalidationQueueService.java | 1 + .../repository/invalidation/OpPusher.java | 19 ++++- .../invalidation/PushOpHandler.java | 2 +- .../peerdiscovery/PeerDiscoveryDirectory.java | 4 + .../objects/repository/peersync/PeerInfo.java | 2 + .../repository/peersync/PeerInfoService.java | 33 ++++++--- .../structs/JKleppmannTreeNodeMetaPeer.java | 9 ++- .../peertrust/PeerInfoCertUpdateTxHook.java | 73 +++++++++++++++++++ .../peertrust/PeerTrustManager.java | 14 ++++ .../dhfs/integration/DhfsFusex3IT.java | 14 ++-- 14 files changed, 184 insertions(+), 47 deletions(-) create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/JDataRemotePush.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerInfoCertUpdateTxHook.java diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/JDataRemotePush.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/JDataRemotePush.java new file mode 100644 index 00000000..567ed036 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/JDataRemotePush.java @@ -0,0 +1,11 @@ +package com.usatiuk.dhfs.objects.repository; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface JDataRemotePush { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java index 5788ed6b..bb0ba9be 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Base64; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -71,7 +72,13 @@ public class PeerManager { public void tryConnectAll() { if (_heartbeatExecutor == null) return; try { - _heartbeatExecutor.invokeAll(peerInfoService.getPeersNoSelf() + var peers = peerInfoService.getPeersNoSelf(); + var pids = peers.stream().map(PeerInfo::id).toList(); + + List stale = _states.keySet().stream().filter(p -> !pids.contains(p)).toList(); + stale.forEach(_states.keySet()::remove); + + _heartbeatExecutor.invokeAll(peers .stream() .>map(host -> () -> { try { @@ -79,13 +86,13 @@ public class PeerManager { Log.tracev("Heartbeat: {0}", host); else Log.debugv("Trying to connect to {0}", host); - var bestAddr = selectBestAddress(host.id()); - if (pingCheck(host, bestAddr)) + var bestAddr = selectBestAddress(host.id()).orElse(null); + if (bestAddr != null && pingCheck(host, bestAddr)) handleConnectionSuccess(host, bestAddr); else handleConnectionError(host); } catch (Exception e) { - Log.errorv("Failed to connect to {0} because {1}", host, e); + Log.error("Failed to connect to " + host.key(), e); } return null; }).toList(), 30, TimeUnit.SECONDS); //FIXME: @@ -175,20 +182,17 @@ public class PeerManager { }); } - private PeerAddress selectBestAddress(PeerId host) { - return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow(); + private Optional selectBestAddress(PeerId host) { + return peerDiscoveryDirectory.getForPeer(host).stream().findFirst(); } public void addRemoteHost(PeerId host) { - if (_states.containsKey(host)) { - throw new IllegalStateException("Host " + host + " is already added"); - } - transactionManager.run(() -> { if (peerInfoService.getPeerInfo(host).isPresent()) throw new IllegalStateException("Host " + host + " is already added"); - var info = peerSyncApiClient.getSelfInfo(selectBestAddress(host)); + var addr = selectBestAddress(host).orElseThrow(() -> new IllegalStateException("Host " + host + " is unreachable")); + var info = peerSyncApiClient.getSelfInfo(addr); var cert = Base64.getDecoder().decode(info.cert()); peerInfoService.putPeer(host, cert); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java index af2c15cc..a46d672b 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java @@ -102,20 +102,9 @@ public class PersistentPeerDataService { return _selfUuid; } -// private void updateCerts() { -// try { -// peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { -// peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls()); -// // Fixme:? I don't think it should be needed with custom trust store -// // but it doesn't work? -// rpcClientFactory.dropCache(); -// return null; -// }); -// } catch (Exception ex) { -// Log.warn("Error when refreshing certificates, will retry: " + ex.getMessage()); -// pushPeerUpdates(); -// } -// } + public void updateCerts() { + peerTrustManager.reloadTrustManagerHosts(txm.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME: + } public KeyPair getSelfKeypair() { return _selfKeyPair; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java index 38e1aef3..84ad6b40 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java @@ -1,14 +1,16 @@ package com.usatiuk.dhfs.objects.repository.invalidation; +import com.usatiuk.dhfs.objects.JDataRemote; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; -import com.usatiuk.dhfs.objects.RemoteObjectMeta; +import com.usatiuk.dhfs.objects.repository.JDataRemoteDto; import org.pcollections.PMap; import java.util.Collection; import java.util.List; +import java.util.Optional; -public record IndexUpdateOp(JObjectKey key, PMap changelog) implements Op { +public record IndexUpdateOp(JObjectKey key, PMap changelog, JDataRemoteDto data) implements Op { @Override public Collection getEscapedRefs() { return List.of(key); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java index cb31e2b0..fcbf7671 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java @@ -100,6 +100,7 @@ public class InvalidationQueueService { long success = 0; for (var e : data) { + // TODO: Race? if (peerInfoService.getPeerInfo(e.peer()).isEmpty()) continue; if (!remoteHostManager.isReachable(e.peer())) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java index f573126c..24f8da9b 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java @@ -3,11 +3,15 @@ package com.usatiuk.dhfs.objects.repository.invalidation; import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.RemoteObjectMeta; import com.usatiuk.dhfs.objects.RemoteTransaction; -import com.usatiuk.dhfs.objects.transaction.TransactionManager; import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; +import com.usatiuk.dhfs.objects.repository.JDataRemoteDto; +import com.usatiuk.dhfs.objects.repository.JDataRemotePush; import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; +import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService; import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.objects.transaction.TransactionManager; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -27,13 +31,24 @@ public class OpPusher { InvalidationQueueService invalidationQueueService; @Inject JKleppmannTreeManager jKleppmannTreeManager; + @Inject + DtoMapperService dtoMapperService; public void doPush(InvalidationQueueEntry entry) { List info = txm.run(() -> { var obj = curTx.get(JData.class, entry.key()).orElse(null); switch (obj) { case RemoteObjectMeta remote -> { - return List.of(new IndexUpdateOp(entry.key(), remote.changelog())); + JDataRemoteDto data = + remote.knownType().isAnnotationPresent(JDataRemotePush.class) + ? remoteTransaction.getData(remote.knownType(), entry.key()) + .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null) + : null; + + if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) { + Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType()); + } + return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data)); } case JKleppmannTreePersistentData pd -> { var tree = jKleppmannTreeManager.getTree(pd.key()); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java index b7c4c48d..f31cfa04 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java @@ -17,6 +17,6 @@ public class PushOpHandler { RemoteTransaction remoteTransaction; public void handlePush(PeerId peer, IndexUpdateOp obj) { - syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null); + syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data()); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java index 1021add5..cfe5219d 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java @@ -8,6 +8,7 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -49,6 +50,9 @@ public class PeerDiscoveryDirectory { public Collection getForPeer(PeerId peer) { synchronized (_entries) { long curTime = System.currentTimeMillis(); + if (_entries.asMap().get(peer) == null) { + return List.of(); + } var partitioned = _entries.asMap().get(peer).stream() .collect(Collectors.partitioningBy(e -> e.lastSeen() + timeout < curTime)); for (var entry : partitioned.get(true)) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java index deb3cb81..e4a25a9a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java @@ -6,9 +6,11 @@ import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.repository.CertificateTools; import com.usatiuk.dhfs.objects.repository.JDataRemoteDto; +import com.usatiuk.dhfs.objects.repository.JDataRemotePush; import java.security.cert.X509Certificate; +@JDataRemotePush public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto { public PeerInfo(PeerId id, byte[] cert) { this(id.toJObjectKey(), id, ByteString.copyFrom(cert)); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java index a05b26bb..17405711 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java @@ -38,9 +38,19 @@ public class PeerInfoService { return jKleppmannTreeManager.getTree(JObjectKey.of("peers"), LockingStrategy.OPTIMISTIC); } + public Optional getPeerInfoImpl(JObjectKey key) { + return jObjectTxManager.run(() -> { + return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> { + var meta = (JKleppmannTreeNodeMetaPeer) node.meta(); + return remoteTx.getData(PeerInfo.class, meta.getPeerId()); + }); + }); + + } + public Optional getPeerInfo(PeerId peer) { return jObjectTxManager.run(() -> { - var gotKey = getTreeR().traverse(List.of(peer.toString())); + var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name())); if (gotKey == null) { return Optional.empty(); } @@ -56,8 +66,12 @@ public class PeerInfoService { var gotKey = getTreeR().traverse(List.of()); return curTx.get(JKleppmannTreeNode.class, gotKey).map( node -> node.children().keySet().stream() - .map(PeerId::of).map(this::getPeerInfo) - .map(Optional::get).toList()) + .map(JObjectKey::of).map(this::getPeerInfoImpl) + .filter(o -> { + if (o.isEmpty()) + Log.warnv("Could not get peer info for peer {0}", o); + return o.isPresent(); + }).map(Optional::get).toList()) .orElseThrow(); }); } @@ -67,7 +81,7 @@ public class PeerInfoService { var gotKey = getTreeR().traverse(List.of()); return curTx.get(JKleppmannTreeNode.class, gotKey).map( node -> node.children().keySet().stream() - .map(PeerId::of).map(this::getPeerInfo) + .map(JObjectKey::of).map(this::getPeerInfoImpl) .map(Optional::get).filter( peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList()) .orElseThrow(); @@ -79,22 +93,23 @@ public class PeerInfoService { var parent = getTreeW().traverse(List.of()); var newPeerInfo = new PeerInfo(id, cert); remoteTx.putData(newPeerInfo); - getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTreeW().getNewNodeId()); + getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id())); }); } public void removePeer(PeerId id) { jObjectTxManager.run(() -> { - var gotKey = getTreeW().traverse(List.of(id.toString())); + var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name())); if (gotKey == null) { return; } - var meta = curTx.get(JKleppmannTreeNode.class, gotKey).map(node -> (JKleppmannTreeNodeMetaPeer) node.meta()).orElse(null); - if (meta == null) { + var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null); + if (node == null) { Log.warn("Peer " + id + " not found in the tree"); return; } - getTreeW().trash(meta, id.toJObjectKey()); + getTreeW().trash(node.meta(), node.key()); + curTx.onCommit(persistentPeerDataService::updateCerts); }); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java index a9ea1800..09d9be10 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java @@ -11,7 +11,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta { private final JObjectKey _peerId; public JKleppmannTreeNodeMetaPeer(PeerId id) { - super(id.toString()); + super(peerIdToNodeId(id).name()); _peerId = id.toJObjectKey(); } @@ -19,9 +19,14 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta { return _peerId; } + public static JObjectKey peerIdToNodeId(PeerId id) { + return JObjectKey.of(id.toJObjectKey().name() + "_tree_node"); + } + @Override public JKleppmannTreeNodeMeta withName(String name) { - assert false; + assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString()); + assert getName().equals(name); return this; } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerInfoCertUpdateTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerInfoCertUpdateTxHook.java new file mode 100644 index 00000000..cb7129e0 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerInfoCertUpdateTxHook.java @@ -0,0 +1,73 @@ +package com.usatiuk.dhfs.objects.repository.peertrust; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; +import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class PeerInfoCertUpdateTxHook implements PreCommitTxHook { + @Inject + Transaction curTx; + @Inject + PersistentPeerDataService persistentPeerDataService; + + @Override + public void onChange(JObjectKey key, JData old, JData cur) { + if (cur instanceof JKleppmannTreeNode n) { + if (n.key().name().equals("peers_jt_root")) { + Log.infov("Changed peer tree root: {0} to {1}", key, cur); + + curTx.onCommit(() -> persistentPeerDataService.updateCerts()); + return; + } + } + + if (!(cur instanceof RemoteObjectDataWrapper remote)) { + return; + } + + if (!(remote.data() instanceof PeerInfo)) + return; + + Log.infov("Changed peer info: {0} to {1}", key, cur); + + curTx.onCommit(() -> persistentPeerDataService.updateCerts()); + } + + @Override + public void onCreate(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObjectDataWrapper remote)) { + return; + } + + if (!(remote.data() instanceof PeerInfo)) + return; + + Log.infov("Created peer info: {0}, {1}", key, cur); + + curTx.onCommit(() -> persistentPeerDataService.updateCerts()); + } + + @Override + public void onDelete(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObjectDataWrapper remote)) { + return; + } + + if (!(remote.data() instanceof PeerInfo)) + return; + + Log.infov("Deleted peer info: {0}, {1}", key, cur); + + curTx.onCommit(() -> persistentPeerDataService.updateCerts()); + } + +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java index 26573abb..d392d472 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java @@ -1,8 +1,12 @@ package com.usatiuk.dhfs.objects.repository.peertrust; import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; import io.quarkus.logging.Log; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import javax.net.ssl.TrustManager; @@ -17,6 +21,9 @@ import java.util.concurrent.atomic.AtomicReference; @ApplicationScoped public class PeerTrustManager implements X509TrustManager { + @Inject + PeerInfoService peerInfoService; + private final AtomicReference trustManager = new AtomicReference<>(); @Override @@ -44,6 +51,13 @@ public class PeerTrustManager implements X509TrustManager { } } + // FIXME: + @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @Blocking + void hackRefresh() { + reloadTrustManagerHosts(peerInfoService.getPeers()); + } + private synchronized void reloadTrustManager(Collection> certs) throws Exception { KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType()); ts.load(null, null); diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java index 089cee37..55e219bc 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/integration/DhfsFusex3IT.java @@ -142,17 +142,17 @@ public class DhfsFusex3IT { @Test void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException { - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode()); await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty()); } @Test void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException { - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode()); - await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode()); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode()); await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty()); } @@ -190,6 +190,8 @@ public class DhfsFusex3IT { " --data '{\"uuid\":\"" + c2uuid + "\"}' " + " http://localhost:8080/objects-manage/known-peers"); + Thread.sleep(10000); + await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode()); await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout())); await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));