simple peer connectivity

This commit is contained in:
2025-01-28 07:53:45 +01:00
parent e34225eb0a
commit 55ea9ddc44
45 changed files with 2127 additions and 13 deletions

View File

@@ -4,5 +4,4 @@ dhfs.objects.lru.limit=134217728
dhfs.objects.lru.print-stats=true
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.root=${HOME}/dhfs_default/data/stuff
quarkus.package.jar.decompiler.enabled=true

View File

@@ -14,7 +14,7 @@ import java.nio.file.Paths;
@ApplicationScoped
public class ShutdownChecker {
private static final String dataFileName = "running";
@ConfigProperty(name = "dhfs.objects.root")
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
String dataRoot;
boolean _cleanShutdown = true;
boolean _initialized = false;

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects;
// Marker interface for data objects that can be shared with / synchronized to
// remote peers. NOTE(review): intentionally empty for now — implementations are
// only tagged; no methods are required yet.
public interface JDataRemote {
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects;
import java.io.Serializable;
import java.util.UUID;
/**
 * Identity of a peer in the cluster, backed by a {@link UUID}.
 * Immutable and {@link Serializable} so it can be embedded in persisted objects.
 */
public record PeerId(UUID id) implements Serializable {
    public PeerId {
        // Fail fast: a null id would only surface later in toString()/toJObjectKey().
        if (id == null) throw new IllegalArgumentException("PeerId id must not be null");
    }

    /** Wraps an existing UUID into a PeerId. */
    public static PeerId of(UUID id) {
        return new PeerId(id);
    }

    /**
     * Parses the canonical UUID string form.
     *
     * @throws IllegalArgumentException if the string is not a valid UUID
     */
    public static PeerId of(String id) {
        return new PeerId(UUID.fromString(id));
    }

    /** Canonical UUID string form (no record-style wrapper). */
    @Override
    public String toString() {
        return id.toString();
    }

    /** Key under which this peer's object is addressed in the object store. */
    public JObjectKey toJObjectKey() {
        return JObjectKey.of(id.toString());
    }
}

View File

@@ -0,0 +1,67 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.PSet;
import java.util.Collection;
import java.util.List;
/**
 * Local bookkeeping record for an object that is replicated across peers.
 * Tracks which peers have seen which versions, the per-peer changelog, and
 * whether a local copy of the underlying data exists.
 *
 * All "with*" methods return a modified copy (the record itself is immutable).
 */
public record RemoteObject<T>(
        JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
        PMap<PeerId, Long> knownRemoteVersions,
        Class<? extends JData> knownType,
        PSet<PeerId> confirmedDeletes,
        boolean seen,
        PMap<PeerId, Long> changelog,
        boolean haveLocal
) implements JDataRefcounted {
    @Override
    public RemoteObject<T> withRefsFrom(PCollection<JObjectKey> refs) {
        return new RemoteObject<>(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    @Override
    public RemoteObject<T> withFrozen(boolean frozen) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withKnownType(Class<? extends JData> knownType) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withSeen(boolean seen) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withChangelog(PMap<PeerId, Long> changelog) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    public RemoteObject<T> withHaveLocal(boolean haveLocal) {
        return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
    }

    // Derives the key of the remote-tracking record from the data object's key.
    // NOTE(review): `key + "_remote"` relies on JObjectKey.toString() returning the
    // bare name — confirm; otherwise this does not round-trip with localKey() below,
    // which strips "_remote" from key.name(). Consider key.name() + "_remote".
    public static JObjectKey keyFrom(JObjectKey key) {
        return new JObjectKey(key + "_remote");
    }

    /**
     * Key of the locally stored data object; only valid when {@code haveLocal} is set.
     *
     * @throws IllegalStateException if no local copy exists
     */
    public JObjectKey localKey() {
        if (!haveLocal) throw new IllegalStateException("No local key");
        return JObjectKey.of(key.name().substring(0, key.name().length() - "_remote".length()));
    }

    // The remote-tracking record keeps the local data object alive (refcounted)
    // only while a local copy actually exists.
    @Override
    public Collection<JObjectKey> collectRefsTo() {
        if (haveLocal) return List.of(localKey());
        return List.of();
    }
}

View File

@@ -0,0 +1,31 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Optional;
// Transaction facade for remotely-replicated objects, layered over the plain
// object Transaction. Read/write of remote objects is not implemented yet —
// only the transaction id passthrough works.
@ApplicationScoped
public class RemoteTransaction {
    @Inject
    Transaction curTx;

    /** Id of the current underlying transaction. */
    public long getId() {
        return curTx.getId();
    }

    // Fetch a remote object with an explicit locking strategy.
    // NOTE(review): stub — always throws NotImplementedException.
    public <T extends JDataRemote> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
        throw new NotImplementedException();
    }

    // Store a remote object.
    // NOTE(review): stub — always throws; also the type parameter T is unused and
    // the parameter is JData rather than T/JDataRemote — confirm intended signature.
    public <T extends JDataRemote> void put(JData obj) {
        throw new NotImplementedException();
    }

    /** Fetch with the default (optimistic) locking strategy. */
    public <T extends JDataRemote> Optional<T> get(Class<T> type, JObjectKey key) {
        return get(type, key, LockingStrategy.OPTIMISTIC);
    }
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
import org.pcollections.PCollection;
@@ -61,6 +62,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFro
switch (meta()) {
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>of();
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
default -> throw new IllegalStateException("Unexpected value: " + meta());
}
).collect(Collectors.toUnmodifiableSet());

View File

@@ -6,7 +6,7 @@ import com.usatiuk.kleppmanntree.NodeMeta;
import java.util.Objects;
@ProtoMirror(JKleppmannTreeNodeMetaP.class)
//@ProtoMirror(JKleppmannTreeNodeMetaP.class)
public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
private final String _name;

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaDirectoryP;
@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class)
//@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class)
public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMetaDirectory(String name) {
super(name);

View File

@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaFileP;
import java.util.Objects;
@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
private final JObjectKey _fileIno;

View File

@@ -0,0 +1,63 @@
package com.usatiuk.dhfs.objects.repository;
import org.apache.commons.codec.digest.DigestUtils;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.cert.CertIOException;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.operator.ContentSigner;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.math.BigInteger;
import java.security.*;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Calendar;
import java.util.Date;
/**
 * Static helpers for X.509 certificate handling: parsing DER bytes,
 * generating RSA key pairs, and producing long-lived self-signed certificates.
 */
public class CertificateTools {
    private CertificateTools() {
        // Utility class; no instances.
    }

    /**
     * Parses a DER-encoded X.509 certificate from raw bytes.
     *
     * @throws CertificateException if the bytes are not a valid certificate
     */
    public static X509Certificate certFromBytes(byte[] bytes) throws CertificateException {
        CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
        InputStream in = new ByteArrayInputStream(bytes);
        return (X509Certificate) certFactory.generateCertificate(in);
    }

    /** Generates a fresh 2048-bit RSA key pair. */
    public static KeyPair generateKeyPair() throws NoSuchAlgorithmException {
        KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
        keyGen.initialize(2048); //FIXME: key size/algorithm should be configurable
        return keyGen.generateKeyPair();
    }

    /**
     * Creates a self-signed certificate (valid for 999 years) for the given key pair.
     *
     * @param keyPair key pair whose public key is embedded and whose private key signs
     * @param subject used as CN for both subject and issuer, and to seed the serial number
     */
    public static X509Certificate generateCertificate(KeyPair keyPair, String subject) throws CertificateException, CertIOException, NoSuchAlgorithmException, OperatorCreationException {
        Provider bcProvider = new BouncyCastleProvider();
        Security.addProvider(bcProvider); // idempotent if already registered
        Date startDate = new Date();
        X500Name cnName = new X500Name("CN=" + subject);
        // signum=1: RFC 5280 requires a positive serial number, but a raw SHA-256
        // digest interpreted as two's-complement (the 1-arg constructor) can be negative.
        BigInteger certSerialNumber = new BigInteger(1, DigestUtils.sha256(subject));
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(startDate);
        calendar.add(Calendar.YEAR, 999);
        Date endDate = calendar.getTime();
        ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate());
        JcaX509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(cnName, certSerialNumber, startDate, endDate, cnName, keyPair.getPublic());
        // basicConstraints: cA=false — this is an end-entity certificate, not a CA.
        BasicConstraints basicConstraints = new BasicConstraints(false);
        certBuilder.addExtension(new ASN1ObjectIdentifier("2.5.29.19"), true, basicConstraints);
        return new JcaX509CertificateConverter().setProvider(bcProvider).getCertificate(certBuilder.build(contentSigner));
    }
}

View File

@@ -0,0 +1,233 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryDirectory;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.repository.peersync.api.PeerSyncApiClientDynamic;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
// Tracks which known peers are currently reachable and at what address.
// Periodically pings all known peers (virtual-thread executor), and handles
// adding newly discovered peers to the cluster.
@ApplicationScoped
public class PeerManager {
    // Peers currently considered reachable, mapped to the address we last reached them at.
    private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
    // FIXME: Ideally not call them on every ping
    // Listener lists are guarded by synchronizing on the list itself.
    // NOTE(review): listeners are registered but never invoked — the invocation
    // sites below are commented out.
    private final ArrayList<ConnectionEventListener> _connectedListeners = new ArrayList<>();
    private final ArrayList<ConnectionEventListener> _disconnectedListeners = new ArrayList<>();
    @Inject
    PersistentPeerDataService persistentPeerDataService;
    @Inject
    PeerInfoService peerInfoService;
    @Inject
    RpcClientFactory rpcClientFactory;
    @Inject
    PeerSyncApiClientDynamic peerSyncApiClient;
    @Inject
    TransactionManager transactionManager;
    @Inject
    Transaction curTx;
    @Inject
    PeerTrustManager peerTrustManager;
    @ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
    long pingTimeout;
    @Inject
    PeerDiscoveryDirectory peerDiscoveryDirectory;
    // One virtual thread per ping; created at startup, null until then.
    private ExecutorService _heartbeatExecutor;

