diff --git a/dhfs-parent/objects/src/main/resources/application.properties b/dhfs-parent/objects/src/main/resources/application.properties index 24bd7282..f7842d0c 100644 --- a/dhfs-parent/objects/src/main/resources/application.properties +++ b/dhfs-parent/objects/src/main/resources/application.properties @@ -4,5 +4,4 @@ dhfs.objects.lru.limit=134217728 dhfs.objects.lru.print-stats=true dhfs.objects.lock_timeout_secs=15 dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs -dhfs.objects.root=${HOME}/dhfs_default/data/stuff quarkus.package.jar.decompiler.enabled=true diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java index dcd379a8..7074af8d 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/ShutdownChecker.java @@ -14,7 +14,7 @@ import java.nio.file.Paths; @ApplicationScoped public class ShutdownChecker { private static final String dataFileName = "running"; - @ConfigProperty(name = "dhfs.objects.root") + @ConfigProperty(name = "dhfs.objects.persistence.files.root") String dataRoot; boolean _cleanShutdown = true; boolean _initialized = false; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java new file mode 100644 index 00000000..4386f03e --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects; + +public interface JDataRemote { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java new file mode 100644 index 00000000..5c34de0e --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java @@ -0,0 +1,23 @@ +package com.usatiuk.dhfs.objects; + +import java.io.Serializable; +import java.util.UUID; + +public record PeerId(UUID id) implements Serializable { + public static PeerId of(UUID id) { + return new PeerId(id); + } + + public static PeerId of(String id) { + return new PeerId(UUID.fromString(id)); + } + + @Override + public String toString() { + return id.toString(); + } + + public JObjectKey toJObjectKey() { + return JObjectKey.of(id.toString()); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java new file mode 100644 index 00000000..46719854 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java @@ -0,0 +1,67 @@ +package com.usatiuk.dhfs.objects; + +import org.pcollections.PCollection; +import org.pcollections.PMap; +import org.pcollections.PSet; + +import java.util.Collection; +import java.util.List; + +public record RemoteObject( + JObjectKey key, PCollection refsFrom, boolean frozen, + PMap knownRemoteVersions, + Class knownType, + PSet confirmedDeletes, + boolean seen, + PMap changelog, + boolean haveLocal +) implements JDataRefcounted { + @Override + public RemoteObject withRefsFrom(PCollection refs) { + return new RemoteObject<>(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + @Override + public RemoteObject withFrozen(boolean frozen) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withKnownRemoteVersions(PMap knownRemoteVersions) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withKnownType(Class knownType) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withConfirmedDeletes(PSet confirmedDeletes) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withSeen(boolean seen) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withChangelog(PMap changelog) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public RemoteObject withHaveLocal(boolean haveLocal) { + return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + } + + public static JObjectKey keyFrom(JObjectKey key) { + return new JObjectKey(key + "_remote"); + } + + public JObjectKey localKey() { + if (!haveLocal) throw new IllegalStateException("No local key"); + return JObjectKey.of(key.name().substring(0, key.name().length() - "_remote".length())); + } + + @Override + public Collection collectRefsTo() { + if (haveLocal) return List.of(localKey()); + return List.of(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java new file mode 100644 index 00000000..b5086e4a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java @@ -0,0 +1,31 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.transaction.LockingStrategy; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.NotImplementedException; + +import java.util.Optional; + +@ApplicationScoped +public class RemoteTransaction { + @Inject + Transaction curTx; + + public long getId() { + return curTx.getId(); + } + + public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { + throw new NotImplementedException(); + } + + public void put(JData obj) { + throw new NotImplementedException(); + } + + public Optional get(Class type, JObjectKey key) { + return get(type, key, LockingStrategy.OPTIMISTIC); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java index 890b13ea..3b4a8687 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs; import com.usatiuk.dhfs.objects.JDataRefcounted; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer; import com.usatiuk.kleppmanntree.OpMove; import com.usatiuk.kleppmanntree.TreeNode; import org.pcollections.PCollection; @@ -61,6 +62,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection refsFro switch (meta()) { case JKleppmannTreeNodeMetaDirectory dir -> Stream.of(); case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno()); + case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId()); default -> throw new IllegalStateException("Unexpected value: " + meta()); } ).collect(Collectors.toUnmodifiableSet()); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java index a7171ee8..fb4e5311 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMeta.java @@ -6,7 +6,7 @@ import com.usatiuk.kleppmanntree.NodeMeta; import java.util.Objects; -@ProtoMirror(JKleppmannTreeNodeMetaP.class) +//@ProtoMirror(JKleppmannTreeNodeMetaP.class) public abstract class JKleppmannTreeNodeMeta implements NodeMeta { private final String _name; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaDirectory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaDirectory.java index 79882017..39ebd488 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaDirectory.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaDirectory.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs; import com.usatiuk.autoprotomap.runtime.ProtoMirror; import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaDirectoryP; -@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class) +//@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class) public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta { public JKleppmannTreeNodeMetaDirectory(String name) { super(name); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaFile.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaFile.java index 5c1ff8f1..b51af59a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaFile.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNodeMetaFile.java @@ -6,7 +6,7 @@ import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaFileP; import java.util.Objects; -@ProtoMirror(JKleppmannTreeNodeMetaFileP.class) +//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class) public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta { private final JObjectKey _fileIno; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/CertificateTools.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/CertificateTools.java new file mode 100644 index 00000000..fcb5a07e --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/CertificateTools.java @@ -0,0 +1,63 @@ +package com.usatiuk.dhfs.objects.repository; + +import org.apache.commons.codec.digest.DigestUtils; +import org.bouncycastle.asn1.ASN1ObjectIdentifier; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.BasicConstraints; +import org.bouncycastle.cert.CertIOException; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.OperatorCreationException; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.math.BigInteger; +import java.security.*; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Calendar; +import java.util.Date; + +public class CertificateTools { + + public static X509Certificate certFromBytes(byte[] bytes) throws CertificateException { + CertificateFactory certFactory = CertificateFactory.getInstance("X.509"); + InputStream in = new ByteArrayInputStream(bytes); + return (X509Certificate) certFactory.generateCertificate(in); + } + + public static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA"); + keyGen.initialize(2048); //FIXME: + return keyGen.generateKeyPair(); + } + + public static X509Certificate generateCertificate(KeyPair keyPair, String subject) throws CertificateException, CertIOException, NoSuchAlgorithmException, OperatorCreationException { + Provider bcProvider = new BouncyCastleProvider(); + Security.addProvider(bcProvider); + + Date startDate = new Date(); + + X500Name cnName = new X500Name("CN=" + subject); + BigInteger certSerialNumber = new BigInteger(DigestUtils.sha256(subject)); + + Calendar calendar = Calendar.getInstance(); + calendar.setTime(startDate); + calendar.add(Calendar.YEAR, 999); + + Date endDate = calendar.getTime(); + + ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate()); + + JcaX509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(cnName, certSerialNumber, startDate, endDate, cnName, keyPair.getPublic()); + + BasicConstraints basicConstraints = new BasicConstraints(false); + certBuilder.addExtension(new ASN1ObjectIdentifier("2.5.29.19"), true, basicConstraints); + + return new JcaX509CertificateConverter().setProvider(bcProvider).getCertificate(certBuilder.build(contentSigner)); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java new file mode 100644 index 00000000..81f3c764 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java @@ -0,0 +1,233 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.TransactionManager; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryDirectory; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; +import com.usatiuk.dhfs.objects.repository.peersync.api.PeerSyncApiClientDynamic; +import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager; +import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; + +@ApplicationScoped +public class PeerManager { + private final ConcurrentMap _states = new ConcurrentHashMap<>(); + // FIXME: Ideally not call them on every ping + private final ArrayList _connectedListeners = new ArrayList<>(); + private final ArrayList _disconnectedListeners = new ArrayList<>(); + @Inject + PersistentPeerDataService persistentPeerDataService; + @Inject + PeerInfoService peerInfoService; + @Inject + RpcClientFactory rpcClientFactory; + @Inject + PeerSyncApiClientDynamic peerSyncApiClient; + @Inject + TransactionManager transactionManager; + @Inject + Transaction curTx; + @Inject + PeerTrustManager peerTrustManager; + @ConfigProperty(name = "dhfs.objects.sync.ping.timeout") + long pingTimeout; + @Inject + PeerDiscoveryDirectory peerDiscoveryDirectory; + private ExecutorService _heartbeatExecutor; + + // Note: keep priority updated with below + void init(@Observes @Priority(600) StartupEvent event) throws IOException { + _heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor(); + } + + @Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @Blocking + public void tryConnectAll() { + if (_heartbeatExecutor == null) return; + try { + _heartbeatExecutor.invokeAll(peerInfoService.getPeersNoSelf() + .stream() + .>map(host -> () -> { + try { + if (isReachable(host)) + Log.trace("Heartbeat: " + host); + else + Log.debug("Trying to connect to " + host); + var bestAddr = selectBestAddress(host.id()); + if (pingCheck(host, bestAddr)) + handleConnectionSuccess(host, bestAddr); + else + handleConnectionError(host); + } catch (Exception e) { + Log.error("Failed to connect to " + host, e); + } + return null; + }).toList(), 30, TimeUnit.SECONDS); //FIXME: + } catch (InterruptedException iex) { + Log.error("Heartbeat was interrupted"); + } + } + + // Note: registrations should be completed with Priority < 600 + public void registerConnectEventListener(ConnectionEventListener listener) { + synchronized (_connectedListeners) { + _connectedListeners.add(listener); + } + } + + // Note: registrations should be completed with Priority < 600 + public void registerDisconnectEventListener(ConnectionEventListener listener) { + synchronized (_disconnectedListeners) { + _disconnectedListeners.add(listener); + } + } + + private void handleConnectionSuccess(PeerInfo host, PeerAddress address) { + boolean wasReachable = isReachable(host); + +// boolean shouldSyncObj = persistentPeerDataService.markInitialObjSyncDone(host); +// boolean shouldSyncOp = persistentPeerDataService.markInitialOpSyncDone(host); +// +// if (shouldSyncObj) +// syncHandler.pushInitialResyncObj(host); +// if (shouldSyncOp) +// syncHandler.pushInitialResyncOp(host); + + _states.put(host.id(), address); + + if (wasReachable) return; + + Log.info("Connected to " + host); + +// for (var l : _connectedListeners) { +// l.apply(host); +// } + } + + public void handleConnectionError(PeerInfo host) { + boolean wasReachable = isReachable(host); + + if (wasReachable) + Log.info("Lost connection to " + host); + + _states.remove(host.id()); + +// for (var l : _disconnectedListeners) { +// l.apply(host); +// } + } + + // FIXME: + private boolean pingCheck(PeerInfo host, PeerAddress address) { + try { + return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, c -> { + var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build()); + if (!UUID.fromString(ret.getSelfUuid()).equals(host.id().id())) { + throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host.id()); + } + return true; + }); + } catch (Exception ignored) { + Log.debug("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause()); + return false; + } + } + + public boolean isReachable(PeerInfo host) { + return _states.containsKey(host.id()); + } + + public PeerAddress getAddress(PeerId host) { + return _states.get(host); + } + + public List getAvailableHosts() { + return _states.keySet().stream().toList(); + } + +// public List getUnavailableHosts() { +// return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() +// .filter(e -> !e.getValue().isReachable()) +// .map(Map.Entry::getKey).toList()); +// } + +// public HostStateSnapshot getHostStateSnapshot() { +// ArrayList available = new ArrayList<>(); +// ArrayList unavailable = new ArrayList<>(); +// _transientPeersState.runReadLocked(d -> { +// for (var v : d.getStates().entrySet()) { +// if (v.getValue().isReachable()) +// available.add(v.getKey()); +// else +// unavailable.add(v.getKey()); +// } +// return null; +// } +// ); +// return new HostStateSnapshot(available, unavailable); +// } + +// public void removeRemoteHost(UUID host) { +// persistentPeerDataService.removeHost(host); +// // Race? +// _transientPeersState.runWriteLocked(d -> { +// d.getStates().remove(host); +// return null; +// }); +// } + + private PeerAddress selectBestAddress(PeerId host) { + return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow(); + } + + public void addRemoteHost(PeerId host) { + if (_states.containsKey(host)) { + throw new IllegalStateException("Host " + host + " is already added"); + } + + transactionManager.run(() -> { + if (peerInfoService.getPeerInfo(host).isPresent()) + throw new IllegalStateException("Host " + host + " is already added"); + + var info = peerSyncApiClient.getSelfInfo(selectBestAddress(host)); + + var cert = Base64.getDecoder().decode(info.cert()); + peerInfoService.putPeer(host, cert); + }); + + peerTrustManager.reloadTrustManagerHosts( + transactionManager.run(() -> peerInfoService.getPeers().stream().toList()) + ); //FIXME: + } + + public Collection getSeenButNotAddedHosts() { + return transactionManager.run(() -> { + return peerDiscoveryDirectory.getReachablePeers().stream().filter(p -> !peerInfoService.getPeerInfo(p).isPresent()) + .map(p -> new AvailablePeerInfo(p.toString())).toList(); + }); + } + + @FunctionalInterface + public interface ConnectionEventListener { + void apply(UUID host); + } + + public record HostStateSnapshot(List available, List unavailable) { + } + +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java new file mode 100644 index 00000000..523aa7cc --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentPeerDataService.java @@ -0,0 +1,179 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.ShutdownChecker; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.TransactionManager; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; +import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; +import io.quarkus.runtime.StartupEvent; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.io.IOException; +import java.security.KeyPair; +import java.security.cert.X509Certificate; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutorService; + +@ApplicationScoped +public class PersistentPeerDataService { + @Inject + PeerTrustManager peerTrustManager; + @Inject + ExecutorService executorService; + @Inject + RpcClientFactory rpcClientFactory; + @Inject + ShutdownChecker shutdownChecker; + @Inject + TransactionManager jObjectTxManager; + @Inject + Transaction curTx; + @Inject + PeerInfoService peerInfoService; + + @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid") + Optional presetUuid; + + private PeerId _selfUuid; + private X509Certificate _selfCertificate; + private KeyPair _selfKeyPair; + + void init(@Observes @Priority(300) StartupEvent event) throws IOException { + jObjectTxManager.run(() -> { + var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); + if (selfData != null) { + _selfUuid = selfData.selfUuid(); + _selfCertificate = selfData.selfCertificate(); + _selfKeyPair = selfData.selfKeyPair(); + return; + } else { + _selfUuid = presetUuid.map(s -> PeerId.of(UUID.fromString(s))).orElseGet(() -> PeerId.of(UUID.randomUUID())); + try { + Log.info("Generating a key pair, please wait"); + _selfKeyPair = CertificateTools.generateKeyPair(); + _selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString()); + + curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair)); + peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers()); + Log.info("Self uuid is: " + _selfUuid.toString()); + } + +// private void pushPeerUpdates() { +// pushPeerUpdates(null); +// } + +// private void pushPeerUpdates(@Nullable JObject obj) { +// if (obj != null) +// Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated"); +// executorService.submit(() -> { +// updateCerts(); +// invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName); +// for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList())) +// invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p)); +// }); +// } + + public PeerId getSelfUuid() { + return _selfUuid; + } + + public long getUniqueId() { + return jObjectTxManager.run(() -> { + var curData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); + curTx.put(curData.withSelfCounter(curData.selfCounter() + 1)); + return curData.selfCounter(); + }); + } + +// private void updateCerts() { +// try { +// peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { +// peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls()); +// // Fixme:? I don't think it should be needed with custom trust store +// // but it doesn't work? +// rpcClientFactory.dropCache(); +// return null; +// }); +// } catch (Exception ex) { +// Log.warn("Error when refreshing certificates, will retry: " + ex.getMessage()); +// pushPeerUpdates(); +// } +// } + + public KeyPair getSelfKeypair() { + return _selfKeyPair; + } + + public X509Certificate getSelfCertificate() { + return _selfCertificate; + } + +// // Returns true if host's initial sync wasn't done before, and marks it as done +// public boolean markInitialOpSyncDone(UUID connectedHost) { +// return jObjectTxManager.executeTx(() -> { +// peerDirectoryLocal.get().rwLock(); +// try { +// peerDirectoryLocal.get().local(); +// boolean contained = peerDirectoryLocal.get().getData().getInitialOpSyncDone().contains(connectedHost); +// +// if (!contained) +// peerDirectoryLocal.get().local().mutate(new JMutator() { +// @Override +// public boolean mutate(PeerDirectoryLocal object) { +// object.getInitialOpSyncDone().add(connectedHost); +// return true; +// } +// +// @Override +// public void revert(PeerDirectoryLocal object) { +// object.getInitialOpSyncDone().remove(connectedHost); +// } +// }); +// return !contained; +// } finally { +// peerDirectoryLocal.get().rwUnlock(); +// } +// }); +// } +// +// public boolean markInitialObjSyncDone(UUID connectedHost) { +// return jObjectTxManager.executeTx(() -> { +// peerDirectoryLocal.get().rwLock(); +// try { +// peerDirectoryLocal.get().local(); +// boolean contained = peerDirectoryLocal.get().getData().getInitialObjSyncDone().contains(connectedHost); +// +// if (!contained) +// peerDirectoryLocal.get().local().mutate(new JMutator() { +// @Override +// public boolean mutate(PeerDirectoryLocal object) { +// object.getInitialObjSyncDone().add(connectedHost); +// return true; +// } +// +// @Override +// public void revert(PeerDirectoryLocal object) { +// object.getInitialObjSyncDone().remove(connectedHost); +// } +// }); +// return !contained; +// } finally { +// peerDirectoryLocal.get().rwUnlock(); +// } +// }); +// } + +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java new file mode 100644 index 00000000..23a99e0e --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PersistentRemoteHostsData.java @@ -0,0 +1,25 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; + +import java.io.Serializable; +import java.security.KeyPair; +import java.security.cert.X509Certificate; + +public record PersistentRemoteHostsData(PeerId selfUuid, + long selfCounter, + X509Certificate selfCertificate, + KeyPair selfKeyPair) implements JData, Serializable { + public static final JObjectKey KEY = JObjectKey.of("self_peer_data"); + + @Override + public JObjectKey key() { + return KEY; + } + + public PersistentRemoteHostsData withSelfCounter(long selfCounter) { + return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java new file mode 100644 index 00000000..37458390 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java @@ -0,0 +1,174 @@ +//package com.usatiuk.dhfs.objects.repository; +// +//import com.google.common.collect.Maps; +//import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +//import com.usatiuk.dhfs.objects.jrepository.*; +//import com.usatiuk.dhfs.objects.persistence.JObjectDataP; +//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +//import com.usatiuk.dhfs.objects.repository.opsupport.Op; +//import io.grpc.Status; +//import io.grpc.StatusRuntimeException; +//import io.quarkus.logging.Log; +//import jakarta.enterprise.context.ApplicationScoped; +//import jakarta.inject.Inject; +//import org.apache.commons.lang3.tuple.Pair; +// +//import javax.annotation.Nullable; +//import java.util.*; +//import java.util.concurrent.Callable; +//import java.util.concurrent.ConcurrentLinkedDeque; +//import java.util.concurrent.Executors; +//import java.util.stream.Collectors; +// +//@ApplicationScoped +//public class RemoteObjectServiceClient { +// @Inject +// PersistentPeerDataService persistentPeerDataService; +// +// @Inject +// RpcClientFactory rpcClientFactory; +// +// @Inject +// JObjectManager jObjectManager; +// +// @Inject +// SyncHandler syncHandler; +// @Inject +// InvalidationQueueService invalidationQueueService; +// @Inject +// ProtoSerializer dataProtoSerializer; +// @Inject +// ProtoSerializer opProtoSerializer; +// @Inject +// JObjectTxManager jObjectTxManager; +// +// public Pair getSpecificObject(UUID host, String name) { +// return rpcClientFactory.withObjSyncClient(host, client -> { +// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build()); +// return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent()); +// }); +// } +// +// public JObjectDataP getObject(JObject jObject) { +// jObject.assertRwLock(); +// +// var targets = jObject.runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d) -> { +// var ourVersion = md.getOurVersion(); +// if (ourVersion >= 1) +// return md.getRemoteCopies().entrySet().stream() +// .filter(entry -> entry.getValue().equals(ourVersion)) +// .map(Map.Entry::getKey).toList(); +// else +// return persistentPeerDataService.getHostUuids(); +// }); +// +// if (targets.isEmpty()) +// throw new IllegalStateException("No targets for object " + jObject.getMeta().getName()); +// +// Log.info("Downloading object " + jObject.getMeta().getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", "))); +// +// return rpcClientFactory.withObjSyncClient(targets, client -> { +// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(jObject.getMeta().getName()).build()); +// +// var receivedMap = new HashMap(); +// for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) { +// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion()); +// } +// +// return jObjectTxManager.executeTx(() -> { +// return jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> { +// var unexpected = !Objects.equals( +// Maps.filterValues(md.getChangelog(), val -> val != 0), +// Maps.filterValues(receivedMap, val -> val != 0)); +// +// if (unexpected) { +// try { +// syncHandler.handleOneUpdate(UUID.fromString(reply.getSelfUuid()), reply.getObject().getHeader()); +// } catch (SyncHandler.OutdatedUpdateException ignored) { +// Log.info("Outdated update of " + md.getName() + " from " + reply.getSelfUuid()); +// invalidationQueueService.pushInvalidationToOne(UUID.fromString(reply.getSelfUuid()), md.getName()); // True? +// throw new StatusRuntimeException(Status.ABORTED.withDescription("Received outdated object version")); +// } catch (Exception e) { +// Log.error("Received unexpected object version from " + reply.getSelfUuid() +// + " for " + reply.getObject().getHeader().getName() + " and conflict resolution failed", e); +// throw new StatusRuntimeException(Status.ABORTED.withDescription("Received unexpected object version")); +// } +// } +// +// return reply.getObject().getContent(); +// }); +// }); +// }); +// } +// +// @Nullable +// public IndexUpdateReply notifyUpdate(JObject obj, UUID host) { +// var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()); +// +// var header = obj +// .runReadLocked( +// obj.getMeta().getKnownClass().isAnnotationPresent(PushResolution.class) +// ? JObjectManager.ResolutionStrategy.LOCAL_ONLY +// : JObjectManager.ResolutionStrategy.NO_RESOLUTION, +// (m, d) -> { +// if (obj.getMeta().isDeleted()) return null; +// if (m.getKnownClass().isAnnotationPresent(PushResolution.class) && d == null) +// Log.warn("Object " + m.getName() + " is marked as PushResolution but no resolution found"); +// if (m.getKnownClass().isAnnotationPresent(PushResolution.class)) +// return m.toRpcHeader(dataProtoSerializer.serialize(d)); +// else +// return m.toRpcHeader(); +// }); +// if (header == null) return null; +// jObjectTxManager.executeTx(obj::markSeen); +// builder.setHeader(header); +// +// var send = builder.build(); +// +// return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send)); +// } +// +// public OpPushReply pushOps(List ops, String queueName, UUID host) { +// for (Op op : ops) { +// for (var ref : op.getEscapedRefs()) { +// jObjectTxManager.executeTx(() -> { +// jObjectManager.get(ref).ifPresent(JObject::markSeen); +// }); +// } +// } +// var builder = OpPushMsg.newBuilder() +// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString()) +// .setQueueId(queueName); +// for (var op : ops) +// builder.addMsg(opProtoSerializer.serialize(op)); +// return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(builder.build())); +// } +// +// public Collection canDelete(Collection targets, String object, Collection ourReferrers) { +// ConcurrentLinkedDeque results = new ConcurrentLinkedDeque<>(); +// Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", "))); +// try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { +// try { +// executor.invokeAll(targets.stream().>map(h -> () -> { +// try { +// var req = CanDeleteRequest.newBuilder() +// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString()) +// .setName(object); +// req.addAllOurReferrers(ourReferrers); +// var res = rpcClientFactory.withObjSyncClient(h, client -> client.canDelete(req.build())); +// if (res != null) +// results.add(res); +// } catch (Exception e) { +// Log.debug("Error when asking canDelete for object " + object, e); +// } +// return null; +// }).toList()); +// } catch (InterruptedException e) { +// Log.warn("Interrupted waiting for canDelete for object " + object); +// } +// if (!executor.shutdownNow().isEmpty()) +// Log.warn("Didn't ask all targets when asking canDelete for " + object); +// } +// return results; +// } +//} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java new file mode 100644 index 00000000..990ad534 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java @@ -0,0 +1,191 @@ +package com.usatiuk.dhfs.objects.repository; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; +import jakarta.annotation.security.RolesAllowed; +import jakarta.inject.Inject; + +/// / Note: RunOnVirtualThread hangs somehow +@GrpcService +@RolesAllowed("cluster-member") +public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { +// @Inject +// SyncHandler syncHandler; +// +// @Inject +// JObjectManager jObjectManager; +// +// @Inject +// PeerManager peerManager; +// +// @Inject +// AutoSyncProcessor autoSyncProcessor; +// +@Inject +PersistentPeerDataService persistentPeerDataService; +// +// @Inject +// InvalidationQueueService invalidationQueueService; +// +// @Inject +// ProtoSerializer dataProtoSerializer; +// @Inject +// ProtoSerializer opProtoSerializer; +// +// @Inject +// OpObjectRegistry opObjectRegistry; +// +// @Inject +// JObjectTxManager jObjectTxManager; +// +// @Override +// @Blocking +// public Uni getObject(GetObjectRequest request) { +// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); +// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) +// throw new StatusRuntimeException(Status.UNAUTHENTICATED); +// +// Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid()); +// +// var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); +// +// // Does @Blocking break this? +// return Uni.createFrom().emitter(emitter -> { +// var replyObj = jObjectTxManager.executeTx(() -> { +// // Obj.markSeen before markSeen of its children +// obj.markSeen(); +// return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> { +// if (meta.isOnlyLocal()) +// throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Trying to get local-only object")); +// if (data == null) { +// Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid()); +// throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally")); +// } +// data.extractRefs().forEach(ref -> +// jObjectManager.get(ref) +// .orElseThrow(() -> new IllegalStateException("Non-hydrated refs for local object?")) +// .markSeen()); +// +// return ApiObject.newBuilder() +// .setHeader(obj.getMeta().toRpcHeader()) +// .setContent(dataProtoSerializer.serialize(obj.getData())).build(); +// }); +// }); +// var ret = GetObjectReply.newBuilder() +// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString()) +// .setObject(replyObj).build(); +// // TODO: Could this cause problems if we wait for too long? +// obj.commitFenceAsync(() -> emitter.complete(ret)); +// }); +// } +// +// @Override +// @Blocking +// public Uni canDelete(CanDeleteRequest request) { +// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); +// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) +// throw new StatusRuntimeException(Status.UNAUTHENTICATED); +// +// Log.info("<-- canDelete: " + request.getName() + " from " + request.getSelfUuid()); +// +// var builder = CanDeleteReply.newBuilder(); +// +// var obj = jObjectManager.get(request.getName()); +// +// builder.setSelfUuid(persistentPeerDataService.getSelfUuid().toString()); +// builder.setObjName(request.getName()); +// +// if (obj.isPresent()) try { +// boolean tryUpdate = obj.get().runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d) -> { +// if (m.isDeleted() && !m.isDeletionCandidate()) +// throw new IllegalStateException("Object " + m.getName() + " is deleted but not a deletion candidate"); +// builder.setDeletionCandidate(m.isDeletionCandidate()); +// builder.addAllReferrers(m.getReferrers()); +// return m.isDeletionCandidate() && !m.isDeleted(); +// }); +// // FIXME +//// if (tryUpdate) { +//// obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> { +//// return null; +//// }); +//// } +// } catch (DeletedObjectAccessException dox) { +// builder.setDeletionCandidate(true); +// } +// else { +// builder.setDeletionCandidate(true); +// } +// +// var ret = builder.build(); +// +// if (!ret.getDeletionCandidate()) +// for (var rr : request.getOurReferrersList()) +// autoSyncProcessor.add(rr); +// +// return Uni.createFrom().item(ret); +// } +// +// @Override +// @Blocking +// public Uni indexUpdate(IndexUpdatePush request) { +// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); +// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) +// throw new StatusRuntimeException(Status.UNAUTHENTICATED); +// + /// / Log.info("<-- indexUpdate: " + request.getHeader().getName()); +// return jObjectTxManager.executeTxAndFlush(() -> { +// return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); +// }); +// } +// +// @Override +// @Blocking +// public Uni opPush(OpPushMsg request) { +// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); +// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) +// throw new StatusRuntimeException(Status.UNAUTHENTICATED); +// +// try { +// var objs = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); +// jObjectTxManager.executeTxAndFlush(() -> { +// opObjectRegistry.acceptExternalOps(request.getQueueId(), UUID.fromString(request.getSelfUuid()), objs); +// }); +// } catch (Exception e) { +// Log.error(e, e); +// throw e; +// } +// return Uni.createFrom().item(OpPushReply.getDefaultInstance()); +// } +// + + @Override + public Uni getObject(GetObjectRequest request) { + return null; + } + + @Override + public Uni canDelete(CanDeleteRequest request) { + return null; + } + + @Override + public Uni indexUpdate(IndexUpdatePush request) { + return null; + } + + @Override + public Uni opPush(OpPushMsg request) { + return null; + } + + @Override + @Blocking + public Uni ping(PingRequest request) { + if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); + + return Uni.createFrom().item(PingReply.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build()); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcChannelFactory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcChannelFactory.java new file mode 100644 index 00000000..a985be13 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcChannelFactory.java @@ -0,0 +1,44 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager; +import io.grpc.ChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.TlsChannelCredentials; +import io.grpc.netty.NettyChannelBuilder; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import javax.net.ssl.KeyManagerFactory; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.util.concurrent.TimeUnit; + +@ApplicationScoped +public class RpcChannelFactory { + @Inject + PersistentPeerDataService persistentPeerDataService; + @Inject + PeerTrustManager peerTrustManager; + + private ChannelCredentials getChannelCredentials() { + try { + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, null); + + ks.setKeyEntry("clientkey", persistentPeerDataService.getSelfKeypair().getPrivate(), null, new Certificate[]{persistentPeerDataService.getSelfCertificate()}); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(ks, null); + + ChannelCredentials creds = TlsChannelCredentials.newBuilder().trustManager(peerTrustManager).keyManager(keyManagerFactory.getKeyManagers()).build(); + return creds; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + ManagedChannel getSecureChannel(PeerId host, String address, int port) { + return NettyChannelBuilder.forAddress(address, port, getChannelCredentials()).overrideAuthority(host.toString()).idleTimeout(10, TimeUnit.SECONDS).build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java new file mode 100644 index 00000000..3dddddf3 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java @@ -0,0 +1,94 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +// TODO: Dedup this +@ApplicationScoped +public class RpcClientFactory { + @ConfigProperty(name = "dhfs.objects.sync.timeout") + long syncTimeout; + + @Inject + PeerManager peerManager; + + @Inject + RpcChannelFactory rpcChannelFactory; + + // FIXME: Leaks! + private ConcurrentMap _objSyncCache = new ConcurrentHashMap<>(); + + public R withObjSyncClient(Collection targets, ObjectSyncClientFunction fn) { + var shuffledList = new ArrayList<>(targets); + Collections.shuffle(shuffledList); + for (PeerId target : shuffledList) { + try { + return withObjSyncClient(target, fn); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) + Log.debug("Host " + target + " is unreachable: " + e.getMessage()); + else + Log.warn("When calling " + target + " " + e.getMessage()); + } catch (Exception e) { + Log.warn("When calling " + target + " " + e.getMessage()); + } + } + throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!")); + } + + public R withObjSyncClient(PeerId target, ObjectSyncClientFunction fn) { + var hostinfo = peerManager.getAddress(target); + + if (hostinfo == null) + throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target)); + + return withObjSyncClient(target, hostinfo, syncTimeout, fn); + } + + public R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction fn) { + return switch (address) { + case IpPeerAddress ipPeerAddress -> + withObjSyncClient(host, ipPeerAddress.address(), ipPeerAddress.securePort(), timeout, fn); + default -> throw new IllegalStateException("Unexpected value: " + address); + }; + } + + public R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction fn) { + var key = new ObjSyncStubKey(host, addr, port); + var stub = _objSyncCache.computeIfAbsent(key, (k) -> { + var channel = rpcChannelFactory.getSecureChannel(host, addr.getHostAddress(), port); + return DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(Integer.MAX_VALUE) + .withMaxInboundMessageSize(Integer.MAX_VALUE); + }); + return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)); + } + + public void dropCache() { + _objSyncCache = new ConcurrentHashMap<>(); + } + + @FunctionalInterface + public interface ObjectSyncClientFunction { + R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); + } + + private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) { + } + +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java new file mode 100644 index 00000000..f47e34e2 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java @@ -0,0 +1,207 @@ +//package com.usatiuk.dhfs.objects.repository; +// +//import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +//import com.usatiuk.dhfs.objects.jrepository.JObject; +//import com.usatiuk.dhfs.objects.jrepository.JObjectData; +//import com.usatiuk.dhfs.objects.jrepository.JObjectManager; +//import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager; +//import com.usatiuk.dhfs.objects.persistence.JObjectDataP; +//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +//import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry; +//import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace; +//import io.grpc.Status; +//import io.quarkus.logging.Log; +//import jakarta.enterprise.context.ApplicationScoped; +//import jakarta.enterprise.inject.Instance; +//import jakarta.inject.Inject; +// +//import java.util.HashMap; +//import java.util.Objects; +//import java.util.Optional; +//import java.util.UUID; +//import java.util.concurrent.atomic.AtomicReference; +//import java.util.stream.Collectors; +//import java.util.stream.Stream; +// +//@ApplicationScoped +//public class SyncHandler { +// @Inject +// JObjectManager jObjectManager; +// @Inject +// PeerManager peerManager; +// @Inject +// RemoteObjectServiceClient remoteObjectServiceClient; +// @Inject +// InvalidationQueueService invalidationQueueService; +// @Inject +// Instance conflictResolvers; +// @Inject +// PersistentPeerDataService persistentPeerDataService; +// @Inject +// ProtoSerializer dataProtoSerializer; +// @Inject +// OpObjectRegistry opObjectRegistry; +// @Inject +// JObjectTxManager jObjectTxManager; +// +// public void pushInitialResyncObj(UUID host) { +// Log.info("Doing initial object push for " + host); +// +// var objs = jObjectManager.findAll(); +// +// for (var obj : objs) { +// Log.trace("IS: " + obj + " to " + host); +// invalidationQueueService.pushInvalidationToOne(host, obj); +// } +// } +// +// public void pushInitialResyncOp(UUID host) { +// Log.info("Doing initial op push for " + host); +// +// jObjectTxManager.executeTxAndFlush( +// () -> { +// opObjectRegistry.pushBootstrapData(host); +// } +// ); +// } +// +// public void handleOneUpdate(UUID from, ObjectHeader header) { +// AtomicReference> foundExt = new AtomicReference<>(); +// +// boolean conflict = jObjectTxManager.executeTx(() -> { +// JObject found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty()); +// foundExt.set(found); +// +// var receivedTotalVer = header.getChangelog().getEntriesList() +// .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum); +// +// var receivedMap = new HashMap(); +// for (var e : header.getChangelog().getEntriesList()) { +// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion()); +// } +// +// return found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> { +// if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) { +// Log.error("Received older index update than was known for host: " +// + from + " " + header.getName()); +// throw new OutdatedUpdateException(); +// } +// +// String rcv = ""; +// for (var e : header.getChangelog().getEntriesList()) { +// rcv += e.getHost() + ": " + e.getVersion() + "; "; +// } +// String ours = ""; +// for (var e : md.getChangelog().entrySet()) { +// ours += e.getKey() + ": " + e.getValue() + "; "; +// } +// Log.trace("Handling update: " + header.getName() + " from " + from + "\n" + "ours: " + ours + " \n" + "received: " + rcv); +// +// boolean updatedRemoteVersion = false; +// +// var oldRemoteVer = md.getRemoteCopies().put(from, receivedTotalVer); +// if (oldRemoteVer == null || !oldRemoteVer.equals(receivedTotalVer)) updatedRemoteVersion = true; +// +// boolean hasLower = false; +// boolean hasHigher = false; +// for (var e : Stream.concat(md.getChangelog().keySet().stream(), receivedMap.keySet().stream()).collect(Collectors.toSet())) { +// if (receivedMap.getOrDefault(e, 0L) < md.getChangelog().getOrDefault(e, 0L)) +// hasLower = true; +// if (receivedMap.getOrDefault(e, 0L) > md.getChangelog().getOrDefault(e, 0L)) +// hasHigher = true; +// } +// +// if (hasLower && hasHigher) { +// Log.info("Conflict on update (inconsistent version): " + header.getName() + " from " + from); +// return true; +// } +// +// if (hasLower) { +// Log.info("Received older index update than known: " +// + from + " " + header.getName()); +// throw new OutdatedUpdateException(); +// } +// +// if (hasHigher) { +// invalidate.apply(); +// md.getChangelog().clear(); +// md.getChangelog().putAll(receivedMap); +// md.getChangelog().putIfAbsent(persistentPeerDataService.getSelfUuid(), 0L); +// if (header.hasPushedData()) +// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); +// return false; +// } else if (data == null && header.hasPushedData()) { +// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); +// if (found.getData() == null) +// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); +// } +// +// assert Objects.equals(receivedTotalVer, md.getOurVersion()); +// +// if (!updatedRemoteVersion) +// Log.debug("No action on update: " + header.getName() + " from " + from); +// +// return false; +// }); +// }); +// +// // TODO: Is the lock gap here ok? +// if (conflict) { +// Log.info("Trying conflict resolution: " + header.getName() + " from " + from); +// var found = foundExt.get(); +// +// JObjectData theirsData; +// ObjectHeader theirsHeader; +// if (header.hasPushedData()) { +// theirsHeader = header; +// theirsData = dataProtoSerializer.deserialize(header.getPushedData()); +// } else { +// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName()); +// theirsData = dataProtoSerializer.deserialize(got.getRight()); +// theirsHeader = got.getLeft(); +// } +// +// jObjectTxManager.executeTx(() -> { +// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { +// if (d == null) +// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName())); +// return d.getConflictResolver(); +// }); +// var resolver = conflictResolvers.select(resolverClass); +// resolver.get().resolve(from, theirsHeader, theirsData, found); +// }); +// Log.info("Resolved conflict for " + from + " " + header.getName()); +// } +// +// } +// +// public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { +// // TODO: Dedup +// try { +// handleOneUpdate(UUID.fromString(request.getSelfUuid()), request.getHeader()); +// } catch (OutdatedUpdateException ignored) { +// Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid()); +// invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName()); +// } catch (Exception ex) { +// Log.info("Error when handling update from " + request.getSelfUuid() + " of " + request.getHeader().getName(), ex); +// throw ex; +// } +// +// return IndexUpdateReply.getDefaultInstance(); +// } +// +// protected static class OutdatedUpdateException extends RuntimeException { +// OutdatedUpdateException() { +// super(); +// } +// +// OutdatedUpdateException(String message) { +// super(message); +// } +// +// @Override +// public synchronized Throwable fillInStackTrace() { +// return this; +// } +// } +//} \ No newline at end of file diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/IpPeerAddress.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/IpPeerAddress.java new file mode 100644 index 00000000..0d6ab1da --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/IpPeerAddress.java @@ -0,0 +1,9 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery; + +import com.usatiuk.dhfs.objects.PeerId; + +import java.net.InetAddress; + +public record IpPeerAddress(PeerId peer, PeerAddressType type, + InetAddress address, int port, int securePort) implements PeerAddress { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddress.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddress.java new file mode 100644 index 00000000..81824de5 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddress.java @@ -0,0 +1,8 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery; + +import com.usatiuk.dhfs.objects.PeerId; + +public interface PeerAddress { + PeerId peer(); + PeerAddressType type(); +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddressType.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddressType.java new file mode 100644 index 00000000..b5027e4b --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerAddressType.java @@ -0,0 +1,7 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery; + +public enum PeerAddressType { + LAN, + WAN, + PROXY +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java new file mode 100644 index 00000000..50523880 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/PeerDiscoveryDirectory.java @@ -0,0 +1,70 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery; + +import com.usatiuk.dhfs.objects.PeerId; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.commons.collections4.MultiValuedMap; +import org.apache.commons.collections4.multimap.HashSetValuedHashMap; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +@ApplicationScoped +public class PeerDiscoveryDirectory { + @ConfigProperty(name = "dhfs.peerdiscovery.timeout") + long timeout; + + private record PeerEntry(PeerAddress addr, long lastSeen) { + public PeerEntry withLastSeen(long lastSeen) { + return new PeerEntry(addr, lastSeen); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + PeerEntry peerEntry = (PeerEntry) o; + return Objects.equals(addr, peerEntry.addr); + } + + @Override + public int hashCode() { + return Objects.hashCode(addr); + } + } + + private final MultiValuedMap _entries = new HashSetValuedHashMap<>(); + + public void notifyAddr(PeerAddress addr) { + synchronized (_entries) { + var peer = addr.peer(); + _entries.removeMapping(peer, new PeerEntry(addr, 0)); + _entries.put(peer, new PeerEntry(addr, System.currentTimeMillis())); + } + } + + public Collection getForPeer(PeerId peer) { + synchronized (_entries) { + long curTime = System.currentTimeMillis(); + var partitioned = _entries.asMap().get(peer).stream() + .collect(Collectors.partitioningBy(e -> e.lastSeen() + timeout < curTime)); + for (var entry : partitioned.get(true)) { + _entries.removeMapping(peer, entry); + } + return partitioned.get(false).stream().map(PeerEntry::addr).toList(); + } + } + + public Collection getReachablePeers() { + synchronized (_entries) { + long curTime = System.currentTimeMillis(); + var partitioned = _entries.entries().stream() + .collect(Collectors.partitioningBy(e -> e.getValue().lastSeen() + timeout < curTime)); + for (var entry : partitioned.get(true)) { + _entries.removeMapping(entry.getKey(), entry.getValue()); + } + return partitioned.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toUnmodifiableSet()); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/StaticPeerDiscovery.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/StaticPeerDiscovery.java new file mode 100644 index 00000000..f201c132 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/StaticPeerDiscovery.java @@ -0,0 +1,46 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery; + +import com.usatiuk.dhfs.objects.PeerId; +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +@ApplicationScoped +public class StaticPeerDiscovery { + private final List _peers; + + public StaticPeerDiscovery(@ConfigProperty(name = "dhfs.peerdiscovery.static-peers") Optional staticPeers) { + var peers = staticPeers.orElse(""); + _peers = Arrays.stream(peers.split(",")).flatMap(e -> + { + if (e.isEmpty()) { + return Stream.of(); + } + var split = e.split(":"); + try { + return Stream.of(new IpPeerAddress(PeerId.of(split[0]), PeerAddressType.LAN, InetAddress.getByName(split[1]), + Integer.parseInt(split[2]), Integer.parseInt(split[3]))); + } catch (UnknownHostException ex) { + throw new RuntimeException(ex); + } + }).toList(); + } + + @Inject + PeerDiscoveryDirectory peerDiscoveryDirectory; + + @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + public void discoverPeers() { + for (var peer : _peers) { + peerDiscoveryDirectory.notifyAddr(peer); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java new file mode 100644 index 00000000..d3f77471 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryBroadcaster.java @@ -0,0 +1,102 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery.local; + +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryInfo; +import io.quarkus.arc.properties.IfBuildProperty; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduled; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.net.*; + +@ApplicationScoped +@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true") +public class LocalPeerDiscoveryBroadcaster { + @Inject + PersistentPeerDataService persistentPeerDataService; + + @ConfigProperty(name = "quarkus.http.port") + int ourPort; + + @ConfigProperty(name = "quarkus.http.ssl-port") + int ourSecurePort; + + @ConfigProperty(name = "dhfs.objects.peerdiscovery.port") + int broadcastPort; + + @ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast") + boolean enabled; + + private DatagramSocket _socket; + + @Startup + void init() throws SocketException { + if (!enabled) { + return; + } + _socket = new DatagramSocket(); + _socket.setBroadcast(true); + } + + void shutdown(@Observes @Priority(10) ShutdownEvent event) { + if (!enabled) { + return; + } + _socket.close(); + } + + @Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + public void broadcast() throws Exception { + if (!enabled) { + return; + } + var sendData = PeerDiscoveryInfo.newBuilder() + .setUuid(persistentPeerDataService.getSelfUuid().toString()) + .setPort(ourPort) + .setSecurePort(ourSecurePort) + .build(); + + var sendBytes = sendData.toByteArray(); + + DatagramPacket sendPacket + = new DatagramPacket(sendBytes, sendBytes.length, + InetAddress.getByName("255.255.255.255"), broadcastPort); + + _socket.send(sendPacket); + + var interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface networkInterface = interfaces.nextElement(); + + try { + if (networkInterface.isLoopback() || !networkInterface.isUp()) { + continue; + } + } catch (Exception e) { + continue; + } + + for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) { + InetAddress broadcast = interfaceAddress.getBroadcast(); + if (broadcast == null) { + continue; + } + + try { + sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, broadcastPort); + _socket.send(sendPacket); + } catch (Exception ignored) { + continue; + } + +// Log.trace(getClass().getName() + "Broadcast sent to: " + broadcast.getHostAddress() +// + ", at: " + networkInterface.getDisplayName()); + } + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryClient.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryClient.java new file mode 100644 index 00000000..f5ce9d6b --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peerdiscovery/local/LocalPeerDiscoveryClient.java @@ -0,0 +1,91 @@ +package com.usatiuk.dhfs.objects.repository.peerdiscovery.local; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddressType; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryDirectory; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryInfo; +import io.quarkus.arc.properties.IfBuildProperty; +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.Startup; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.net.*; +import java.nio.ByteBuffer; +import java.util.UUID; + +@ApplicationScoped +@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true") +public class LocalPeerDiscoveryClient { + @Inject + PeerDiscoveryDirectory peerDiscoveryDirectory; + + private Thread _clientThread; + + private DatagramSocket _socket; + + @ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast") + boolean enabled; + + @Startup + void init() throws SocketException, UnknownHostException { + if (!enabled) { + return; + } + _socket = new DatagramSocket(42069, InetAddress.getByName("0.0.0.0")); + _socket.setBroadcast(true); + + _clientThread = new Thread(this::client); + _clientThread.setName("LocalPeerDiscoveryClient"); + _clientThread.start(); + } + + void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException { + if (!enabled) { + return; + } + _socket.close(); + _clientThread.interrupt(); + _clientThread.interrupt(); + while (_clientThread.isAlive()) { + try { + _clientThread.join(); + } catch (InterruptedException ignored) { + } + } + } + + private void client() { + while (!Thread.interrupted() && !_socket.isClosed()) { + try { + byte[] buf = new byte[10000]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + _socket.receive(packet); + + try { + var got = PeerDiscoveryInfo.parseFrom(ByteBuffer.wrap(buf, 0, packet.getLength())); + peerDiscoveryDirectory.notifyAddr( + new IpPeerAddress( + PeerId.of(UUID.fromString(got.getUuid())), + PeerAddressType.LAN, + packet.getAddress(), + got.getPort(), + got.getSecurePort() + ) + ); + } catch (InvalidProtocolBufferException e) { + continue; + } + } catch (Exception ex) { + Log.error(ex); + } + } + Log.info("PeerDiscoveryClient stopped"); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java new file mode 100644 index 00000000..caa45d3a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java @@ -0,0 +1,37 @@ +package com.usatiuk.dhfs.objects.repository.peersync; + +import com.usatiuk.dhfs.objects.JDataRefcounted; +import com.usatiuk.dhfs.objects.JDataRemote; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.CertificateTools; +import org.pcollections.HashTreePSet; +import org.pcollections.PCollection; + +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +public record PeerInfo(JObjectKey key, PCollection refsFrom, boolean frozen, PeerId id, + byte[] cert) implements JDataRefcounted, JDataRemote { + public PeerInfo(PeerId id, byte[] cert) { + this(id.toJObjectKey(), HashTreePSet.empty(), false, id, cert); + } + + @Override + public JDataRefcounted withRefsFrom(PCollection refs) { + return new PeerInfo(key, refs, frozen, id, cert); + } + + @Override + public JDataRefcounted withFrozen(boolean frozen) { + return new PeerInfo(key, refsFrom, frozen, id, cert); + } + + public X509Certificate parsedCert() { + try { + return CertificateTools.certFromBytes(cert); + } catch (CertificateException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java new file mode 100644 index 00000000..c83fe311 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java @@ -0,0 +1,76 @@ +package com.usatiuk.dhfs.objects.repository.peersync; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.TransactionManager; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.List; +import java.util.Optional; + +@ApplicationScoped +public class PeerInfoService { + @Inject + Transaction curTx; + @Inject + TransactionManager jObjectTxManager; + @Inject + JKleppmannTreeManager jKleppmannTreeManager; + @Inject + PersistentPeerDataService persistentPeerDataService; + + private JKleppmannTreeManager.JKleppmannTree getTree() { + return jKleppmannTreeManager.getTree(JObjectKey.of("peers")); + } + + public Optional getPeerInfo(PeerId peer) { + return jObjectTxManager.run(() -> { + var gotKey = getTree().traverse(List.of(peer.toString())); + if (gotKey == null) { + return Optional.empty(); + } + return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> { + var meta = (JKleppmannTreeNodeMetaPeer) node.meta(); + return curTx.get(PeerInfo.class, meta.getPeerId()); + }); + }); + } + + public List getPeers() { + return jObjectTxManager.run(() -> { + var gotKey = getTree().traverse(List.of()); + return curTx.get(JKleppmannTreeNode.class, gotKey).map( + node -> node.children().keySet().stream() + .map(PeerId::of).map(this::getPeerInfo) + .map(Optional::get).toList()) + .orElseThrow(); + }); + } + + public List getPeersNoSelf() { + return jObjectTxManager.run(() -> { + var gotKey = getTree().traverse(List.of()); + return curTx.get(JKleppmannTreeNode.class, gotKey).map( + node -> node.children().keySet().stream() + .map(PeerId::of).map(this::getPeerInfo) + .map(Optional::get).filter( + peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList()) + .orElseThrow(); + }); + } + + public void putPeer(PeerId id, byte[] cert) { + jObjectTxManager.run(() -> { + var parent = getTree().traverse(List.of()); + var newPeerInfo = new PeerInfo(id, cert); + curTx.put(newPeerInfo); + getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId()); + }); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/ApiPeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/ApiPeerInfo.java new file mode 100644 index 00000000..e84f4d83 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/ApiPeerInfo.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.repository.peersync.api; + +public record ApiPeerInfo(String selfUuid, String cert) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApi.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApi.java new file mode 100644 index 00000000..f3d07189 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApi.java @@ -0,0 +1,26 @@ +package com.usatiuk.dhfs.objects.repository.peersync.api; + +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +import java.security.cert.CertificateEncodingException; +import java.util.Base64; + +@Path("/peer-info") +public class PeerSyncApi { + @Inject + PersistentPeerDataService persistentPeerDataService; + + @Path("self") + @GET + public ApiPeerInfo getSelfInfo() { + try { + return new ApiPeerInfo(persistentPeerDataService.getSelfUuid().toString(), + Base64.getEncoder().encodeToString(persistentPeerDataService.getSelfCertificate().getEncoded())); + } catch (CertificateEncodingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClient.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClient.java new file mode 100644 index 00000000..49a04ac6 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClient.java @@ -0,0 +1,11 @@ +package com.usatiuk.dhfs.objects.repository.peersync.api; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +@Path("/peer-info") +public interface PeerSyncApiClient { + @Path("self") + @GET + ApiPeerInfo getSelfInfo(); +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClientDynamic.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClientDynamic.java new file mode 100644 index 00000000..c09262e9 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/api/PeerSyncApiClientDynamic.java @@ -0,0 +1,28 @@ +package com.usatiuk.dhfs.objects.repository.peersync.api; + +import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress; +import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress; +import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder; +import jakarta.enterprise.context.ApplicationScoped; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +@ApplicationScoped +public class PeerSyncApiClientDynamic { + public ApiPeerInfo getSelfInfo(PeerAddress addr) { + return switch (addr) { + case IpPeerAddress ipAddr -> getSelfInfo(ipAddr.address().getHostAddress(), ipAddr.port()); + default -> throw new IllegalArgumentException("Unsupported peer address type: " + addr.getClass()); + }; + } + + private ApiPeerInfo getSelfInfo(String address, int port) { + var client = QuarkusRestClientBuilder.newBuilder() + .baseUri(URI.create("http://" + address + ":" + port)) + .connectTimeout(5, TimeUnit.SECONDS) + .readTimeout(5, TimeUnit.SECONDS) + .build(PeerSyncApiClient.class); + return client.getSelfInfo(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java new file mode 100644 index 00000000..a9ea1800 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/structs/JKleppmannTreeNodeMetaPeer.java @@ -0,0 +1,41 @@ +package com.usatiuk.dhfs.objects.repository.peersync.structs; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta; + +import java.util.Objects; + +//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class) +public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta { + private final JObjectKey _peerId; + + public JKleppmannTreeNodeMetaPeer(PeerId id) { + super(id.toString()); + _peerId = id.toJObjectKey(); + } + + public JObjectKey getPeerId() { + return _peerId; + } + + @Override + public JKleppmannTreeNodeMeta withName(String name) { + assert false; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + JKleppmannTreeNodeMetaPeer that = (JKleppmannTreeNodeMetaPeer) o; + return Objects.equals(_peerId, that._peerId); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), _peerId); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerRolesAugmentor.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerRolesAugmentor.java new file mode 100644 index 00000000..d2e1c4bf --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerRolesAugmentor.java @@ -0,0 +1,51 @@ +package com.usatiuk.dhfs.objects.repository.peertrust; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; +import io.quarkus.logging.Log; +import io.quarkus.security.credential.CertificateCredential; +import io.quarkus.security.identity.AuthenticationRequestContext; +import io.quarkus.security.identity.SecurityIdentity; +import io.quarkus.security.identity.SecurityIdentityAugmentor; +import io.quarkus.security.runtime.QuarkusSecurityIdentity; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.function.Supplier; + +@ApplicationScoped +public class PeerRolesAugmentor implements SecurityIdentityAugmentor { + @Inject + PeerInfoService peerInfoService; + + @Override + public Uni augment(SecurityIdentity identity, AuthenticationRequestContext context) { + return Uni.createFrom().item(build(identity)); + } + + private Supplier build(SecurityIdentity identity) { + if (identity.isAnonymous()) { + return () -> identity; + } else { + QuarkusSecurityIdentity.Builder builder = QuarkusSecurityIdentity.builder(identity); + + var uuid = identity.getPrincipal().getName().substring(3); + + try { + var entry = peerInfoService.getPeerInfo(PeerId.of(uuid)); + + if (!entry.get().parsedCert().equals(identity.getCredential(CertificateCredential.class).getCertificate())) { + Log.error("Certificate mismatch for " + uuid); + return () -> identity; + } + + builder.addRole("cluster-member"); + return builder::build; + } catch (Exception e) { + Log.error("Error when checking certificate for " + uuid, e); + return () -> identity; + } + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java new file mode 100644 index 00000000..26573abb --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustManager.java @@ -0,0 +1,69 @@ +package com.usatiuk.dhfs.objects.repository.peertrust; + +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.commons.lang3.tuple.Pair; + +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + +@ApplicationScoped +public class PeerTrustManager implements X509TrustManager { + private final AtomicReference trustManager = new AtomicReference<>(); + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + trustManager.get().checkClientTrusted(chain, authType); + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + trustManager.get().checkServerTrusted(chain, authType); + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return trustManager.get().getAcceptedIssuers(); + } + + public synchronized void reloadTrustManagerHosts(Collection hosts) { + try { + Log.info("Trying to reload trust manager: " + hosts.size() + " known hosts"); + reloadTrustManager(hosts.stream().map(hostInfo -> + Pair.of(hostInfo.id().toString(), hostInfo.parsedCert())).toList()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private synchronized void reloadTrustManager(Collection> certs) throws Exception { + KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(null, null); + + for (var cert : certs) { + ts.setCertificateEntry(cert.getLeft(), cert.getRight()); + } + + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + + TrustManager[] tms = tmf.getTrustManagers(); + for (var tm : tms) { + if (tm instanceof X509TrustManager) { + trustManager.set((X509TrustManager) tm); + return; + } + } + + throw new NoSuchAlgorithmException("No X509TrustManager in TrustManagerFactory"); + } + +} \ No newline at end of file diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustServerCustomizer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustServerCustomizer.java new file mode 100644 index 00000000..167465f6 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peertrust/PeerTrustServerCustomizer.java @@ -0,0 +1,44 @@ +package com.usatiuk.dhfs.objects.repository.peertrust; + + +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import io.quarkus.vertx.http.HttpServerOptionsCustomizer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.net.KeyCertOptions; +import io.vertx.core.net.TrustOptions; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import javax.net.ssl.KeyManagerFactory; +import java.security.KeyStore; +import java.security.cert.Certificate; + +@ApplicationScoped +public class PeerTrustServerCustomizer implements HttpServerOptionsCustomizer { + + @Inject + PeerTrustManager peerTrustManager; + + @Inject + PersistentPeerDataService persistentPeerDataService; + + @Override + public void customizeHttpsServer(HttpServerOptions options) { + try { + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + ks.load(null, null); + + ks.setKeyEntry("sslkey", + persistentPeerDataService.getSelfKeypair().getPrivate(), null, + new Certificate[]{persistentPeerDataService.getSelfCertificate()}); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(ks, null); + + options.setKeyCertOptions(KeyCertOptions.wrap(keyManagerFactory)); + options.setTrustOptions(TrustOptions.wrap(peerTrustManager)); + } catch (Exception e) { + throw new RuntimeException("Error configuring https: ", e); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/AvailablePeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/AvailablePeerInfo.java new file mode 100644 index 00000000..0dbf2687 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/AvailablePeerInfo.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.repository.webapi; + +public record AvailablePeerInfo(String uuid) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerDelete.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerDelete.java new file mode 100644 index 00000000..2d646474 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerDelete.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.repository.webapi; + +public record KnownPeerDelete(String uuid) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerInfo.java new file mode 100644 index 00000000..5fbd9eb7 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerInfo.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.repository.webapi; + +public record KnownPeerInfo(String uuid) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerPut.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerPut.java new file mode 100644 index 00000000..f1e109f8 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/KnownPeerPut.java @@ -0,0 +1,4 @@ +package com.usatiuk.dhfs.objects.repository.webapi; + +public record KnownPeerPut(String uuid) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java new file mode 100644 index 00000000..344ef33f --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/webapi/ManagementApi.java @@ -0,0 +1,45 @@ +package com.usatiuk.dhfs.objects.repository.webapi; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.PeerManager; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; +import jakarta.inject.Inject; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; + +import java.util.Collection; +import java.util.List; + +@Path("/objects-manage") +public class ManagementApi { + @Inject + PeerInfoService peerInfoService; + @Inject + PeerManager peerManager; + + @Path("known-peers") + @GET + public List knownPeers() { + return peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString())).toList(); + } + + @Path("known-peers") + @PUT + public void addPeer(KnownPeerPut knownPeerPut) { + peerManager.addRemoteHost(PeerId.of(knownPeerPut.uuid())); + } + + @Path("known-peers") + @DELETE + public void deletePeer(KnownPeerDelete knownPeerDelete) { +// peerManager.removeRemoteHost(PeerId.of(knownPeerPut.uuid())); + } + + @Path("available-peers") + @GET + public Collection availablePeers() { + return peerManager.getSeenButNotAddedHosts(); + } +} diff --git a/dhfs-parent/server/src/main/resources/application.properties b/dhfs-parent/server/src/main/resources/application.properties index aacd8c29..bbf3bab4 100644 --- a/dhfs-parent/server/src/main/resources/application.properties +++ b/dhfs-parent/server/src/main/resources/application.properties @@ -1,6 +1,7 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.peerdiscovery.port=42069 -dhfs.objects.peerdiscovery.interval=5000 +dhfs.objects.peerdiscovery.interval=5s +dhfs.objects.peerdiscovery.broadcast=true dhfs.objects.sync.timeout=30 dhfs.objects.sync.ping.timeout=5 dhfs.objects.invalidation.threads=4 @@ -31,6 +32,7 @@ dhfs.objects.ref-processor.threads=4 dhfs.objects.opsender.batch-size=100 dhfs.objects.lock_timeout_secs=2 dhfs.local-discovery=true +dhfs.peerdiscovery.timeout=5000 quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE quarkus.log.category."com.usatiuk.dhfs".level=TRACE quarkus.http.insecure-requests=enabled diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TempDataProfile.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TempDataProfile.java index 8b075b57..e5a9f59b 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TempDataProfile.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TempDataProfile.java @@ -22,7 +22,6 @@ abstract public class TempDataProfile implements QuarkusTestProfile { } var ret = new HashMap(); ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString()); - ret.put("dhfs.objects.root", tempDirWithPrefix.resolve("dhfs_root_d_test").toString()); ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString()); getConfigOverrides(ret); return ret; diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TestDataCleaner.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TestDataCleaner.java index 2a6979a6..b3659d01 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TestDataCleaner.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/TestDataCleaner.java @@ -17,13 +17,10 @@ import java.util.Objects; public class TestDataCleaner { @ConfigProperty(name = "dhfs.objects.persistence.files.root") String tempDirectory; - @ConfigProperty(name = "dhfs.objects.root") - String tempDirectoryIdx; void init(@Observes @Priority(1) StartupEvent event) throws IOException { try { purgeDirectory(Path.of(tempDirectory).toFile()); - purgeDirectory(Path.of(tempDirectoryIdx).toFile()); } catch (Exception ignored) { Log.warn("Couldn't cleanup test data on init"); } @@ -31,7 +28,6 @@ public class TestDataCleaner { void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException { purgeDirectory(Path.of(tempDirectory).toFile()); - purgeDirectory(Path.of(tempDirectoryIdx).toFile()); } void purgeDirectory(File dir) { diff --git a/webui/src/api/dto.ts b/webui/src/api/dto.ts index ac35aa0a..dd90c71a 100644 --- a/webui/src/api/dto.ts +++ b/webui/src/api/dto.ts @@ -39,8 +39,8 @@ export type TTokenToResp = z.infer; // AvailablePeerInfo export const AvailablePeerInfoTo = z.object({ uuid: z.string(), - addr: z.string(), - port: z.number(), + // addr: z.string(), + // port: z.number(), }); export type TAvailablePeerInfoTo = z.infer;