mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
basic mtls
This commit is contained in:
@@ -17,6 +17,7 @@ services:
|
||||
-jar /app/quarkus-run.jar"
|
||||
ports:
|
||||
- 8080:8080
|
||||
- 8081:8443
|
||||
- 5005:5005
|
||||
dhfs2:
|
||||
build: .
|
||||
@@ -33,4 +34,5 @@ services:
|
||||
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
|
||||
-jar /app/quarkus-run.jar"
|
||||
ports:
|
||||
- 8081:8080
|
||||
- 8090:8080
|
||||
- 8091:8443
|
||||
|
||||
@@ -41,6 +41,20 @@
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcprov-jdk18on</artifactId>
|
||||
<version>1.78.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcpkix-jdk18on</artifactId>
|
||||
<version>1.78.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-security</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.openhft</groupId>
|
||||
<artifactId>zero-allocation-hashing</artifactId>
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
|
||||
import org.bouncycastle.asn1.x500.X500Name;
|
||||
import org.bouncycastle.asn1.x509.BasicConstraints;
|
||||
import org.bouncycastle.cert.CertIOException;
|
||||
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
|
||||
import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.bouncycastle.operator.ContentSigner;
|
||||
import org.bouncycastle.operator.OperatorCreationException;
|
||||
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.math.BigInteger;
|
||||
import java.security.*;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.CertificateFactory;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
public class CertificateTools {
|
||||
|
||||
public static X509Certificate certFromBytes(byte[] bytes) throws CertificateException {
|
||||
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
|
||||
InputStream in = new ByteArrayInputStream(bytes);
|
||||
return (X509Certificate) certFactory.generateCertificate(in);
|
||||
}
|
||||
|
||||
public static KeyPair generateKeyPair() throws NoSuchAlgorithmException {
|
||||
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
|
||||
keyGen.initialize(1024); //FIXME:
|
||||
return keyGen.generateKeyPair();
|
||||
}
|
||||
|
||||
public static X509Certificate generateCertificate(KeyPair keyPair, String subject) throws CertificateException, CertIOException, NoSuchAlgorithmException, OperatorCreationException {
|
||||
Provider bcProvider = new BouncyCastleProvider();
|
||||
Security.addProvider(bcProvider);
|
||||
|
||||
Date startDate = new Date();
|
||||
|
||||
X500Name dnName = new X500Name("DN=" + subject);
|
||||
X500Name cnName = new X500Name("CN=" + subject);
|
||||
BigInteger certSerialNumber = new BigInteger(DigestUtils.sha256(subject));
|
||||
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTime(startDate);
|
||||
calendar.add(Calendar.YEAR, 999);
|
||||
|
||||
Date endDate = calendar.getTime();
|
||||
|
||||
ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate());
|
||||
|
||||
JcaX509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(dnName, certSerialNumber, startDate, endDate, cnName, keyPair.getPublic());
|
||||
|
||||
BasicConstraints basicConstraints = new BasicConstraints(false);
|
||||
certBuilder.addExtension(new ASN1ObjectIdentifier("2.5.29.19"), true, basicConstraints);
|
||||
|
||||
return new JcaX509CertificateConverter().setProvider(bcProvider).getCertificate(certBuilder.build(contentSigner));
|
||||
}
|
||||
}
|
||||
@@ -1,23 +1,29 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
|
||||
import jakarta.json.bind.annotation.JsonbCreator;
|
||||
import jakarta.json.bind.annotation.JsonbProperty;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.security.cert.CertificateEncodingException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.UUID;
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public class HostInfo implements Serializable {
|
||||
private final UUID _uuid;
|
||||
|
||||
@JsonbCreator
|
||||
public HostInfo(@JsonbProperty("uuid") String uuid) {
|
||||
_uuid = UUID.fromString(uuid);
|
||||
}
|
||||
private final X509Certificate _certificate;
|
||||
|
||||
public PeerInfo toPeerInfo() {
|
||||
return PeerInfo.newBuilder().setUuid(_uuid.toString()).build();
|
||||
try {
|
||||
return PeerInfo.newBuilder().setUuid(_uuid.toString())
|
||||
.setCert(ByteString.copyFrom(_certificate.getEncoded())).build();
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.PUT;
|
||||
import jakarta.ws.rs.Path;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Path("/objects-manage")
|
||||
public class ManagementApi {
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Path("known-peers")
|
||||
@GET
|
||||
public List<HostInfo> knownPeers() {
|
||||
return persistentRemoteHostsService.getHosts();
|
||||
}
|
||||
|
||||
@Path("known-peers")
|
||||
@PUT
|
||||
public void addPeer(HostInfo hostInfo) {
|
||||
persistentRemoteHostsService.addHost(hostInfo);
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,11 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.security.KeyPair;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.HashMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@@ -14,6 +17,14 @@ public class PersistentRemoteHostsData implements Serializable {
|
||||
@Getter
|
||||
private final UUID _selfUuid = UUID.randomUUID();
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private X509Certificate _selfCertificate = null;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private KeyPair _selfKeyPair = null;
|
||||
|
||||
@Getter
|
||||
private final AtomicLong _selfCounter = new AtomicLong();
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust.PeerTrustManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
@@ -15,6 +16,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.security.KeyPair;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@@ -26,6 +29,9 @@ public class PersistentRemoteHostsService {
|
||||
@Inject
|
||||
PeerSyncClient peerSyncClient;
|
||||
|
||||
@Inject
|
||||
PeerTrustManager peerTrustManager;
|
||||
|
||||
final String dataFileName = "hosts";
|
||||
|
||||
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
|
||||
@@ -40,6 +46,24 @@ public class PersistentRemoteHostsService {
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
|
||||
|
||||
if (_persistentData.runReadLocked(d -> d.getSelfCertificate() == null))
|
||||
_persistentData.runWriteLocked(d -> {
|
||||
try {
|
||||
Log.info("Generating a key pair, please wait");
|
||||
d.setSelfKeyPair(CertificateTools.generateKeyPair());
|
||||
d.setSelfCertificate(CertificateTools.generateCertificate(d.getSelfKeyPair(), _selfUuid.toString()));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed generating cert", e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
_persistentData.runReadLocked(d -> {
|
||||
peerTrustManager.reloadTrustManagerHosts(d.getRemoteHosts().values());
|
||||
return null;
|
||||
});
|
||||
|
||||
Files.writeString(Paths.get(dataRoot, "self_uuid"), _selfUuid.toString());
|
||||
Log.info("Self uuid is: " + _selfUuid.toString());
|
||||
}
|
||||
@@ -72,15 +96,20 @@ public class PersistentRemoteHostsService {
|
||||
});
|
||||
}
|
||||
|
||||
public void addHost(HostInfo hostInfo) {
|
||||
if (hostInfo.getUuid().equals(_selfUuid)) return;
|
||||
public boolean addHost(HostInfo hostInfo) {
|
||||
if (hostInfo.getUuid().equals(_selfUuid)) return false;
|
||||
boolean added = _persistentData.runWriteLocked(d -> {
|
||||
return d.getRemoteHosts().put(hostInfo.getUuid(), hostInfo) == null;
|
||||
});
|
||||
if (added) {
|
||||
_persistentData.runReadLocked(d -> {
|
||||
peerTrustManager.reloadTrustManagerHosts(d.getRemoteHosts().values());
|
||||
return null;
|
||||
});
|
||||
// FIXME: async
|
||||
peerSyncClient.syncPeersAll();
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
public boolean existsHost(UUID uuid) {
|
||||
@@ -88,4 +117,13 @@ public class PersistentRemoteHostsService {
|
||||
return d.getRemoteHosts().containsKey(uuid);
|
||||
});
|
||||
}
|
||||
|
||||
public KeyPair getSelfKeypair() {
|
||||
return _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfKeyPair);
|
||||
}
|
||||
|
||||
public X509Certificate getSelfCertificate() {
|
||||
return _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfCertificate);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.PingRequest;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.GetSelfInfoRequest;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.webapi.AvailablePeerInfo;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
@@ -14,9 +16,13 @@ import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteHostManager {
|
||||
@@ -37,6 +43,8 @@ public class RemoteHostManager {
|
||||
|
||||
private final TransientPeersState _transientPeersState = new TransientPeersState();
|
||||
|
||||
private final ConcurrentMap<UUID, TransientPeerState> _seenHostsButNotAdded = new ConcurrentHashMap<>();
|
||||
|
||||
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
|
||||
}
|
||||
|
||||
@@ -96,7 +104,7 @@ public class RemoteHostManager {
|
||||
TransientPeerState state = _transientPeersState.runReadLocked(s -> s.getStates().get(host));
|
||||
if (state == null) return false;
|
||||
try {
|
||||
return rpcClientFactory.withObjSyncClient(state.getAddr(), state.getPort(), pingTimeout, c -> {
|
||||
return rpcClientFactory.withObjSyncClient(host.toString(), state.getAddr(), state.getSecurePort(), pingTimeout, c -> {
|
||||
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
|
||||
if (!UUID.fromString(ret.getSelfUuid()).equals(host)) {
|
||||
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host);
|
||||
@@ -135,21 +143,57 @@ public class RemoteHostManager {
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
public void notifyAddr(UUID host, String addr, Integer port) {
|
||||
public void notifyAddr(UUID host, String addr, Integer port, Integer securePort) {
|
||||
if (host.equals(persistentRemoteHostsService.getSelfUuid())) {
|
||||
return;
|
||||
}
|
||||
|
||||
var state = new TransientPeerState();
|
||||
state.setAddr(addr);
|
||||
state.setPort(port);
|
||||
state.setSecurePort(securePort);
|
||||
|
||||
if (!persistentRemoteHostsService.existsHost(host)) {
|
||||
_seenHostsButNotAdded.put(host, state);
|
||||
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
return;
|
||||
}
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
Log.trace("Updating connection info for " + host + ": addr=" + addr + " port=" + port);
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState());
|
||||
d.getStates().get(host).setAddr(addr);
|
||||
d.getStates().get(host).setPort(port);
|
||||
d.getStates().put(host, state);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void addRemoteHost(UUID host) {
|
||||
if (!_seenHostsButNotAdded.containsKey(host)) {
|
||||
throw new IllegalStateException("Host " + host + " is not seen");
|
||||
}
|
||||
if (persistentRemoteHostsService.existsHost(host)) {
|
||||
throw new IllegalStateException("Host " + host + " is already added");
|
||||
}
|
||||
|
||||
var state = _seenHostsButNotAdded.get(host);
|
||||
|
||||
// FIXME: race?
|
||||
|
||||
var info = rpcClientFactory.withPeerSyncClient(state.getAddr(), state.getPort(), 10000L, c -> {
|
||||
return c.getSelfInfo(GetSelfInfoRequest.getDefaultInstance());
|
||||
});
|
||||
|
||||
try {
|
||||
persistentRemoteHostsService.addHost(
|
||||
new HostInfo(UUID.fromString(info.getUuid()), CertificateTools.certFromBytes(info.getCert().toByteArray())));
|
||||
Log.info("Added host: " + host.toString());
|
||||
} catch (CertificateException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<AvailablePeerInfo> getSeenButNotAddedHosts() {
|
||||
return _seenHostsButNotAdded.entrySet().stream()
|
||||
.map(e -> new AvailablePeerInfo(e.getKey().toString(), e.getValue().getAddr(), e.getValue().getPort()))
|
||||
.toList();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import io.quarkus.grpc.GrpcService;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.annotation.security.RolesAllowed;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
@@ -18,6 +19,7 @@ import java.util.UUID;
|
||||
|
||||
// Note: RunOnVirtualThread hangs somehow
|
||||
@GrpcService
|
||||
@RolesAllowed("cluster-member")
|
||||
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
|
||||
@@ -2,8 +2,11 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpcGrpc;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust.PeerTrustManager;
|
||||
import io.grpc.ChannelCredentials;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.grpc.TlsChannelCredentials;
|
||||
import io.grpc.netty.NegotiationType;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -11,6 +14,9 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import java.security.KeyStore;
|
||||
import java.security.cert.Certificate;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@@ -23,6 +29,9 @@ public class RpcClientFactory {
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Inject
|
||||
PeerTrustManager peerTrustManager;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.sync.timeout")
|
||||
long syncTimeout;
|
||||
|
||||
@@ -49,7 +58,7 @@ public class RpcClientFactory {
|
||||
if (!shouldTry) continue;
|
||||
|
||||
try {
|
||||
return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), syncTimeout, fn);
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
} catch (StatusRuntimeException e) {
|
||||
if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) {
|
||||
Log.info("Host " + target + " is unreachable: " + e.getMessage());
|
||||
@@ -71,12 +80,35 @@ public class RpcClientFactory {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
if (hostinfo.getAddr() == null)
|
||||
throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
return withObjSyncClient(hostinfo.getAddr(), hostinfo.getPort(), syncTimeout, fn);
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
}
|
||||
|
||||
public <R> R withObjSyncClient(String addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
|
||||
.usePlaintext().build();
|
||||
private ChannelCredentials getChannelCredentials() {
|
||||
try {
|
||||
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
ks.load(null, null);
|
||||
|
||||
ks.setKeyEntry("clientkey",
|
||||
persistentRemoteHostsService.getSelfKeypair().getPrivate(), null,
|
||||
new Certificate[]{persistentRemoteHostsService.getSelfCertificate()});
|
||||
|
||||
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
|
||||
keyManagerFactory.init(ks, null);
|
||||
|
||||
ChannelCredentials creds = TlsChannelCredentials.newBuilder()
|
||||
.trustManager(peerTrustManager)
|
||||
.keyManager(keyManagerFactory.getKeyManagers())
|
||||
.build();
|
||||
return creds;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public <R> R withObjSyncClient(String host, String addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
|
||||
var creds = getChannelCredentials();
|
||||
var channel = NettyChannelBuilder.forAddress(addr, port, creds)
|
||||
.overrideAuthority(host).build();
|
||||
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
|
||||
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||
.withMaxInboundMessageSize(Integer.MAX_VALUE)
|
||||
|
||||
@@ -119,7 +119,8 @@ public class SyncHandler {
|
||||
|
||||
if (conflict) {
|
||||
Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
|
||||
var resolver = conflictResolvers.select(found.getConflictResolver());
|
||||
var resolverClass = found.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> found.getConflictResolver());
|
||||
var resolver = conflictResolvers.select(resolverClass);
|
||||
var result = resolver.get().resolve(from, found);
|
||||
if (result.equals(ConflictResolver.ConflictResolutionResult.RESOLVED)) {
|
||||
Log.info("Resolved conflict for " + from + " " + header.getName());
|
||||
|
||||
@@ -29,4 +29,8 @@ public class TransientPeerState {
|
||||
@Getter
|
||||
@Setter
|
||||
private int _port;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private int _securePort;
|
||||
}
|
||||
|
||||
@@ -22,6 +22,9 @@ public class LocalPeerDiscoveryBroadcaster {
|
||||
@ConfigProperty(name = "quarkus.http.port")
|
||||
Integer ourPort;
|
||||
|
||||
@ConfigProperty(name = "quarkus.http.ssl-port")
|
||||
Integer ourSecurePort;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.peerdiscovery.port")
|
||||
Integer broadcastPort;
|
||||
|
||||
@@ -62,6 +65,7 @@ public class LocalPeerDiscoveryBroadcaster {
|
||||
var sendData = PeerDiscoveryInfo.newBuilder()
|
||||
.setUuid(persistentRemoteHostsService.getSelfUuid().toString())
|
||||
.setPort(ourPort)
|
||||
.setSecurePort(ourSecurePort)
|
||||
.build();
|
||||
|
||||
var sendBytes = sendData.toByteArray();
|
||||
|
||||
@@ -57,7 +57,7 @@ public class LocalPeerDiscoveryClient {
|
||||
try {
|
||||
var got = PeerDiscoveryInfo.parseFrom(ByteBuffer.wrap(buf, 0, packet.getLength()));
|
||||
|
||||
remoteHostManager.notifyAddr(UUID.fromString(got.getUuid()), packet.getAddress().getHostAddress(), got.getPort());
|
||||
remoteHostManager.notifyAddr(UUID.fromString(got.getUuid()), packet.getAddress().getHostAddress(), got.getPort(), got.getSecurePort());
|
||||
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
continue;
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteHostManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RpcClientFactory;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.*;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.security.cert.CertificateEncodingException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.UUID;
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -30,12 +30,24 @@ public class PeerSyncClient {
|
||||
for (var h : persistentRemoteHostsService.getHosts()) {
|
||||
builder.addMyPeers(h.toPeerInfo());
|
||||
}
|
||||
builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
|
||||
try {
|
||||
builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString())
|
||||
.setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
|
||||
.build());
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return client.syncPeers(builder.build());
|
||||
});
|
||||
|
||||
for (var np : ret.getMyPeersList()) {
|
||||
persistentRemoteHostsService.addHost(new HostInfo(np.getUuid()));
|
||||
try {
|
||||
persistentRemoteHostsService.addHost(
|
||||
new HostInfo(UUID.fromString(np.getUuid()),
|
||||
CertificateTools.certFromBytes(np.getCert().toByteArray())));
|
||||
} catch (CertificateException e) {
|
||||
Log.error("Error adding peer " + np.getUuid(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,23 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpc;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.GetSelfInfoRequest;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.CertificateTools;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import io.quarkus.grpc.GrpcService;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.security.cert.CertificateEncodingException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.UUID;
|
||||
|
||||
@GrpcService
|
||||
public class PeerSyncServer implements DhfsObjectPeerSyncGrpc {
|
||||
@Inject
|
||||
@@ -23,12 +31,39 @@ public class PeerSyncServer implements DhfsObjectPeerSyncGrpc {
|
||||
for (var h : persistentRemoteHostsService.getHosts()) {
|
||||
builder.addMyPeers(h.toPeerInfo());
|
||||
}
|
||||
builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
|
||||
try {
|
||||
builder.addMyPeers(PeerInfo.newBuilder()
|
||||
.setUuid(persistentRemoteHostsService.getSelfUuid().toString())
|
||||
.setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
|
||||
.build());
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
for (var np : request.getMyPeersList()) {
|
||||
persistentRemoteHostsService.addHost(new HostInfo(np.getUuid()));
|
||||
try {
|
||||
persistentRemoteHostsService.addHost(
|
||||
new HostInfo(UUID.fromString(np.getUuid()),
|
||||
CertificateTools.certFromBytes(np.getCert().toByteArray())));
|
||||
} catch (CertificateException e) {
|
||||
Log.error("Error adding peer " + np.getUuid(), e);
|
||||
}
|
||||
}
|
||||
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<PeerInfo> getSelfInfo(GetSelfInfoRequest request) {
|
||||
try {
|
||||
return Uni.createFrom().item(
|
||||
PeerInfo.newBuilder()
|
||||
.setUuid(persistentRemoteHostsService.getSelfUuid().toString())
|
||||
.setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
|
||||
.build());
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust;
|
||||
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import io.quarkus.security.credential.CertificateCredential;
|
||||
import io.quarkus.security.identity.AuthenticationRequestContext;
|
||||
import io.quarkus.security.identity.SecurityIdentity;
|
||||
import io.quarkus.security.identity.SecurityIdentityAugmentor;
|
||||
import io.quarkus.security.runtime.QuarkusSecurityIdentity;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PeerRolesAugmentor implements SecurityIdentityAugmentor {
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Override
|
||||
public Uni<SecurityIdentity> augment(SecurityIdentity identity, AuthenticationRequestContext context) {
|
||||
return Uni.createFrom().item(build(identity));
|
||||
}
|
||||
|
||||
private Supplier<SecurityIdentity> build(SecurityIdentity identity) {
|
||||
if (identity.isAnonymous()) {
|
||||
return () -> identity;
|
||||
} else {
|
||||
QuarkusSecurityIdentity.Builder builder = QuarkusSecurityIdentity.builder(identity);
|
||||
|
||||
var entry = persistentRemoteHostsService.getHosts().stream()
|
||||
.filter(i -> i.getUuid().toString()
|
||||
.equals(identity.getPrincipal().getName().substring(3)))
|
||||
.findFirst();
|
||||
if (entry.isEmpty()) return () -> identity;
|
||||
|
||||
if (!entry.get().getCertificate().equals(identity.getCredential(CertificateCredential.class).getCertificate()))
|
||||
return () -> identity;
|
||||
|
||||
builder.addRole("cluster-member");
|
||||
return builder::build;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust;
|
||||
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Collection;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PeerTrustManager implements X509TrustManager {
|
||||
private X509TrustManager trustManager;
|
||||
|
||||
@Override
|
||||
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
|
||||
trustManager.checkClientTrusted(chain, authType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
|
||||
trustManager.checkServerTrusted(chain, authType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public X509Certificate[] getAcceptedIssuers() {
|
||||
return new X509Certificate[0];
|
||||
}
|
||||
|
||||
public void reloadTrustManagerHosts(Collection<HostInfo> hosts) {
|
||||
try {
|
||||
reloadTrustManager(hosts.stream().map(hostInfo ->
|
||||
Pair.of(hostInfo.getUuid().toString(), hostInfo.getCertificate())).toList());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void reloadTrustManager(Collection<Pair<String, X509Certificate>> certs) throws KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException {
|
||||
KeyStore ts = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
ts.load(null, null);
|
||||
|
||||
for (var cert : certs) {
|
||||
ts.setCertificateEntry(cert.getLeft(), cert.getRight());
|
||||
}
|
||||
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
|
||||
tmf.init(ts);
|
||||
|
||||
TrustManager tms[] = tmf.getTrustManagers();
|
||||
for (var tm : tms) {
|
||||
if (tm instanceof X509TrustManager) {
|
||||
trustManager = (X509TrustManager) tm;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
throw new NoSuchAlgorithmException("No X509TrustManager in TrustManagerFactory");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust;
|
||||
|
||||
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import io.quarkus.vertx.http.HttpServerOptionsCustomizer;
|
||||
import io.vertx.core.http.HttpServerOptions;
|
||||
import io.vertx.core.net.KeyCertOptions;
|
||||
import io.vertx.core.net.TrustOptions;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import java.security.KeyStore;
|
||||
import java.security.cert.Certificate;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PeerTrustServerCustomizer implements HttpServerOptionsCustomizer {
|
||||
|
||||
@Inject
|
||||
PeerTrustManager peerTrustManager;
|
||||
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Override
|
||||
public void customizeHttpsServer(HttpServerOptions options) {
|
||||
try {
|
||||
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
ks.load(null, null);
|
||||
|
||||
ks.setKeyEntry("sslkey",
|
||||
persistentRemoteHostsService.getSelfKeypair().getPrivate(), null,
|
||||
new Certificate[]{persistentRemoteHostsService.getSelfCertificate()});
|
||||
|
||||
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
|
||||
keyManagerFactory.init(ks, null);
|
||||
|
||||
options.setKeyCertOptions(KeyCertOptions.wrap(keyManagerFactory));
|
||||
options.setTrustOptions(TrustOptions.wrap(peerTrustManager));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Error configuring https: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.webapi;
|
||||
|
||||
public record AvailablePeerInfo(String uuid, String addr, int port) {
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed.webapi;
|
||||
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteHostManager;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.PUT;
|
||||
import jakarta.ws.rs.Path;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Path("/objects-manage")
|
||||
public class ManagementApi {
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Path("known-peers")
|
||||
@GET
|
||||
public List<HostInfo> knownPeers() {
|
||||
return persistentRemoteHostsService.getHosts();
|
||||
}
|
||||
|
||||
@Path("known-peers")
|
||||
@PUT
|
||||
public void addPeer(String hostname) {
|
||||
remoteHostManager.addRemoteHost(UUID.fromString(hostname));
|
||||
}
|
||||
|
||||
@Path("available-peers")
|
||||
@GET
|
||||
public Collection<AvailablePeerInfo> availablePeers() {
|
||||
return remoteHostManager.getSeenButNotAddedHosts();
|
||||
}
|
||||
}
|
||||
@@ -9,4 +9,5 @@ package dhfs.objects.peerdiscovery;
|
||||
message PeerDiscoveryInfo {
|
||||
string uuid = 1;
|
||||
uint32 port = 2;
|
||||
uint32 securePort = 3;
|
||||
}
|
||||
|
||||
@@ -8,10 +8,17 @@ package dhfs.objects.peersync;
|
||||
|
||||
service DhfsObjectPeerSyncGrpc {
|
||||
rpc SyncPeers (SyncPeersData) returns (SyncPeersData) {}
|
||||
|
||||
rpc GetSelfInfo (GetSelfInfoRequest) returns (PeerInfo) {}
|
||||
}
|
||||
|
||||
message GetSelfInfoRequest {
|
||||
|
||||
}
|
||||
|
||||
message PeerInfo {
|
||||
string uuid = 1;
|
||||
bytes cert = 2;
|
||||
}
|
||||
|
||||
message SyncPeersData {
|
||||
|
||||
@@ -29,3 +29,5 @@ dhfs.objects.deletion.delay=0
|
||||
dhfs.objects.ref_verification=false
|
||||
dhfs.files.use_hash_for_chunks=false
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.http.insecure-requests=enabled
|
||||
quarkus.http.ssl.client-auth=request
|
||||
Reference in New Issue
Block a user