    // Note: keep priority updated with below
    void init(@Observes @Priority(600) StartupEvent event) throws IOException {
        _heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
    }

    // Periodic heartbeat: pings every known peer (except self) in parallel and
    // updates the reachability map accordingly. SKIP prevents overlapping runs.
    @Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
    @Blocking
    public void tryConnectAll() {
        if (_heartbeatExecutor == null) return; // startup not finished yet
        try {
            _heartbeatExecutor.invokeAll(peerInfoService.getPeersNoSelf()
                    .stream()
                    .<Callable<Void>>map(host -> () -> {
                        try {
                            if (isReachable(host))
                                Log.trace("Heartbeat: " + host);
                            else
                                Log.debug("Trying to connect to " + host);
                            var bestAddr = selectBestAddress(host.id());
                            if (pingCheck(host, bestAddr))
                                handleConnectionSuccess(host, bestAddr);
                            else
                                handleConnectionError(host);
                        } catch (Exception e) {
                            Log.error("Failed to connect to " + host, e);
                        }
                        return null;
                    }).toList(), 30, TimeUnit.SECONDS); //FIXME:
        } catch (InterruptedException iex) {
            Log.error("Heartbeat was interrupted");
        }
    }

    // Note: registrations should be completed with Priority < 600
    public void registerConnectEventListener(ConnectionEventListener listener) {
        synchronized (_connectedListeners) {
            _connectedListeners.add(listener);
        }
    }

    // Note: registrations should be completed with Priority < 600
    public void registerDisconnectEventListener(ConnectionEventListener listener) {
        synchronized (_disconnectedListeners) {
            _disconnectedListeners.add(listener);
        }
    }

    // Records a successful ping; logs only on the unreachable->reachable transition.
    private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
        boolean wasReachable = isReachable(host);
//        boolean shouldSyncObj = persistentPeerDataService.markInitialObjSyncDone(host);
//        boolean shouldSyncOp = persistentPeerDataService.markInitialOpSyncDone(host);
//
//        if (shouldSyncObj)
//            syncHandler.pushInitialResyncObj(host);
//        if (shouldSyncOp)
//            syncHandler.pushInitialResyncOp(host);
        _states.put(host.id(), address);
        if (wasReachable) return;
        Log.info("Connected to " + host);
//        for (var l : _connectedListeners) {
//            l.apply(host);
//        }
    }

    // Marks a peer unreachable; logs only on the reachable->unreachable transition.
    public void handleConnectionError(PeerInfo host) {
        boolean wasReachable = isReachable(host);
        if (wasReachable)
            Log.info("Lost connection to " + host);
        _states.remove(host.id());
//        for (var l : _disconnectedListeners) {
//            l.apply(host);
//        }
    }

    // FIXME:
    // Pings the peer at the given address and verifies the responder's uuid
    // matches the peer we think we are talking to. Any failure => unreachable.
    private boolean pingCheck(PeerInfo host, PeerAddress address) {
        try {
            return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, c -> {
                var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build());
                if (!UUID.fromString(ret.getSelfUuid()).equals(host.id().id())) {
                    throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host.id());
                }
                return true;
            });
        } catch (Exception ignored) {
            // Best-effort: treat any transport/validation failure as "unreachable".
            Log.debug("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
            return false;
        }
    }

    /** Whether the peer responded to the most recent heartbeat. */
    public boolean isReachable(PeerInfo host) {
        return _states.containsKey(host.id());
    }

    /** Last known-good address for a reachable peer, or null if unreachable. */
    public PeerAddress getAddress(PeerId host) {
        return _states.get(host);
    }

    /** Snapshot of currently reachable peer ids. */
    public List<PeerId> getAvailableHosts() {
        return _states.keySet().stream().toList();
    }

//    public List<UUID> getUnavailableHosts() {
//        return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
//                .filter(e -> !e.getValue().isReachable())
//                .map(Map.Entry::getKey).toList());
//    }

//    public HostStateSnapshot getHostStateSnapshot() {
//        ArrayList<UUID> available = new ArrayList<>();
//        ArrayList<UUID> unavailable = new ArrayList<>();
//        _transientPeersState.runReadLocked(d -> {
//                    for (var v : d.getStates().entrySet()) {
//                        if (v.getValue().isReachable())
//                            available.add(v.getKey());
//                        else
//                            unavailable.add(v.getKey());
//                    }
//                    return null;
//                }
//        );
//        return new HostStateSnapshot(available, unavailable);
//    }

//    public void removeRemoteHost(UUID host) {
//        persistentPeerDataService.removeHost(host);
//        // Race?
//        _transientPeersState.runWriteLocked(d -> {
//            d.getStates().remove(host);
//            return null;
//        });
//    }

    // Picks an address from the discovery directory.
    // NOTE(review): "best" is currently just the first advertised address.
    private PeerAddress selectBestAddress(PeerId host) {
        return peerDiscoveryDirectory.getForPeer(host).stream().findFirst().orElseThrow();
    }

    // Adds a discovered-but-unknown peer to the cluster: fetches its self-info
    // (certificate) over the peer-sync API, persists it, then reloads trust.
    public void addRemoteHost(PeerId host) {
        if (_states.containsKey(host)) {
            throw new IllegalStateException("Host " + host + " is already added");
        }
        transactionManager.run(() -> {
            if (peerInfoService.getPeerInfo(host).isPresent())
                throw new IllegalStateException("Host " + host + " is already added");
            var info = peerSyncApiClient.getSelfInfo(selectBestAddress(host));
            var cert = Base64.getDecoder().decode(info.cert());
            peerInfoService.putPeer(host, cert);
        });
        peerTrustManager.reloadTrustManagerHosts(
                transactionManager.run(() -> peerInfoService.getPeers().stream().toList())
        ); //FIXME:
    }

    // Peers visible via discovery that have not yet been added to the cluster.
    public Collection<AvailablePeerInfo> getSeenButNotAddedHosts() {
        return transactionManager.run(() -> {
            return peerDiscoveryDirectory.getReachablePeers().stream().filter(p -> !peerInfoService.getPeerInfo(p).isPresent())
                    .map(p -> new AvailablePeerInfo(p.toString())).toList();
        });
    }

    // NOTE(review): still UUID-based while the rest of the class uses PeerId —
    // confirm whether this should be migrated.
    @FunctionalInterface
    public interface ConnectionEventListener {
        void apply(UUID host);
    }

    public record HostStateSnapshot(List<UUID> available, List<UUID> unavailable) {
    }
}

View File

@@ -0,0 +1,179 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
// Holds this node's own persistent identity (peer id, certificate, key pair),
// generating and persisting it on first startup.
@ApplicationScoped
public class PersistentPeerDataService {
    @Inject
    PeerTrustManager peerTrustManager;
    @Inject
    ExecutorService executorService;
    @Inject
    RpcClientFactory rpcClientFactory;
    @Inject
    ShutdownChecker shutdownChecker;
    @Inject
    TransactionManager jObjectTxManager;
    @Inject
    Transaction curTx;
    @Inject
    PeerInfoService peerInfoService;
    // Optional fixed uuid for tests/presets; otherwise a random one is generated.
    @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
    Optional<String> presetUuid;

    // Cached identity, loaded or created once in init(); effectively immutable afterwards.
    private PeerId _selfUuid;
    private X509Certificate _selfCertificate;
    private KeyPair _selfKeyPair;

