Server: some peer sync fixes

2025-03-16 20:07:45 +01:00
parent fa64dac9aa
commit a2e75dbdc7
14 changed files with 184 additions and 47 deletions

View File

@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.objects.repository;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface JDataRemotePush {
+}

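Note: @JDataRemotePush is a marker annotation, read via reflection in OpPusher below; remote types that carry it get their full DTO bundled into index-update pushes instead of changelog-only updates. A minimal sketch of opting a type in, assuming the record needs no members beyond what JDataRemote and JDataRemoteDto already supply (ExampleData is hypothetical; in this commit only PeerInfo is annotated):

    // Hypothetical type, for illustration only.
    @JDataRemotePush
    public record ExampleData(JObjectKey key) implements JDataRemote, JDataRemoteDto {
    }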
View File

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.*;
 import java.util.stream.Collectors;
@@ -71,7 +72,13 @@ public class PeerManager {
     public void tryConnectAll() {
         if (_heartbeatExecutor == null) return;
         try {
-            _heartbeatExecutor.invokeAll(peerInfoService.getPeersNoSelf()
+            var peers = peerInfoService.getPeersNoSelf();
+            var pids = peers.stream().map(PeerInfo::id).toList();
+            List<PeerId> stale = _states.keySet().stream().filter(p -> !pids.contains(p)).toList();
+            stale.forEach(_states.keySet()::remove);
+            _heartbeatExecutor.invokeAll(peers
                     .stream()
                     .<Callable<Void>>map(host -> () -> {
                         try {
@@ -79,13 +86,13 @@ public class PeerManager {
                                 Log.tracev("Heartbeat: {0}", host);
                             else
                                 Log.debugv("Trying to connect to {0}", host);
-                            var bestAddr = selectBestAddress(host.id());
-                            if (pingCheck(host, bestAddr))
+                            var bestAddr = selectBestAddress(host.id()).orElse(null);
+                            if (bestAddr != null && pingCheck(host, bestAddr))
                                 handleConnectionSuccess(host, bestAddr);
                             else
                                 handleConnectionError(host);
                         } catch (Exception e) {
-                            Log.errorv("Failed to connect to {0} because {1}", host, e);
+                            Log.error("Failed to connect to " + host.key(), e);
                         }
                         return null;
                     }).toList(), 30, TimeUnit.SECONDS); //FIXME:
@@ -175,20 +182,17 @@ public class PeerManager {
         });
     }

-    private PeerAddress selectBestAddress(PeerId host) {
-        return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow();
+    private Optional<PeerAddress> selectBestAddress(PeerId host) {
+        return peerDiscoveryDirectory.getForPeer(host).stream().findFirst();
     }

     public void addRemoteHost(PeerId host) {
-        if (_states.containsKey(host)) {
-            throw new IllegalStateException("Host " + host + " is already added");
-        }
         transactionManager.run(() -> {
             if (peerInfoService.getPeerInfo(host).isPresent())
                 throw new IllegalStateException("Host " + host + " is already added");
-            var info = peerSyncApiClient.getSelfInfo(selectBestAddress(host));
+            var addr = selectBestAddress(host).orElseThrow(() -> new IllegalStateException("Host " + host + " is unreachable"));
+            var info = peerSyncApiClient.getSelfInfo(addr);
             var cert = Base64.getDecoder().decode(info.cert());
             peerInfoService.putPeer(host, cert);

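Note: selectBestAddress returning Optional<PeerAddress> lets the two call sites diverge: the heartbeat treats a missing address as an ordinary connection failure, while addRemoteHost still throws. An equivalent formulation of the heartbeat branch, a sketch using the same names as the diff above:

    // Sketch: same behavior as the orElse(null) + null-check version above.
    selectBestAddress(host.id()).ifPresentOrElse(
            addr -> {
                if (pingCheck(host, addr))
                    handleConnectionSuccess(host, addr);
                else
                    handleConnectionError(host);
            },
            () -> handleConnectionError(host));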
View File

@@ -102,20 +102,9 @@ public class PersistentPeerDataService {
         return _selfUuid;
     }

-//    private void updateCerts() {
-//        try {
-//            peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
-//                peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
-//                // Fixme:? I don't think it should be needed with custom trust store
-//                // but it doesn't work?
-//                rpcClientFactory.dropCache();
-//                return null;
-//            });
-//        } catch (Exception ex) {
-//            Log.warn("Error when refreshing certificates, will retry: " + ex.getMessage());
-//            pushPeerUpdates();
-//        }
-//    }
+    public void updateCerts() {
+        peerTrustManager.reloadTrustManagerHosts(txm.run(() -> peerInfoService.getPeers().stream().toList())); //FIXME:
+    }

     public KeyPair getSelfKeypair() {
         return _selfKeyPair;

View File

@@ -1,14 +1,16 @@
 package com.usatiuk.dhfs.objects.repository.invalidation;

 import com.usatiuk.dhfs.objects.JDataRemote;
 import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.dhfs.objects.PeerId;
 import com.usatiuk.dhfs.objects.RemoteObjectMeta;
+import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
 import org.pcollections.PMap;

 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;

-public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
+public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
     @Override
     public Collection<JObjectKey> getEscapedRefs() {
         return List.of(key);

View File

@@ -100,6 +100,7 @@ public class InvalidationQueueService {
                 long success = 0;

                 for (var e : data) {
                     // TODO: Race?
+                    if (peerInfoService.getPeerInfo(e.peer()).isEmpty()) continue;
                     if (!remoteHostManager.isReachable(e.peer())) {

View File

@@ -3,11 +3,15 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
 import com.usatiuk.dhfs.objects.JData;
 import com.usatiuk.dhfs.objects.RemoteObjectMeta;
 import com.usatiuk.dhfs.objects.RemoteTransaction;
-import com.usatiuk.dhfs.objects.transaction.TransactionManager;
 import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
 import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
+import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
+import com.usatiuk.dhfs.objects.repository.JDataRemotePush;
 import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
+import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService;
 import com.usatiuk.dhfs.objects.transaction.Transaction;
+import com.usatiuk.dhfs.objects.transaction.TransactionManager;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -27,13 +31,24 @@ public class OpPusher {
     InvalidationQueueService invalidationQueueService;
     @Inject
     JKleppmannTreeManager jKleppmannTreeManager;
+    @Inject
+    DtoMapperService dtoMapperService;

     public void doPush(InvalidationQueueEntry entry) {
         List<Op> info = txm.run(() -> {
             var obj = curTx.get(JData.class, entry.key()).orElse(null);
             switch (obj) {
                 case RemoteObjectMeta remote -> {
-                    return List.of(new IndexUpdateOp(entry.key(), remote.changelog()));
+                    JDataRemoteDto data =
+                            remote.knownType().isAnnotationPresent(JDataRemotePush.class)
+                                    ? remoteTransaction.getData(remote.knownType(), entry.key())
+                                    .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
+                                    : null;
+                    if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
+                        Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
+                    }
+                    return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
                 }
                 case JKleppmannTreePersistentData pd -> {
                     var tree = jKleppmannTreeManager.getTree(pd.key());

View File

@@ -17,6 +17,6 @@ public class PushOpHandler {
     RemoteTransaction remoteTransaction;

     public void handlePush(PeerId peer, IndexUpdateOp obj) {
-        syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null);
+        syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data());
     }
 }

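Note: this closes the loop on the push path. A sketch of the flow after this commit, using only signatures visible in this diff:

    // Sender (OpPusher): the DTO rides along only for @JDataRemotePush types, otherwise null.
    Op op = new IndexUpdateOp(entry.key(), remote.changelog(), data);
    // Receiver (PushOpHandler): a null data() degrades to a plain index update,
    // matching the previous hardcoded null.
    syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data());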
View File

@@ -8,6 +8,7 @@ import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
 import org.eclipse.microprofile.config.inject.ConfigProperty;

 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -49,6 +50,9 @@ public class PeerDiscoveryDirectory {
     public Collection<PeerAddress> getForPeer(PeerId peer) {
         synchronized (_entries) {
             long curTime = System.currentTimeMillis();
+            if (_entries.asMap().get(peer) == null) {
+                return List.of();
+            }
             var partitioned = _entries.asMap().get(peer).stream()
                     .collect(Collectors.partitioningBy(e -> e.lastSeen() + timeout < curTime));
             for (var entry : partitioned.get(true)) {

View File

@@ -6,9 +6,11 @@ import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.dhfs.objects.PeerId;
 import com.usatiuk.dhfs.objects.repository.CertificateTools;
 import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
+import com.usatiuk.dhfs.objects.repository.JDataRemotePush;

 import java.security.cert.X509Certificate;

+@JDataRemotePush
 public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
     public PeerInfo(PeerId id, byte[] cert) {
         this(id.toJObjectKey(), id, ByteString.copyFrom(cert));

View File

@@ -38,9 +38,19 @@
         return jKleppmannTreeManager.getTree(JObjectKey.of("peers"), LockingStrategy.OPTIMISTIC);
     }

+    public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
+        return jObjectTxManager.run(() -> {
+            return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> {
+                var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
+                return remoteTx.getData(PeerInfo.class, meta.getPeerId());
+            });
+        });
+    }
+
     public Optional<PeerInfo> getPeerInfo(PeerId peer) {
         return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(peer.toString()));
+            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).name()));
             if (gotKey == null) {
                 return Optional.empty();
             }
@@ -56,8 +66,12 @@
             var gotKey = getTreeR().traverse(List.of());
             return curTx.get(JKleppmannTreeNode.class, gotKey).map(
                     node -> node.children().keySet().stream()
-                            .map(PeerId::of).map(this::getPeerInfo)
-                            .map(Optional::get).toList())
+                            .map(JObjectKey::of).map(this::getPeerInfoImpl)
+                            .filter(o -> {
+                                if (o.isEmpty())
+                                    Log.warnv("Could not get peer info for peer {0}", o);
+                                return o.isPresent();
+                            }).map(Optional::get).toList())
                     .orElseThrow();
         });
     }
@@ -67,7 +81,7 @@
             var gotKey = getTreeR().traverse(List.of());
             return curTx.get(JKleppmannTreeNode.class, gotKey).map(
                     node -> node.children().keySet().stream()
-                            .map(PeerId::of).map(this::getPeerInfo)
+                            .map(JObjectKey::of).map(this::getPeerInfoImpl)
                             .map(Optional::get).filter(
                                     peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
                     .orElseThrow();
@@ -79,22 +93,23 @@
             var parent = getTreeW().traverse(List.of());
             var newPeerInfo = new PeerInfo(id, cert);
             remoteTx.putData(newPeerInfo);
-            getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTreeW().getNewNodeId());
+            getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
         });
     }

     public void removePeer(PeerId id) {
         jObjectTxManager.run(() -> {
-            var gotKey = getTreeW().traverse(List.of(id.toString()));
+            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name()));
             if (gotKey == null) {
                 return;
             }
-            var meta = curTx.get(JKleppmannTreeNode.class, gotKey).map(node -> (JKleppmannTreeNodeMetaPeer) node.meta()).orElse(null);
-            if (meta == null) {
+            var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null);
+            if (node == null) {
                 Log.warn("Peer " + id + " not found in the tree");
                 return;
             }
-            getTreeW().trash(meta, id.toJObjectKey());
+            getTreeW().trash(node.meta(), node.key());
+            curTx.onCommit(persistentPeerDataService::updateCerts);
         });
     }
 }

View File

@@ -11,7 +11,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
     private final JObjectKey _peerId;

     public JKleppmannTreeNodeMetaPeer(PeerId id) {
-        super(id.toString());
+        super(peerIdToNodeId(id).name());
         _peerId = id.toJObjectKey();
     }
@@ -19,9 +19,14 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
         return _peerId;
     }

+    public static JObjectKey peerIdToNodeId(PeerId id) {
+        return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
+    }
+
     @Override
     public JKleppmannTreeNodeMeta withName(String name) {
-        assert false;
+        assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());
+        assert getName().equals(name);
         return this;
     }

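Note: peer tree node IDs are now derived deterministically from the peer ID instead of getTreeW().getNewNodeId(), so putPeer converges on the same node key on every peer and lookups can compute the key directly. A small illustration, assuming PeerId.toJObjectKey().name() yields the plain ID string as the constructor above suggests (the "peer-a" value is made up):

    PeerId id = PeerId.of("peer-a"); // hypothetical peer id
    assert JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).name().equals("peer-a_tree_node");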
View File

@@ -0,0 +1,73 @@
+package com.usatiuk.dhfs.objects.repository.peertrust;
+
+import com.usatiuk.dhfs.objects.JData;
+import com.usatiuk.dhfs.objects.JObjectKey;
+import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
+import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
+import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
+import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
+import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
+import com.usatiuk.dhfs.objects.transaction.Transaction;
+import io.quarkus.logging.Log;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
+    @Inject
+    Transaction curTx;
+    @Inject
+    PersistentPeerDataService persistentPeerDataService;
+
+    @Override
+    public void onChange(JObjectKey key, JData old, JData cur) {
+        if (cur instanceof JKleppmannTreeNode n) {
+            if (n.key().name().equals("peers_jt_root")) {
+                Log.infov("Changed peer tree root: {0} to {1}", key, cur);
+                curTx.onCommit(() -> persistentPeerDataService.updateCerts());
+                return;
+            }
+        }
+
+        if (!(cur instanceof RemoteObjectDataWrapper remote)) {
+            return;
+        }
+        if (!(remote.data() instanceof PeerInfo))
+            return;
+
+        Log.infov("Changed peer info: {0} to {1}", key, cur);
+        curTx.onCommit(() -> persistentPeerDataService.updateCerts());
+    }
+
+    @Override
+    public void onCreate(JObjectKey key, JData cur) {
+        if (!(cur instanceof RemoteObjectDataWrapper remote)) {
+            return;
+        }
+        if (!(remote.data() instanceof PeerInfo))
+            return;
+
+        Log.infov("Created peer info: {0}, {1}", key, cur);
+        curTx.onCommit(() -> persistentPeerDataService.updateCerts());
+    }
+
+    @Override
+    public void onDelete(JObjectKey key, JData cur) {
+        if (!(cur instanceof RemoteObjectDataWrapper remote)) {
+            return;
+        }
+        if (!(remote.data() instanceof PeerInfo))
+            return;
+
+        Log.infov("Deleted peer info: {0}, {1}", key, cur);
+        curTx.onCommit(() -> persistentPeerDataService.updateCerts());
+    }
+}

View File

@@ -1,8 +1,12 @@
 package com.usatiuk.dhfs.objects.repository.peertrust;

 import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
+import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
 import io.quarkus.logging.Log;
+import io.quarkus.scheduler.Scheduled;
+import io.smallrye.common.annotation.Blocking;
 import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;

 import javax.net.ssl.TrustManager;
@@ -17,6 +21,9 @@ import java.util.concurrent.atomic.AtomicReference;
 @ApplicationScoped
 public class PeerTrustManager implements X509TrustManager {
+    @Inject
+    PeerInfoService peerInfoService;
+
     private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();

     @Override
@@ -44,6 +51,13 @@ public class PeerTrustManager implements X509TrustManager {
         }
     }

+    // FIXME:
+    @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Blocking
+    void hackRefresh() {
+        reloadTrustManagerHosts(peerInfoService.getPeers());
+    }
+
     private synchronized void reloadTrustManager(Collection<Pair<String, X509Certificate>> certs) throws Exception {
         KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
         ts.load(null, null);

View File

@@ -142,17 +142,17 @@ public class DhfsFusex3IT {
     @Test
     void largerFileDeleteTest() throws IOException, InterruptedException, TimeoutException {
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());

         await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
     }

     @Test
     void largerFileDeleteTestNoDelays() throws IOException, InterruptedException, TimeoutException {
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && curl -O https://ash-speed.hetzner.com/100MB.bin").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/100MB.bin").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/100MB.bin").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && dd if=/dev/urandom of=10MB.bin bs=1M count=10").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "head -c 10 /root/dhfs_default/fuse/10MB.bin").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container3.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/10MB.bin").getExitCode());

         await().atMost(45, TimeUnit.SECONDS).until(() -> checkEmpty());
     }
@@ -190,6 +190,8 @@ public class DhfsFusex3IT {
                 "  --data '{\"uuid\":\"" + c2uuid + "\"}' " +
                 "  http://localhost:8080/objects-manage/known-peers");

         Thread.sleep(10000);
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() -> "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
         await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));