    // Loads the persisted self-identity, or on first run generates uuid,
    // key pair and self-signed certificate and persists them in one transaction.
    void init(@Observes @Priority(300) StartupEvent event) throws IOException {
        jObjectTxManager.run(() -> {
            var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
            if (selfData != null) {
                _selfUuid = selfData.selfUuid();
                _selfCertificate = selfData.selfCertificate();
                _selfKeyPair = selfData.selfKeyPair();
                return; // exits the lambda only; trust reload below still runs
            } else {
                _selfUuid = presetUuid.map(s -> PeerId.of(UUID.fromString(s))).orElseGet(() -> PeerId.of(UUID.randomUUID()));
                try {
                    Log.info("Generating a key pair, please wait");
                    _selfKeyPair = CertificateTools.generateKeyPair();
                    _selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
                    curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair));
                    peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers());
        Log.info("Self uuid is: " + _selfUuid.toString());
    }

//    private void pushPeerUpdates() {
//        pushPeerUpdates(null);
//    }

//    private void pushPeerUpdates(@Nullable JObject<?> obj) {
//        if (obj != null)
//            Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated");
//        executorService.submit(() -> {
//            updateCerts();
//            invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
//            for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList()))
//                invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p));
//        });
//    }

    /** This node's own peer id (set during startup). */
    public PeerId getSelfUuid() {
        return _selfUuid;
    }

    // Returns the current counter value and persists counter+1, i.e. each call
    // yields a distinct value (the pre-increment one).
    // NOTE(review): NPEs if the self-data object is missing — assumes init() ran.
    public long getUniqueId() {
        return jObjectTxManager.run(() -> {
            var curData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
            curTx.put(curData.withSelfCounter(curData.selfCounter() + 1));
            return curData.selfCounter();
        });
    }

//    private void updateCerts() {
//        try {
//            peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
//                peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
//                // Fixme:? I don't think it should be needed with custom trust store
//                // but it doesn't work?
//                rpcClientFactory.dropCache();
//                return null;
//            });
//        } catch (Exception ex) {
//            Log.warn("Error when refreshing certificates, will retry: " + ex.getMessage());
//            pushPeerUpdates();
//        }
//    }

    /** This node's TLS key pair (set during startup). */
    public KeyPair getSelfKeypair() {
        return _selfKeyPair;
    }

    /** This node's self-signed certificate (set during startup). */
    public X509Certificate getSelfCertificate() {
        return _selfCertificate;
    }

//    // Returns true if host's initial sync wasn't done before, and marks it as done
//    public boolean markInitialOpSyncDone(UUID connectedHost) {
//        return jObjectTxManager.executeTx(() -> {
//            peerDirectoryLocal.get().rwLock();
//            try {
//                peerDirectoryLocal.get().local();
//                boolean contained = peerDirectoryLocal.get().getData().getInitialOpSyncDone().contains(connectedHost);
//
//                if (!contained)
//                    peerDirectoryLocal.get().local().mutate(new JMutator<PeerDirectoryLocal>() {
//                        @Override
//                        public boolean mutate(PeerDirectoryLocal object) {
//                            object.getInitialOpSyncDone().add(connectedHost);
//                            return true;
//                        }
//
//                        @Override
//                        public void revert(PeerDirectoryLocal object) {
//                            object.getInitialOpSyncDone().remove(connectedHost);
//                        }
//                    });
//                return !contained;
//            } finally {
//                peerDirectoryLocal.get().rwUnlock();
//            }
//        });
//    }
//
//    public boolean markInitialObjSyncDone(UUID connectedHost) {
//        return jObjectTxManager.executeTx(() -> {
//            peerDirectoryLocal.get().rwLock();
//            try {
//                peerDirectoryLocal.get().local();
//                boolean contained = peerDirectoryLocal.get().getData().getInitialObjSyncDone().contains(connectedHost);
//
//                if (!contained)
//                    peerDirectoryLocal.get().local().mutate(new JMutator<PeerDirectoryLocal>() {
//                        @Override
//                        public boolean mutate(PeerDirectoryLocal object) {
//                            object.getInitialObjSyncDone().add(connectedHost);
//                            return true;
//                        }
//
//                        @Override
//                        public void revert(PeerDirectoryLocal object) {
//                            object.getInitialObjSyncDone().remove(connectedHost);
//                        }
//                    });
//                return !contained;
//            } finally {
//                peerDirectoryLocal.get().rwUnlock();
//            }
//        });
//    }
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import java.io.Serializable;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
/**
 * Singleton persisted record holding this node's own identity: its peer id,
 * a monotonically increasing counter, and its TLS certificate / key pair.
 */
public record PersistentRemoteHostsData(PeerId selfUuid,
                                        long selfCounter,
                                        X509Certificate selfCertificate,
                                        KeyPair selfKeyPair) implements JData, Serializable {
    // Fixed well-known key: there is exactly one such object in the store.
    public static final JObjectKey KEY = JObjectKey.of("self_peer_data");

    @Override
    public JObjectKey key() {
        return KEY;
    }

    /** Copy with an updated counter; every other field is carried over unchanged. */
    public PersistentRemoteHostsData withSelfCounter(long selfCounter) {
        return new PersistentRemoteHostsData(selfUuid(), selfCounter, selfCertificate(), selfKeyPair());
    }
}

View File

@@ -0,0 +1,174 @@
//package com.usatiuk.dhfs.objects.repository;
//
//import com.google.common.collect.Maps;
//import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
//import com.usatiuk.dhfs.objects.jrepository.*;
//import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
//import com.usatiuk.dhfs.objects.repository.opsupport.Op;
//import io.grpc.Status;
//import io.grpc.StatusRuntimeException;
//import io.quarkus.logging.Log;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.inject.Inject;
//import org.apache.commons.lang3.tuple.Pair;
//
//import javax.annotation.Nullable;
//import java.util.*;
//import java.util.concurrent.Callable;
//import java.util.concurrent.ConcurrentLinkedDeque;
//import java.util.concurrent.Executors;
//import java.util.stream.Collectors;
//
//@ApplicationScoped
//public class RemoteObjectServiceClient {
// @Inject
// PersistentPeerDataService persistentPeerDataService;
//
// @Inject
// RpcClientFactory rpcClientFactory;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Inject
// SyncHandler syncHandler;
// @Inject
// InvalidationQueueService invalidationQueueService;
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
// @Inject
// JObjectTxManager jObjectTxManager;
//
// public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
// return rpcClientFactory.withObjSyncClient(host, client -> {
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build());
// return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
// });
// }
//
// public JObjectDataP getObject(JObject<?> jObject) {
// jObject.assertRwLock();
//
// var targets = jObject.runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d) -> {
// var ourVersion = md.getOurVersion();
// if (ourVersion >= 1)
// return md.getRemoteCopies().entrySet().stream()
// .filter(entry -> entry.getValue().equals(ourVersion))
// .map(Map.Entry::getKey).toList();
// else
// return persistentPeerDataService.getHostUuids();
// });
//
// if (targets.isEmpty())
// throw new IllegalStateException("No targets for object " + jObject.getMeta().getName());
//
// Log.info("Downloading object " + jObject.getMeta().getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
//
// return rpcClientFactory.withObjSyncClient(targets, client -> {
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(jObject.getMeta().getName()).build());
//
// var receivedMap = new HashMap<UUID, Long>();
// for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) {
// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion());
// }
//
// return jObjectTxManager.executeTx(() -> {
// return jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> {
// var unexpected = !Objects.equals(
// Maps.filterValues(md.getChangelog(), val -> val != 0),
// Maps.filterValues(receivedMap, val -> val != 0));
//
// if (unexpected) {
// try {
// syncHandler.handleOneUpdate(UUID.fromString(reply.getSelfUuid()), reply.getObject().getHeader());
// } catch (SyncHandler.OutdatedUpdateException ignored) {
// Log.info("Outdated update of " + md.getName() + " from " + reply.getSelfUuid());
// invalidationQueueService.pushInvalidationToOne(UUID.fromString(reply.getSelfUuid()), md.getName()); // True?
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Received outdated object version"));
// } catch (Exception e) {
// Log.error("Received unexpected object version from " + reply.getSelfUuid()
// + " for " + reply.getObject().getHeader().getName() + " and conflict resolution failed", e);
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Received unexpected object version"));
// }
// }
//
// return reply.getObject().getContent();
// });
// });
// });
// }
//
// @Nullable
// public IndexUpdateReply notifyUpdate(JObject<?> obj, UUID host) {
// var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString());
//
// var header = obj
// .runReadLocked(
// obj.getMeta().getKnownClass().isAnnotationPresent(PushResolution.class)
// ? JObjectManager.ResolutionStrategy.LOCAL_ONLY
// : JObjectManager.ResolutionStrategy.NO_RESOLUTION,
// (m, d) -> {
// if (obj.getMeta().isDeleted()) return null;
// if (m.getKnownClass().isAnnotationPresent(PushResolution.class) && d == null)
// Log.warn("Object " + m.getName() + " is marked as PushResolution but no resolution found");
// if (m.getKnownClass().isAnnotationPresent(PushResolution.class))
// return m.toRpcHeader(dataProtoSerializer.serialize(d));
// else
// return m.toRpcHeader();
// });
// if (header == null) return null;
// jObjectTxManager.executeTx(obj::markSeen);
// builder.setHeader(header);
//
// var send = builder.build();
//
// return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send));
// }
//
// public OpPushReply pushOps(List<Op> ops, String queueName, UUID host) {
// for (Op op : ops) {
// for (var ref : op.getEscapedRefs()) {
// jObjectTxManager.executeTx(() -> {
// jObjectManager.get(ref).ifPresent(JObject::markSeen);
// });
// }
// }
// var builder = OpPushMsg.newBuilder()
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
// .setQueueId(queueName);
// for (var op : ops)
// builder.addMsg(opProtoSerializer.serialize(op));
// return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(builder.build()));
// }
//
// public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
// ConcurrentLinkedDeque<CanDeleteReply> results = new ConcurrentLinkedDeque<>();
// Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
// try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
// try {
// executor.invokeAll(targets.stream().<Callable<Void>>map(h -> () -> {
// try {
// var req = CanDeleteRequest.newBuilder()
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
// .setName(object);
// req.addAllOurReferrers(ourReferrers);
// var res = rpcClientFactory.withObjSyncClient(h, client -> client.canDelete(req.build()));
// if (res != null)
// results.add(res);
// } catch (Exception e) {
// Log.debug("Error when asking canDelete for object " + object, e);
// }
// return null;
// }).toList());
// } catch (InterruptedException e) {
// Log.warn("Interrupted waiting for canDelete for object " + object);
// }
// if (!executor.shutdownNow().isEmpty())
// Log.warn("Didn't ask all targets when asking canDelete for " + object);
// }
// return results;
// }
//}

View File

@@ -0,0 +1,191 @@
package com.usatiuk.dhfs.objects.repository;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
@RolesAllowed("cluster-member")
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
// @Inject
// SyncHandler syncHandler;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Inject
// PeerManager peerManager;
//
// @Inject
// AutoSyncProcessor autoSyncProcessor;
//
@Inject
PersistentPeerDataService persistentPeerDataService;
//
// @Inject
// InvalidationQueueService invalidationQueueService;
//
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
//
// @Inject
// OpObjectRegistry opObjectRegistry;
//
// @Inject
// JObjectTxManager jObjectTxManager;
//
// @Override
// @Blocking
// public Uni<GetObjectReply> getObject(GetObjectRequest request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid());
//
// var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
//
// // Does @Blocking break this?
// return Uni.createFrom().emitter(emitter -> {
// var replyObj = jObjectTxManager.executeTx(() -> {
// // Obj.markSeen before markSeen of its children
// obj.markSeen();
// return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
// if (meta.isOnlyLocal())
// throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Trying to get local-only object"));
// if (data == null) {
// Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid());
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally"));
// }
// data.extractRefs().forEach(ref ->
// jObjectManager.get(ref)
// .orElseThrow(() -> new IllegalStateException("Non-hydrated refs for local object?"))
// .markSeen());
//
// return ApiObject.newBuilder()
// .setHeader(obj.getMeta().toRpcHeader())
// .setContent(dataProtoSerializer.serialize(obj.getData())).build();
// });
// });
// var ret = GetObjectReply.newBuilder()
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
// .setObject(replyObj).build();
// // TODO: Could this cause problems if we wait for too long?
// obj.commitFenceAsync(() -> emitter.complete(ret));
// });
// }
//
// @Override
// @Blocking
// public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// Log.info("<-- canDelete: " + request.getName() + " from " + request.getSelfUuid());
//
// var builder = CanDeleteReply.newBuilder();
//
// var obj = jObjectManager.get(request.getName());
//
// builder.setSelfUuid(persistentPeerDataService.getSelfUuid().toString());
// builder.setObjName(request.getName());
//
// if (obj.isPresent()) try {
// boolean tryUpdate = obj.get().runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
// if (m.isDeleted() && !m.isDeletionCandidate())
// throw new IllegalStateException("Object " + m.getName() + " is deleted but not a deletion candidate");
// builder.setDeletionCandidate(m.isDeletionCandidate());
// builder.addAllReferrers(m.getReferrers());
// return m.isDeletionCandidate() && !m.isDeleted();
// });
// // FIXME
//// if (tryUpdate) {
//// obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
//// return null;
//// });
//// }
// } catch (DeletedObjectAccessException dox) {
// builder.setDeletionCandidate(true);
// }
// else {
// builder.setDeletionCandidate(true);
// }
//
// var ret = builder.build();
//
// if (!ret.getDeletionCandidate())
// for (var rr : request.getOurReferrersList())
// autoSyncProcessor.add(rr);
//
// return Uni.createFrom().item(ret);
// }
//
// @Override
// @Blocking
// public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
/// / Log.info("<-- indexUpdate: " + request.getHeader().getName());
// return jObjectTxManager.executeTxAndFlush(() -> {
// return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
// });
// }
//
// @Override
// @Blocking
// public Uni<OpPushReply> opPush(OpPushMsg request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// try {
// var objs = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
// jObjectTxManager.executeTxAndFlush(() -> {
// opObjectRegistry.acceptExternalOps(request.getQueueId(), UUID.fromString(request.getSelfUuid()), objs);
// });
// } catch (Exception e) {
// Log.error(e, e);
// throw e;
// }
// return Uni.createFrom().item(OpPushReply.getDefaultInstance());
// }
//
@Override
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
return null;
}
@Override
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
return null;
}
@Override
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
return null;
}
@Override
public Uni<OpPushReply> opPush(OpPushMsg request) {
return null;
}
@Override
@Blocking
public Uni<PingReply> ping(PingRequest request) {
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
return Uni.createFrom().item(PingReply.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build());
}
}

View File

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
import io.grpc.ChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.TlsChannelCredentials;
import io.grpc.netty.NettyChannelBuilder;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.net.ssl.KeyManagerFactory;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.concurrent.TimeUnit;
@ApplicationScoped
public class RpcChannelFactory {
    @Inject
    PersistentPeerDataService persistentPeerDataService;
    @Inject
    PeerTrustManager peerTrustManager;

    /**
     * Builds TLS credentials that present this node's own certificate/key pair
     * and validate the remote side through the dynamic peer trust manager.
     */
    private ChannelCredentials getChannelCredentials() {
        try {
            var keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
            keyStore.load(null, null);
            keyStore.setKeyEntry("clientkey",
                    persistentPeerDataService.getSelfKeypair().getPrivate(),
                    null,
                    new Certificate[]{persistentPeerDataService.getSelfCertificate()});

            var keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, null);

            return TlsChannelCredentials.newBuilder()
                    .trustManager(peerTrustManager)
                    .keyManager(keyManagerFactory.getKeyManagers())
                    .build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Opens a mutually-authenticated channel to the given endpoint. The authority is
     * overridden to the peer id so certificate checks match the peer identity rather
     * than the network address.
     */
    ManagedChannel getSecureChannel(PeerId host, String address, int port) {
        return NettyChannelBuilder.forAddress(address, port, getChannelCredentials())
                .overrideAuthority(host.toString())
                .idleTimeout(10, TimeUnit.SECONDS)
                .build();
    }
}

View File

@@ -0,0 +1,94 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
// TODO: Dedup this
@ApplicationScoped
public class RpcClientFactory {
    @ConfigProperty(name = "dhfs.objects.sync.timeout")
    long syncTimeout;

    @Inject
    PeerManager peerManager;

    @Inject
    RpcChannelFactory rpcChannelFactory;

    // FIXME: Leaks!
    private ConcurrentMap<ObjSyncStubKey, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub> _objSyncCache = new ConcurrentHashMap<>();

    /**
     * Calls {@code fn} against the first target (tried in random order) that answers.
     * UNAVAILABLE hosts are logged at debug level, other failures at warn level;
     * if every target fails, UNAVAILABLE is thrown.
     */
    public <R> R withObjSyncClient(Collection<PeerId> targets, ObjectSyncClientFunction<R> fn) {
        var order = new ArrayList<>(targets);
        Collections.shuffle(order);

        for (PeerId candidate : order) {
            try {
                return withObjSyncClient(candidate, fn);
            } catch (StatusRuntimeException e) {
                if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode()))
                    Log.debug("Host " + candidate + " is unreachable: " + e.getMessage());
                else
                    Log.warn("When calling " + candidate + " " + e.getMessage());
            } catch (Exception e) {
                Log.warn("When calling " + candidate + " " + e.getMessage());
            }
        }

        throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!"));
    }

    /** Resolves the target's address via the peer manager and calls with the default sync timeout. */
    public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
        var resolved = peerManager.getAddress(target);
        if (resolved == null)
            throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
        return withObjSyncClient(target, resolved, syncTimeout, fn);
    }

    /** Dispatches on the concrete address type; only IP addresses are currently supported. */
    public <R> R withObjSyncClient(PeerId host, PeerAddress address, long timeout, ObjectSyncClientFunction<R> fn) {
        return switch (address) {
            case IpPeerAddress ip -> withObjSyncClient(host, ip.address(), ip.securePort(), timeout, fn);
            default -> throw new IllegalStateException("Unexpected value: " + address);
        };
    }

    /** Gets (or creates and caches) a blocking stub for the endpoint, then applies {@code fn} with a deadline. */
    public <R> R withObjSyncClient(PeerId host, InetAddress addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
        var cacheKey = new ObjSyncStubKey(host, addr, port);
        var stub = _objSyncCache.computeIfAbsent(cacheKey, unused -> {
            var channel = rpcChannelFactory.getSecureChannel(host, addr.getHostAddress(), port);
            return DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
                    .withMaxOutboundMessageSize(Integer.MAX_VALUE)
                    .withMaxInboundMessageSize(Integer.MAX_VALUE);
        });
        return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
    }

    /** Drops every cached stub; cached channels are abandoned and left to idle out. */
    public void dropCache() {
        _objSyncCache = new ConcurrentHashMap<>();
    }

    @FunctionalInterface
    public interface ObjectSyncClientFunction<R> {
        R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
    }

    // Cache key: a stub is unique per (peer, address, port) triple.
    private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) {
    }
}

View File

@@ -0,0 +1,207 @@
//package com.usatiuk.dhfs.objects.repository;
//
//import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
//import com.usatiuk.dhfs.objects.jrepository.JObject;
//import com.usatiuk.dhfs.objects.jrepository.JObjectData;
//import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
//import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
//import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
//import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
//import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
//import io.grpc.Status;
//import io.quarkus.logging.Log;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.inject.Instance;
//import jakarta.inject.Inject;
//
//import java.util.HashMap;
//import java.util.Objects;
//import java.util.Optional;
//import java.util.UUID;
//import java.util.concurrent.atomic.AtomicReference;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
//@ApplicationScoped
//public class SyncHandler {
// @Inject
// JObjectManager jObjectManager;
// @Inject
// PeerManager peerManager;
// @Inject
// RemoteObjectServiceClient remoteObjectServiceClient;
// @Inject
// InvalidationQueueService invalidationQueueService;
// @Inject
// Instance<ConflictResolver> conflictResolvers;
// @Inject
// PersistentPeerDataService persistentPeerDataService;
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// OpObjectRegistry opObjectRegistry;
// @Inject
// JObjectTxManager jObjectTxManager;
//
// public void pushInitialResyncObj(UUID host) {
// Log.info("Doing initial object push for " + host);
//
// var objs = jObjectManager.findAll();
//
// for (var obj : objs) {
// Log.trace("IS: " + obj + " to " + host);
// invalidationQueueService.pushInvalidationToOne(host, obj);
// }
// }
//
// public void pushInitialResyncOp(UUID host) {
// Log.info("Doing initial op push for " + host);
//
// jObjectTxManager.executeTxAndFlush(
// () -> {
// opObjectRegistry.pushBootstrapData(host);
// }
// );
// }
//
// public void handleOneUpdate(UUID from, ObjectHeader header) {
// AtomicReference<JObject<?>> foundExt = new AtomicReference<>();
//
// boolean conflict = jObjectTxManager.executeTx(() -> {
// JObject<?> found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty());
// foundExt.set(found);
//
// var receivedTotalVer = header.getChangelog().getEntriesList()
// .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
//
// var receivedMap = new HashMap<UUID, Long>();
// for (var e : header.getChangelog().getEntriesList()) {
// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion());
// }
//
// return found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> {
// if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) {
// Log.error("Received older index update than was known for host: "
// + from + " " + header.getName());
// throw new OutdatedUpdateException();
// }
//
// String rcv = "";
// for (var e : header.getChangelog().getEntriesList()) {
// rcv += e.getHost() + ": " + e.getVersion() + "; ";
// }
// String ours = "";
// for (var e : md.getChangelog().entrySet()) {
// ours += e.getKey() + ": " + e.getValue() + "; ";
// }
// Log.trace("Handling update: " + header.getName() + " from " + from + "\n" + "ours: " + ours + " \n" + "received: " + rcv);
//
// boolean updatedRemoteVersion = false;
//
// var oldRemoteVer = md.getRemoteCopies().put(from, receivedTotalVer);
// if (oldRemoteVer == null || !oldRemoteVer.equals(receivedTotalVer)) updatedRemoteVersion = true;
//
// boolean hasLower = false;
// boolean hasHigher = false;
// for (var e : Stream.concat(md.getChangelog().keySet().stream(), receivedMap.keySet().stream()).collect(Collectors.toSet())) {
// if (receivedMap.getOrDefault(e, 0L) < md.getChangelog().getOrDefault(e, 0L))
// hasLower = true;
// if (receivedMap.getOrDefault(e, 0L) > md.getChangelog().getOrDefault(e, 0L))
// hasHigher = true;
// }
//
// if (hasLower && hasHigher) {
// Log.info("Conflict on update (inconsistent version): " + header.getName() + " from " + from);
// return true;
// }
//
// if (hasLower) {
// Log.info("Received older index update than known: "
// + from + " " + header.getName());
// throw new OutdatedUpdateException();
// }
//
// if (hasHigher) {
// invalidate.apply();
// md.getChangelog().clear();
// md.getChangelog().putAll(receivedMap);
// md.getChangelog().putIfAbsent(persistentPeerDataService.getSelfUuid(), 0L);
// if (header.hasPushedData())
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// return false;
// } else if (data == null && header.hasPushedData()) {
// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (found.getData() == null)
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// }
//
// assert Objects.equals(receivedTotalVer, md.getOurVersion());
//
// if (!updatedRemoteVersion)
// Log.debug("No action on update: " + header.getName() + " from " + from);
//
// return false;
// });
// });
//
// // TODO: Is the lock gap here ok?
// if (conflict) {
// Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
// var found = foundExt.get();
//
// JObjectData theirsData;
// ObjectHeader theirsHeader;
// if (header.hasPushedData()) {
// theirsHeader = header;
// theirsData = dataProtoSerializer.deserialize(header.getPushedData());
// } else {
// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
// theirsData = dataProtoSerializer.deserialize(got.getRight());
// theirsHeader = got.getLeft();
// }
//
// jObjectTxManager.executeTx(() -> {
// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// if (d == null)
// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
// return d.getConflictResolver();
// });
// var resolver = conflictResolvers.select(resolverClass);
// resolver.get().resolve(from, theirsHeader, theirsData, found);
// });
// Log.info("Resolved conflict for " + from + " " + header.getName());
// }
//
// }
//
// public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
// // TODO: Dedup
// try {
// handleOneUpdate(UUID.fromString(request.getSelfUuid()), request.getHeader());
// } catch (OutdatedUpdateException ignored) {
// Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid());
// invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName());
// } catch (Exception ex) {
// Log.info("Error when handling update from " + request.getSelfUuid() + " of " + request.getHeader().getName(), ex);
// throw ex;
// }
//
// return IndexUpdateReply.getDefaultInstance();
// }
//
// protected static class OutdatedUpdateException extends RuntimeException {
// OutdatedUpdateException() {
// super();
// }
//
// OutdatedUpdateException(String message) {
// super(message);
// }
//
// @Override
// public synchronized Throwable fillInStackTrace() {
// return this;
// }
// }
//}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery;
import com.usatiuk.dhfs.objects.PeerId;
import java.net.InetAddress;
/**
 * An IP-based peer address carrying both a plain port and a TLS ({@code securePort}) port.
 * NOTE(review): presumably port/securePort correspond to the HTTP and SSL ports a peer
 * announces via discovery — confirm against the broadcaster.
 */
public record IpPeerAddress(PeerId peer, PeerAddressType type,
                            InetAddress address, int port, int securePort) implements PeerAddress {
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery;
import com.usatiuk.dhfs.objects.PeerId;
/**
 * An address at which a peer can be reached; concrete implementations
 * (e.g. {@link IpPeerAddress}) carry the transport-specific fields.
 */
public interface PeerAddress {
    /** The peer this address belongs to. */
    PeerId peer();

    /** Locality/transport class of this address. */
    PeerAddressType type();
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery;
/**
 * Locality/transport class of a peer address.
 */
public enum PeerAddressType {
    LAN,
    WAN,
    PROXY
}

View File

@@ -0,0 +1,70 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery;
import com.usatiuk.dhfs.objects.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@ApplicationScoped
public class PeerDiscoveryDirectory {
    // Entries older than this (ms) are considered expired and pruned lazily on read.
    @ConfigProperty(name = "dhfs.peerdiscovery.timeout")
    long timeout;

    /**
     * One known (address, last-seen-timestamp) pair. Equality deliberately ignores the
     * timestamp so a re-announcement replaces the stale entry in the backing multimap.
     */
    private record PeerEntry(PeerAddress addr, long lastSeen) {
        public PeerEntry withLastSeen(long lastSeen) {
            return new PeerEntry(addr, lastSeen);
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || getClass() != o.getClass()) return false;
            PeerEntry peerEntry = (PeerEntry) o;
            return Objects.equals(addr, peerEntry.addr);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(addr);
        }
    }

    // All accesses are guarded by synchronizing on the map itself.
    private final MultiValuedMap<PeerId, PeerEntry> _entries = new HashSetValuedHashMap<>();

    /**
     * Records that {@code addr} was just seen, refreshing its last-seen timestamp.
     */
    public void notifyAddr(PeerAddress addr) {
        synchronized (_entries) {
            var peer = addr.peer();
            // Remove-then-put refreshes lastSeen: PeerEntry equality only compares addr.
            _entries.removeMapping(peer, new PeerEntry(addr, 0));
            _entries.put(peer, new PeerEntry(addr, System.currentTimeMillis()));
        }
    }

    /**
     * Returns the known non-expired addresses for {@code peer}, pruning expired ones
     * as a side effect.
     * Fix: previously used {@code _entries.asMap().get(peer)}, which returns null for
     * an unknown peer and caused an NPE; {@code MultiValuedMap.get} returns an empty
     * view instead, so unknown peers now yield an empty collection.
     */
    public Collection<PeerAddress> getForPeer(PeerId peer) {
        synchronized (_entries) {
            long curTime = System.currentTimeMillis();
            var partitioned = _entries.get(peer).stream()
                    .collect(Collectors.partitioningBy(e -> e.lastSeen() + timeout < curTime));
            for (var entry : partitioned.get(true)) {
                _entries.removeMapping(peer, entry);
            }
            return partitioned.get(false).stream().map(PeerEntry::addr).toList();
        }
    }

    /**
     * Returns all peers that have at least one non-expired address, pruning expired
     * entries as a side effect.
     */
    public Collection<PeerId> getReachablePeers() {
        synchronized (_entries) {
            long curTime = System.currentTimeMillis();
            var partitioned = _entries.entries().stream()
                    .collect(Collectors.partitioningBy(e -> e.getValue().lastSeen() + timeout < curTime));
            for (var entry : partitioned.get(true)) {
                _entries.removeMapping(entry.getKey(), entry.getValue());
            }
            return partitioned.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toUnmodifiableSet());
        }
    }
}

View File

@@ -0,0 +1,46 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery;
import com.usatiuk.dhfs.objects.PeerId;
import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
@ApplicationScoped
public class StaticPeerDiscovery {
    // Parsed once at construction from configuration; immutable afterwards.
    private final List<IpPeerAddress> _peers;

    /**
     * Parses {@code dhfs.peerdiscovery.static-peers}: comma-separated entries of the
     * form {@code uuid:host:port:securePort}. Empty entries are ignored.
     *
     * @throws IllegalArgumentException on an entry without exactly four fields
     *                                  (previously this surfaced as a bare ArrayIndexOutOfBoundsException)
     * @throws RuntimeException         wrapping UnknownHostException for unresolvable hosts
     */
    public StaticPeerDiscovery(@ConfigProperty(name = "dhfs.peerdiscovery.static-peers") Optional<String> staticPeers) {
        var peers = staticPeers.orElse("");
        _peers = Arrays.stream(peers.split(",")).flatMap(e ->
        {
            if (e.isEmpty()) {
                return Stream.of();
            }
            var split = e.split(":");
            if (split.length != 4)
                throw new IllegalArgumentException(
                        "Malformed static peer entry, expected uuid:host:port:securePort, got: " + e);
            try {
                return Stream.of(new IpPeerAddress(PeerId.of(split[0]), PeerAddressType.LAN, InetAddress.getByName(split[1]),
                        Integer.parseInt(split[2]), Integer.parseInt(split[3])));
            } catch (UnknownHostException ex) {
                throw new RuntimeException(ex);
            }
        }).toList();
    }

    @Inject
    PeerDiscoveryDirectory peerDiscoveryDirectory;

    /** Re-announces every statically configured peer once a second. */
    @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
    public void discoverPeers() {
        for (var peer : _peers) {
            peerDiscoveryDirectory.notifyAddr(peer);
        }
    }
}

View File

@@ -0,0 +1,102 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery.local;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryInfo;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import io.quarkus.scheduler.Scheduled;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryBroadcaster {
    @Inject
    PersistentPeerDataService persistentPeerDataService;

    // The ports this node advertises to peers (its own HTTP and SSL ports).
    @ConfigProperty(name = "quarkus.http.port")
    int ourPort;
    @ConfigProperty(name = "quarkus.http.ssl-port")
    int ourSecurePort;
    // UDP port the announcement datagrams are sent to.
    @ConfigProperty(name = "dhfs.objects.peerdiscovery.port")
    int broadcastPort;
    // Master switch; when false, init/shutdown/broadcast are all no-ops.
    @ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast")
    boolean enabled;

    private DatagramSocket _socket;

    @Startup
    void init() throws SocketException {
        if (!enabled) {
            return;
        }
        // Ephemeral local port; only used for sending broadcasts.
        _socket = new DatagramSocket();
        _socket.setBroadcast(true);
    }

    void shutdown(@Observes @Priority(10) ShutdownEvent event) {
        if (!enabled) {
            return;
        }
        _socket.close();
    }

    /**
     * Periodically announces this node (uuid + ports, as a PeerDiscoveryInfo datagram):
     * once to the global broadcast address, then once per eligible network interface's
     * broadcast address.
     */
    @Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
    public void broadcast() throws Exception {
        if (!enabled) {
            return;
        }
        var sendData = PeerDiscoveryInfo.newBuilder()
                .setUuid(persistentPeerDataService.getSelfUuid().toString())
                .setPort(ourPort)
                .setSecurePort(ourSecurePort)
                .build();

        var sendBytes = sendData.toByteArray();

        DatagramPacket sendPacket
                = new DatagramPacket(sendBytes, sendBytes.length,
                InetAddress.getByName("255.255.255.255"), broadcastPort);

        _socket.send(sendPacket);

        var interfaces = NetworkInterface.getNetworkInterfaces();
        while (interfaces.hasMoreElements()) {
            NetworkInterface networkInterface = interfaces.nextElement();

            try {
                // Skip loopback and down interfaces; some interfaces throw on these
                // queries — skip those too rather than aborting the whole sweep.
                if (networkInterface.isLoopback() || !networkInterface.isUp()) {
                    continue;
                }
            } catch (Exception e) {
                continue;
            }

            for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
                InetAddress broadcast = interfaceAddress.getBroadcast();
                if (broadcast == null) {
                    // No broadcast address (e.g. IPv6 or point-to-point) — nothing to send.
                    continue;
                }

                try {
                    sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, broadcastPort);
                    _socket.send(sendPacket);
                } catch (Exception ignored) {
                    // Best-effort: a failure on one interface must not stop the others.
                    continue;
                }

                // Log.trace(getClass().getName() + "Broadcast sent to: " + broadcast.getHostAddress()
                //         + ", at: " + networkInterface.getDisplayName());
            }
        }
    }
}

View File

@@ -0,0 +1,91 @@
package com.usatiuk.dhfs.objects.repository.peerdiscovery.local;
import com.google.protobuf.InvalidProtocolBufferException;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddressType;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryDirectory;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerDiscoveryInfo;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.UUID;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.local-discovery", stringValue = "true")
public class LocalPeerDiscoveryClient {

    @Inject
    PeerDiscoveryDirectory peerDiscoveryDirectory;

    private Thread _clientThread;

    private DatagramSocket _socket;

    @ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast")
    boolean enabled;

    // Fix: listen on the same configured port the broadcaster sends to
    // ("dhfs.objects.peerdiscovery.port") instead of a hard-coded 42069, which
    // silently broke discovery whenever the port was reconfigured.
    @ConfigProperty(name = "dhfs.objects.peerdiscovery.port")
    int listenPort;

    @Startup
    void init() throws SocketException, UnknownHostException {
        if (!enabled) {
            return;
        }
        _socket = new DatagramSocket(listenPort, InetAddress.getByName("0.0.0.0"));
        _socket.setBroadcast(true);

        _clientThread = new Thread(this::client);
        _clientThread.setName("LocalPeerDiscoveryClient");
        _clientThread.start();
    }

    void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
        if (!enabled) {
            return;
        }
        // Closing the socket unblocks receive(); the interrupt covers any other wait.
        // (A redundant second interrupt() call was removed.)
        _socket.close();
        _clientThread.interrupt();
        while (_clientThread.isAlive()) {
            try {
                _clientThread.join();
            } catch (InterruptedException ignored) {
            }
        }
    }

    /**
     * Receive loop: parses incoming PeerDiscoveryInfo datagrams and feeds the
     * announced addresses into the discovery directory. Non-protobuf datagrams
     * are silently ignored.
     */
    private void client() {
        while (!Thread.interrupted() && !_socket.isClosed()) {
            try {
                byte[] buf = new byte[10000];
                DatagramPacket packet = new DatagramPacket(buf, buf.length);
                _socket.receive(packet);

                try {
                    var got = PeerDiscoveryInfo.parseFrom(ByteBuffer.wrap(buf, 0, packet.getLength()));
                    peerDiscoveryDirectory.notifyAddr(
                            new IpPeerAddress(
                                    PeerId.of(UUID.fromString(got.getUuid())),
                                    PeerAddressType.LAN,
                                    packet.getAddress(),
                                    got.getPort(),
                                    got.getSecurePort()
                            )
                    );
                } catch (InvalidProtocolBufferException e) {
                    // Not one of our announcements; ignore it.
                    continue;
                }
            } catch (Exception ex) {
                Log.error(ex);
            }
        }
        Log.info("PeerDiscoveryClient stopped");
    }
}

View File

@@ -0,0 +1,37 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.CertificateTools;
import org.pcollections.HashTreePSet;
import org.pcollections.PCollection;

import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Objects;
/**
 * Locally stored information about a peer: its id and its certificate bytes.
 */
public record PeerInfo(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen, PeerId id,
                       byte[] cert) implements JDataRefcounted, JDataRemote {
    public PeerInfo(PeerId id, byte[] cert) {
        this(id.toJObjectKey(), HashTreePSet.empty(), false, id, cert);
    }

    // Fix: a record's generated equals/hashCode compares array components by
    // reference, so two PeerInfos with identical cert bytes were never equal.
    // Compare and hash the certificate by content instead.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PeerInfo other)) return false;
        return frozen == other.frozen
                && Objects.equals(key, other.key)
                && Objects.equals(refsFrom, other.refsFrom)
                && Objects.equals(id, other.id)
                && Arrays.equals(cert, other.cert);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, refsFrom, frozen, id, Arrays.hashCode(cert));
    }

    @Override
    public JDataRefcounted withRefsFrom(PCollection<JObjectKey> refs) {
        return new PeerInfo(key, refs, frozen, id, cert);
    }

    @Override
    public JDataRefcounted withFrozen(boolean frozen) {
        return new PeerInfo(key, refsFrom, frozen, id, cert);
    }

    /** Parses the stored certificate bytes into an X509Certificate. */
    public X509Certificate parsedCert() {
        try {
            return CertificateTools.certFromBytes(cert);
        } catch (CertificateException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@@ -0,0 +1,76 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Optional;
@ApplicationScoped
public class PeerInfoService {
    @Inject
    Transaction curTx;
    @Inject
    TransactionManager jObjectTxManager;
    @Inject
    JKleppmannTreeManager jKleppmannTreeManager;
    @Inject
    PersistentPeerDataService persistentPeerDataService;

    // Peers are registered as children of a KleppmannTree rooted at the well-known
    // "peers" key; each child node's name is the peer id's string form.
    private JKleppmannTreeManager.JKleppmannTree getTree() {
        return jKleppmannTreeManager.getTree(JObjectKey.of("peers"));
    }

    /**
     * Looks up a peer in the tree and resolves its PeerInfo object.
     * Returns empty if the tree has no node for the peer or the referenced
     * PeerInfo object does not exist.
     */
    public Optional<PeerInfo> getPeerInfo(PeerId peer) {
        return jObjectTxManager.run(() -> {
            var gotKey = getTree().traverse(List.of(peer.toString()));
            if (gotKey == null) {
                return Optional.empty();
            }
            return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> {
                var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
                return curTx.get(PeerInfo.class, meta.getPeerId());
            });
        });
    }

    /**
     * Lists all known peers, including self.
     * NOTE(review): Optional::get assumes every tree child resolves to a PeerInfo;
     * a missing one would throw NoSuchElementException — confirm that invariant.
     */
    public List<PeerInfo> getPeers() {
        return jObjectTxManager.run(() -> {
            var gotKey = getTree().traverse(List.of());
            return curTx.get(JKleppmannTreeNode.class, gotKey).map(
                            node -> node.children().keySet().stream()
                                    .map(PeerId::of).map(this::getPeerInfo)
                                    .map(Optional::get).toList())
                    .orElseThrow();
        });
    }

    /**
     * Lists all known peers except this node itself.
     * NOTE(review): peerInfo.id() is a PeerId; if getSelfUuid() returns a
     * java.util.UUID rather than a PeerId this equals() never matches and self is
     * not filtered out — verify the return type of getSelfUuid().
     */
    public List<PeerInfo> getPeersNoSelf() {
        return jObjectTxManager.run(() -> {
            var gotKey = getTree().traverse(List.of());
            return curTx.get(JKleppmannTreeNode.class, gotKey).map(
                            node -> node.children().keySet().stream()
                                    .map(PeerId::of).map(this::getPeerInfo)
                                    .map(Optional::get).filter(
                                            peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
                    .orElseThrow();
        });
    }

    /**
     * Registers (or re-registers) a peer: stores its PeerInfo object and attaches a
     * tree node for it under the root.
     */
    public void putPeer(PeerId id, byte[] cert) {
        jObjectTxManager.run(() -> {
            var parent = getTree().traverse(List.of());
            var newPeerInfo = new PeerInfo(id, cert);
            curTx.put(newPeerInfo);
            getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId());
        });
    }
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.peersync.api;
public record ApiPeerInfo(String selfUuid, String cert) {
}

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.objects.repository.peersync.api;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.security.cert.CertificateEncodingException;
import java.util.Base64;
@Path("/peer-info")
public class PeerSyncApi {
    @Inject
    PersistentPeerDataService persistentPeerDataService;

    /**
     * Returns this node's identity: its uuid and its certificate as base64 of the
     * DER encoding produced by X509Certificate#getEncoded.
     * NOTE(review): PeerSyncApiClientDynamic fetches this over plain HTTP —
     * presumably a trust-bootstrap step before TLS pairing; confirm that is intended.
     */
    @Path("self")
    @GET
    public ApiPeerInfo getSelfInfo() {
        try {
            return new ApiPeerInfo(persistentPeerDataService.getSelfUuid().toString(),
                    Base64.getEncoder().encodeToString(persistentPeerDataService.getSelfCertificate().getEncoded()));
        } catch (CertificateEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects.repository.peersync.api;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
/**
 * Typed REST client mirror of {@link PeerSyncApi}.
 */
@Path("/peer-info")
public interface PeerSyncApiClient {
    @Path("self")
    @GET
    ApiPeerInfo getSelfInfo();
}

View File

@@ -0,0 +1,28 @@
package com.usatiuk.dhfs.objects.repository.peersync.api;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.objects.repository.peerdiscovery.PeerAddress;
import io.quarkus.rest.client.reactive.QuarkusRestClientBuilder;
import jakarta.enterprise.context.ApplicationScoped;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@ApplicationScoped
public class PeerSyncApiClientDynamic {
    /**
     * Fetches the self-reported identity of the peer reachable at {@code addr}.
     * Only IP addresses are supported.
     */
    public ApiPeerInfo getSelfInfo(PeerAddress addr) {
        return switch (addr) {
            case IpPeerAddress ip -> getSelfInfo(ip.address().getHostAddress(), ip.port());
            default -> throw new IllegalArgumentException("Unsupported peer address type: " + addr.getClass());
        };
    }

    // Builds a short-lived REST client for the peer's plain-HTTP endpoint and queries it.
    private ApiPeerInfo getSelfInfo(String address, int port) {
        var baseUri = URI.create("http://" + address + ":" + port);
        var restClient = QuarkusRestClientBuilder.newBuilder()
                .baseUri(baseUri)
                .connectTimeout(5, TimeUnit.SECONDS)
                .readTimeout(5, TimeUnit.SECONDS)
                .build(PeerSyncApiClient.class);
        return restClient.getSelfInfo();
    }
}

View File

@@ -0,0 +1,41 @@
package com.usatiuk.dhfs.objects.repository.peersync.structs;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import java.util.Objects;
//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
/**
 * Tree-node metadata for a peer entry: the node is named after the peer id's string
 * form and points at the corresponding PeerInfo object key.
 */
public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
    // Key of the PeerInfo object this node refers to (derived from the peer id).
    private final JObjectKey _peerId;

    public JKleppmannTreeNodeMetaPeer(PeerId id) {
        // The node name is the peer id's string form, so lookups by peer id work.
        super(id.toString());
        _peerId = id.toJObjectKey();
    }

    public JObjectKey getPeerId() {
        return _peerId;
    }

    /**
     * Peer nodes are keyed by their peer id and cannot be renamed.
     * Fix: previously this was {@code assert false; return this;}, which silently
     * no-ops when assertions are disabled (the default outside tests); failing loudly
     * prevents a rename from being dropped unnoticed.
     */
    @Override
    public JKleppmannTreeNodeMeta withName(String name) {
        throw new UnsupportedOperationException("JKleppmannTreeNodeMetaPeer cannot be renamed");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        if (!super.equals(o)) return false;
        JKleppmannTreeNodeMetaPeer that = (JKleppmannTreeNodeMetaPeer) o;
        return Objects.equals(_peerId, that._peerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), _peerId);
    }
}

View File

@@ -0,0 +1,51 @@
package com.usatiuk.dhfs.objects.repository.peertrust;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import io.quarkus.logging.Log;
import io.quarkus.security.credential.CertificateCredential;
import io.quarkus.security.identity.AuthenticationRequestContext;
import io.quarkus.security.identity.SecurityIdentity;
import io.quarkus.security.identity.SecurityIdentityAugmentor;
import io.quarkus.security.runtime.QuarkusSecurityIdentity;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.function.Supplier;
@ApplicationScoped
public class PeerRolesAugmentor implements SecurityIdentityAugmentor {
    @Inject
    PeerInfoService peerInfoService;

    /**
     * Grants the "cluster-member" role to authenticated identities whose client
     * certificate matches the certificate stored for that peer. Anonymous,
     * unknown, or mismatched identities pass through unchanged (fail closed:
     * no role is added).
     */
    @Override
    public Uni<SecurityIdentity> augment(SecurityIdentity identity, AuthenticationRequestContext context) {
        return Uni.createFrom().item(build(identity));
    }

    private Supplier<SecurityIdentity> build(SecurityIdentity identity) {
        if (identity.isAnonymous()) {
            return () -> identity;
        } else {
            QuarkusSecurityIdentity.Builder builder = QuarkusSecurityIdentity.builder(identity);
            // NOTE(review): substring(3) assumes the principal name is "CN=<uuid>" — confirm the certificate subject format used when issuing peer certs.
            var uuid = identity.getPrincipal().getName().substring(3);
            try {
                var entry = peerInfoService.getPeerInfo(PeerId.of(uuid));
                // NOTE(review): entry.get() throws for an unknown peer and lands in the generic catch below — consider handling "unknown peer" explicitly for a clearer log message.
                if (!entry.get().parsedCert().equals(identity.getCredential(CertificateCredential.class).getCertificate())) {
                    Log.error("Certificate mismatch for " + uuid);
                    return () -> identity;
                }
                builder.addRole("cluster-member");
                return builder::build;
            } catch (Exception e) {
                // Any lookup/comparison failure leaves the identity without the cluster role.
                Log.error("Error when checking certificate for " + uuid, e);
                return () -> identity;
            }
        }
    }
}

View File

@@ -0,0 +1,69 @@
package com.usatiuk.dhfs.objects.repository.peertrust;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.tuple.Pair;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class PeerTrustManager implements X509TrustManager {
    // Current delegate; swapped atomically whenever the known-host set changes,
    // so in-flight TLS handshakes always see a consistent trust manager.
    private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();

    @Override
    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
        trustManager.get().checkClientTrusted(chain, authType);
    }

    @Override
    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
        trustManager.get().checkServerTrusted(chain, authType);
    }

    @Override
    public X509Certificate[] getAcceptedIssuers() {
        return trustManager.get().getAcceptedIssuers();
    }

    /**
     * Rebuilds the delegate trust manager so that exactly the given peers'
     * certificates are trusted.
     */
    public synchronized void reloadTrustManagerHosts(Collection<PeerInfo> hosts) {
        try {
            Log.info("Trying to reload trust manager: " + hosts.size() + " known hosts");
            // In-memory keystore with one entry per known peer, keyed by peer id.
            KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
            ts.load(null, null);
            for (PeerInfo host : hosts) {
                ts.setCertificateEntry(host.id().toString(), host.parsedCert());
            }
            reloadTrustManager(ts);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Installs the first X509TrustManager produced for the given keystore.
    private synchronized void reloadTrustManager(KeyStore ts) throws Exception {
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(ts);
        for (TrustManager candidate : tmf.getTrustManagers()) {
            if (candidate instanceof X509TrustManager x509) {
                trustManager.set(x509);
                return;
            }
        }
        throw new NoSuchAlgorithmException("No X509TrustManager in TrustManagerFactory");
    }
}

View File

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.objects.repository.peertrust;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import io.quarkus.vertx.http.HttpServerOptionsCustomizer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.TrustOptions;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.net.ssl.KeyManagerFactory;
import java.security.KeyStore;
import java.security.cert.Certificate;
@ApplicationScoped
public class PeerTrustServerCustomizer implements HttpServerOptionsCustomizer {
    @Inject
    PeerTrustManager peerTrustManager;
    @Inject
    PersistentPeerDataService persistentPeerDataService;

    /**
     * Installs this node's keypair/certificate and the peer trust manager on
     * the HTTPS server, so TLS handshakes only succeed with known peers.
     */
    @Override
    public void customizeHttpsServer(HttpServerOptions options) {
        try {
            options.setKeyCertOptions(KeyCertOptions.wrap(buildKeyManagerFactory()));
            options.setTrustOptions(TrustOptions.wrap(peerTrustManager));
        } catch (Exception e) {
            throw new RuntimeException("Error configuring https: ", e);
        }
    }

    // Builds a KeyManagerFactory backed by an in-memory keystore holding our own keypair.
    private KeyManagerFactory buildKeyManagerFactory() throws Exception {
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);
        keyStore.setKeyEntry("sslkey",
                persistentPeerDataService.getSelfKeypair().getPrivate(), null,
                new Certificate[]{persistentPeerDataService.getSelfCertificate()});
        KeyManagerFactory factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        factory.init(keyStore, null);
        return factory;
    }
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.webapi;
/**
 * Web API DTO for a peer that was seen on the network but not yet added.
 */
public record AvailablePeerInfo(String uuid) {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.webapi;
/**
 * Web API request body for removing a known peer by UUID.
 */
public record KnownPeerDelete(String uuid) {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.webapi;
/**
 * Web API DTO describing a peer already known to this node.
 */
public record KnownPeerInfo(String uuid) {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.webapi;
/**
 * Web API request body for adding a peer (by UUID) to the known-peer set.
 */
public record KnownPeerPut(String uuid) {
}

View File

@@ -0,0 +1,45 @@
package com.usatiuk.dhfs.objects.repository.webapi;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import jakarta.inject.Inject;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import java.util.Collection;
import java.util.List;
@Path("/objects-manage")
public class ManagementApi {
    @Inject
    PeerInfoService peerInfoService;
    @Inject
    PeerManager peerManager;

    // Lists all peers currently known (persisted) by this node.
    @Path("known-peers")
    @GET
    public List<KnownPeerInfo> knownPeers() {
        return peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString())).toList();
    }

    // Adds the given peer (by UUID) to the known-peer set.
    @Path("known-peers")
    @PUT
    public void addPeer(KnownPeerPut knownPeerPut) {
        peerManager.addRemoteHost(PeerId.of(knownPeerPut.uuid()));
    }

    // NOTE(review): currently a no-op stub — peer removal is not implemented yet;
    // DELETE requests succeed without any effect.
    @Path("known-peers")
    @DELETE
    public void deletePeer(KnownPeerDelete knownPeerDelete) {
        // peerManager.removeRemoteHost(PeerId.of(knownPeerPut.uuid()));
    }

    // Peers discovered on the network but not yet added as known peers.
    @Path("available-peers")
    @GET
    public Collection<AvailablePeerInfo> availablePeers() {
        return peerManager.getSeenButNotAddedHosts();
    }
}

View File

@@ -1,6 +1,7 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=5000
dhfs.objects.peerdiscovery.interval=5s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=4
@@ -31,6 +32,7 @@ dhfs.objects.ref-processor.threads=4
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=5000
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.http.insecure-requests=enabled

View File

@@ -22,7 +22,6 @@ abstract public class TempDataProfile implements QuarkusTestProfile {
}
var ret = new HashMap<String, String>();
ret.put("dhfs.objects.persistence.files.root", tempDirWithPrefix.resolve("dhfs_root_test").toString());
ret.put("dhfs.objects.root", tempDirWithPrefix.resolve("dhfs_root_d_test").toString());
ret.put("dhfs.fuse.root", tempDirWithPrefix.resolve("dhfs_fuse_root_test").toString());
getConfigOverrides(ret);
return ret;

View File

@@ -17,13 +17,10 @@ import java.util.Objects;
public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
String tempDirectory;
@ConfigProperty(name = "dhfs.objects.root")
String tempDirectoryIdx;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
try {
purgeDirectory(Path.of(tempDirectory).toFile());
purgeDirectory(Path.of(tempDirectoryIdx).toFile());
} catch (Exception ignored) {
Log.warn("Couldn't cleanup test data on init");
}
@@ -31,7 +28,6 @@ public class TestDataCleaner {
void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
purgeDirectory(Path.of(tempDirectory).toFile());
purgeDirectory(Path.of(tempDirectoryIdx).toFile());
}
void purgeDirectory(File dir) {

View File

@@ -39,8 +39,8 @@ export type TTokenToResp = z.infer<typeof TokenToResp>;
// AvailablePeerInfo
export const AvailablePeerInfoTo = z.object({
uuid: z.string(),
addr: z.string(),
port: z.number(),
// addr: z.string(),
// port: z.number(),
});
export type TAvailablePeerInfoTo = z.infer<typeof AvailablePeerInfoTo>;