half broken peer sync

This commit is contained in:
2024-07-06 22:09:07 +02:00
parent 672e9a6f37
commit f26a332500
37 changed files with 483 additions and 239 deletions

View File

@@ -13,7 +13,8 @@ services:
command: "java --add-exports java.base/sun.nio.ch=ALL-UNNAMED
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.distributed.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0 -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-jar /app/quarkus-run.jar"
ports:
- 8080:8080
@@ -32,7 +33,9 @@ services:
-Ddhfs.objects.persistence.files.root=/dhfs_root/p
-Ddhfs.objects.distributed.root=/dhfs_root/d
-Ddhfs.fuse.root=/dhfs_root/fuse -Dquarkus.http.host=0.0.0.0
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5010
-jar /app/quarkus-run.jar"
ports:
- 8090:8080
- 8091:8443
- 5010:5010

View File

@@ -1,8 +1,10 @@
package com.usatiuk.dhfs.storage.files.conflicts;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.files.objects.Directory;
import com.usatiuk.dhfs.storage.files.objects.FsNode;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
@@ -28,10 +30,8 @@ public class DirectoryConflictResolver implements ConflictResolver {
JObjectManager jObjectManager;
@Override
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
var theirsDir = (Directory) SerializationHelper.deserialize(theirsData.getRight());
public ConflictResolutionResult resolve(UUID conflictHost, ObjectHeader theirsHeader, JObjectData theirsData, JObject<?> ours) {
var theirsDir = (Directory) theirsData;
if (!theirsDir.getClass().equals(Directory.class)) {
Log.error("Object type mismatch!");
throw new NotImplementedException();
@@ -43,8 +43,6 @@ public class DirectoryConflictResolver implements ConflictResolver {
if (!(oursDirU instanceof Directory oursDir))
throw new NotImplementedException("Type conflict for " + ours.getName() + ", directory was expected");
var theirsHeader = theirsData.getLeft();
Directory first;
Directory second;
UUID otherHostname;
@@ -96,7 +94,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
for (var child : mergedChildren.values()) {
if (!(new HashSet<>(oursDir.getChildren().values()).contains(child))) {
jObjectManager.getOrPut(child.toString(), Optional.of(oursDir.getName()));
jObjectManager.getOrPut(child.toString(), FsNode.class, Optional.of(oursDir.getName()));
}
}

View File

@@ -1,11 +1,12 @@
package com.usatiuk.dhfs.storage.files.conflicts;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.files.objects.ChunkData;
import com.usatiuk.dhfs.storage.files.objects.ChunkInfo;
import com.usatiuk.dhfs.storage.files.objects.Directory;
import com.usatiuk.dhfs.storage.files.objects.File;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
@@ -36,10 +37,9 @@ public class FileConflictResolver implements ConflictResolver {
boolean useHashForChunks;
@Override
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
public ConflictResolutionResult resolve(UUID conflictHost, ObjectHeader theirsHeader, JObjectData theirsData, JObject<?> ours) {
var theirsFile = (File) SerializationHelper.deserialize(theirsData.getRight());
var theirsFile = (File) theirsData;
if (!theirsFile.getClass().equals(File.class)) {
Log.error("Object type mismatch!");
throw new NotImplementedException();
@@ -69,8 +69,6 @@ public class FileConflictResolver implements ConflictResolver {
// TODO: dedup
var theirsHeader = theirsData.getLeft();
File first;
File second;
UUID otherHostname;
@@ -123,8 +121,8 @@ public class FileConflictResolver implements ConflictResolver {
for (var e : firstChunksCopy) {
oursFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), Optional.of(oursFile.getName()));
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.of(oursFile.getName()));
}
oursFile.setMtime(first.getMtime());
@@ -137,8 +135,8 @@ public class FileConflictResolver implements ConflictResolver {
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), Optional.ofNullable(newFile.getName()));
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
}
var theName = oursDir.getChildren().entrySet().stream()

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.storage.files.conflicts;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import jakarta.enterprise.context.ApplicationScoped;
@@ -9,7 +11,7 @@ import java.util.UUID;
@ApplicationScoped
public class NoOpConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> conflictSource) {
public ConflictResolutionResult resolve(UUID conflictHost, ObjectHeader theirsHeader, JObjectData theirsData, JObject<?> ours) {
// Maybe check types?
return ConflictResolutionResult.RESOLVED;
}

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.storage.files.conflicts;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.NotImplementedException;
@@ -10,7 +12,7 @@ import java.util.UUID;
@ApplicationScoped
public class NotImplementedConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> conflictSource) {
public ConflictResolutionResult resolve(UUID conflictHost, ObjectHeader theirsHeader, JObjectData theirsData, JObject<?> ours) {
throw new NotImplementedException();
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.files.conflicts.NoOpConflictResolver;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
@@ -47,6 +48,11 @@ public class ChunkInfo extends JObjectData {
return "info_" + hash;
}
@Override
public Class<? extends JObjectData> getRefType() {
return ChunkData.class;
}
@Override
public Collection<String> extractRefs() {
return List.of(ChunkData.getNameFromHash(_hash));

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.files.conflicts.DirectoryConflictResolver;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import lombok.Setter;
@@ -41,6 +42,10 @@ public class Directory extends FsNode {
_children.put(name, uuid);
return true;
}
@Override
public Class<? extends JObjectData> getRefType() {
return FsNode.class;
}
@Override
public Collection<String> extractRefs() {

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.files.conflicts.FileConflictResolver;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
@@ -20,6 +21,11 @@ public class File extends FsNode {
return FileConflictResolver.class;
}
@Override
public Class<? extends JObjectData> getRefType() {
return ChunkInfo.class;
}
@Override
public Collection<String> extractRefs() {
return _chunks.values().stream().map(ChunkInfo::getNameFromHash).toList();

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
@@ -37,7 +36,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
// Create a new object
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, false);
_metaPart = new ObjectMetadata(name, false, obj.getClass());
_dataPart.set(obj);
// FIXME:?
if (!obj.assumeUnique())
@@ -50,6 +49,15 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_metaPart = objectMetadata;
}
public Class<? extends JObjectData> getKnownClass() {
return _metaPart.getKnownClass();
}
public void narrowClass(Class<? extends JObjectData> klass) {
_metaPart.narrowClass(klass);
}
public String getName() {
return _metaPart.getName();
}
@@ -106,7 +114,9 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_lock.writeLock().lock();
try {
if (_dataPart.get() == null) {
_dataPart.set(_resolver.resolveData(this));
var res = _resolver.resolveData(this);
_metaPart.narrowClass(res.getClass());
_dataPart.set(res);
hydrateRefs();
verifyRefs();
} // _dataPart.get() == null
@@ -118,21 +128,36 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
private void tryLocalResolve() {
if (_dataPart.get() == null) {
_lock.writeLock().lock();
_lock.readLock().lock();
try {
if (_dataPart.get() == null) {
if (_metaPart.getSavedRefs() != null && !_metaPart.getSavedRefs().isEmpty())
throw new IllegalStateException("Object " + getName() + " has non-hydrated refs when written locally");
var res = _resolver.resolveDataLocal(this);
if (res.isEmpty()) return;
_dataPart.set(res.get());
hydrateRefs();
verifyRefs();
_metaPart.narrowClass(res.get().getClass());
_dataPart.compareAndSet(null, res.get());
} // _dataPart.get() == null
} finally {
_lock.writeLock().unlock();
_lock.readLock().unlock();
} // try
} // _dataPart.get() == null
}
public void externalResolution(T data) {
assertRWLock();
if (_dataPart.get() != null)
throw new IllegalStateException("Data is not null when recording external resolution of " + getName());
_metaPart.narrowClass(data.getClass());
_dataPart.set(data);
if (!_metaPart.isLocked())
_metaPart.lock();
hydrateRefs();
verifyRefs();
}
public enum ResolutionStrategy {
NO_RESOLUTION,
LOCAL_ONLY,
@@ -177,6 +202,17 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
};
var ret = fn.apply(_metaPart, _dataPart.get(), this::bumpVer, invalidateFn);
_resolver.updateDeletionState(this);
if (_resolver._bumpVerification) {
if (_dataPart.get() != null && _dataPart.get().assumeUnique())
if (!Objects.equals(ver, _metaPart.getChangelog()))
throw new IllegalStateException("Object changed but is assumed immutable: " + getName());
// Todo: data check?
}
if (_dataPart.get() != null)
_metaPart.narrowClass(_dataPart.get().getClass());
if (!Objects.equals(ver, _metaPart.getChangelog())
|| ref != _metaPart.getRefcount()
|| wasDeleted != _metaPart.isDeleted()

View File

@@ -7,12 +7,21 @@ import java.util.Collection;
public abstract class JObjectData implements Serializable {
public abstract String getName();
public abstract Class<? extends ConflictResolver> getConflictResolver();
public boolean assumeUnique() {
return false;
}
public Class<? extends JObjectData> getRefType() {
throw new UnsupportedOperationException("This object shouldn't have refs");
}
public boolean pushResolution() {
return false;
}
public abstract Collection<String> extractRefs();
public long estimateSize() {

View File

@@ -13,7 +13,7 @@ public interface JObjectManager {
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
// Get an object with a name if it exists, otherwise create new one based on metadata
JObject<?> getOrPut(String name, Optional<String> parent);
JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent);
void notifySent(String key);
}

View File

@@ -1,9 +1,7 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.google.common.collect.Streams;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -21,7 +19,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -165,13 +162,16 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public JObject<?> getOrPut(String name, Optional<String> parent) {
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
while (true) {
var got = get(name);
if (got.isPresent()) {
got.get().narrowClass(klass);
if (parent.isPresent())
got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
if (m.isLocked())
m.unlock();
m.addRef(parent.get());
return true;
});
@@ -187,7 +187,7 @@ public class JObjectManagerImpl implements JObjectManager {
if (objectPersistentStore.existsObject("meta_" + name))
continue;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false));
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
_map.put(name, new NamedSoftReference(created, _refQueue));
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);

View File

@@ -75,6 +75,7 @@ public class JObjectRefProcessor {
if (got.isEmpty()) continue;
try {
got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, v, i) -> {
if (m.isLocked()) return null;
if (m.isDeleted()) return null;
if (m.getRefcount() > 0) return null;

View File

@@ -10,6 +10,8 @@ import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Collections;
@@ -39,9 +41,19 @@ public class JObjectResolver {
@Inject
JObjectRefProcessor jObjectRefProcessor;
private final MultiValuedMap<Class<? extends JObjectData>, JObject.ObjectFnWrite<?, Void>> _writeListeners
= new ArrayListValuedHashMap<>();
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@ConfigProperty(name = "dhfs.objects.bump_verification")
boolean _bumpVerification;
public <T extends JObjectData> void registerWriteListener(Class<T> klass, JObject.ObjectFnWrite<T, Void> fn) {
_writeListeners.put(klass, fn);
}
public void backupRefs(JObject<?> self) {
self.assertRWLock();
if (self.getData() != null) {
@@ -69,7 +81,7 @@ public class JObjectResolver {
for (var r : extracted) {
if (!self.getMeta().getSavedRefs().contains(r)) {
Log.trace("Hydrating ref " + r + " for " + self.getName());
jobjectManager.getOrPut(r, Optional.of(self.getName()));
jobjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
}
}
self.getMeta().setSavedRefs(null);
@@ -85,7 +97,7 @@ public class JObjectResolver {
if (self.isResolved()) {
for (var r : self.getData().extractRefs()) {
Log.trace("Hydrating ref after undelete " + r + " for " + self.getName());
jobjectManager.getOrPut(r, Optional.of(self.getName()));
jobjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
}
}
}
@@ -97,7 +109,8 @@ public class JObjectResolver {
}
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
jObject.assertRWLock();
// jObject.assertRWLock();
// FIXME: No way to assert read lock?
if (objectPersistentStore.existsObject(jObject.getName()))
return Optional.of(SerializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName())));
return Optional.empty();
@@ -133,11 +146,16 @@ public class JObjectResolver {
jObjectWriteback.markDirty(self);
}
public void notifyWriteData(JObject<?> self) {
public <T extends JObjectData> void notifyWriteData(JObject<T> self) {
self.assertRWLock();
jObjectWriteback.markDirty(self);
if (self.isResolved())
if (self.isResolved()) {
invalidationQueueService.pushInvalidationToAll(self.getName(), !self.getMeta().isSeen());
for (var l : _writeListeners.get(self.getData().getClass())) {
// TODO: Assert types?
self.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (JObject.ObjectFnWrite<T, ?>) l);
}
}
}
public void bumpVersionSelf(JObject<?> self) {

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;

View File

@@ -1,8 +1,13 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelog;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PersistentPeerInfo;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import jakarta.enterprise.inject.spi.CDI;
import lombok.Getter;
import lombok.Setter;
@@ -13,11 +18,13 @@ import java.io.Serial;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class ObjectMetadata implements Serializable {
public ObjectMetadata(String name, boolean written) {
public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
_name = name;
_written.set(written);
_knownClass.set(knownClass);
}
@Getter
@@ -34,6 +41,8 @@ public class ObjectMetadata implements Serializable {
@Setter
private Set<String> _savedRefs = Collections.emptySet();
private final AtomicReference<Class<? extends JObjectData>> _knownClass = new AtomicReference<>();
@Getter
private long _refcount = 0L;
@@ -46,6 +55,21 @@ public class ObjectMetadata implements Serializable {
private final AtomicBoolean _deleted = new AtomicBoolean(false);
public Class<? extends JObjectData> getKnownClass() {
return _knownClass.get();
}
protected void narrowClass(Class<? extends JObjectData> klass) {
Class<? extends JObjectData> got = null;
do {
got = _knownClass.get();
if (got.equals(klass)) return;
if (klass.isAssignableFrom(got)) return;
if (!got.isAssignableFrom(klass))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not narrow class of object " + getName() + " from " + got + " to " + klass));
} while (!_knownClass.compareAndSet(got, klass));
}
@Serial
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
@@ -131,7 +155,7 @@ public class ObjectMetadata implements Serializable {
var changelogBuilder = ObjectChangelog.newBuilder();
var hosts = CDI.current().select(PersistentRemoteHostsService.class).get();
var all = new ArrayList<>(hosts.getHosts().stream().map(HostInfo::getUuid).toList());
var all = new ArrayList<>(hosts.getHosts().stream().map(PersistentPeerInfo::getUuid).toList());
all.add(hosts.getSelfUuid());
for (var h : all) {
@@ -148,6 +172,17 @@ public class ObjectMetadata implements Serializable {
public ObjectHeader toRpcHeader() {
var headerBuilder = ObjectHeader.newBuilder().setName(getName());
headerBuilder.setChangelog(toRpcChangelog());
return headerBuilder.build();
}
public ObjectHeader toRpcHeader(JObjectData data) {
var headerBuilder = ObjectHeader.newBuilder().setName(getName());
headerBuilder.setChangelog(toRpcChangelog());
if (data != null && data.pushResolution())
headerBuilder.setPushedData(SerializationHelper.serialize(data));
return headerBuilder.build();
}
}

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import java.util.UUID;
@@ -11,5 +13,5 @@ public interface ConflictResolver {
}
ConflictResolutionResult
resolve(UUID conflictHost, JObject<?> conflictSource);
resolve(UUID conflictHost, ObjectHeader conflictHeader, JObjectData conflictData, JObject<?> conflictSource);
}

View File

@@ -1,29 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import jakarta.json.bind.annotation.JsonbCreator;
import jakarta.json.bind.annotation.JsonbProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.io.Serializable;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.util.UUID;
@Getter
@AllArgsConstructor
public class HostInfo implements Serializable {
private final UUID _uuid;
private final X509Certificate _certificate;
public PeerInfo toPeerInfo() {
try {
return PeerInfo.newBuilder().setUuid(_uuid.toString())
.setCert(ByteString.copyFrom(_certificate.getEncoded())).build();
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -6,14 +6,10 @@ import lombok.Setter;
import java.io.Serializable;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
public class PersistentRemoteHostsData implements Serializable {
@Getter
private final HashMap<UUID, HostInfo> _remoteHosts = new HashMap<>();
@Getter
private final UUID _selfUuid = UUID.randomUUID();

View File

@@ -1,7 +1,11 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerDirectory;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PersistentPeerInfo;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust.PeerTrustManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -19,7 +23,10 @@ import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ApplicationScoped
public class PersistentRemoteHostsService {
@@ -27,10 +34,16 @@ public class PersistentRemoteHostsService {
String dataRoot;
@Inject
PeerSyncClient peerSyncClient;
PeerTrustManager peerTrustManager;
@Inject
PeerTrustManager peerTrustManager;
JObjectManager jObjectManager;
@Inject
JObjectResolver jObjectResolver;
@Inject
ExecutorService executorService;
final String dataFileName = "hosts";
@@ -47,7 +60,7 @@ public class PersistentRemoteHostsService {
}
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
if (_persistentData.runReadLocked(d -> d.getSelfCertificate() == null))
if (_persistentData.runReadLocked(d -> d.getSelfCertificate() == null)) {
_persistentData.runWriteLocked(d -> {
try {
Log.info("Generating a key pair, please wait");
@@ -58,12 +71,23 @@ public class PersistentRemoteHostsService {
}
return null;
});
jObjectManager.put(new PeerDirectory(), Optional.empty());
}
_persistentData.runReadLocked(d -> {
peerTrustManager.reloadTrustManagerHosts(d.getRemoteHosts().values());
jObjectResolver.registerWriteListener(PersistentPeerInfo.class, (m, d, i, v) -> {
Log.info("Scheduling certificate update after " + m.getName() + " was updated");
executorService.submit(this::updateCerts);
return null;
});
jObjectResolver.registerWriteListener(PeerDirectory.class, (m, d, i, v) -> {
Log.info("Scheduling certificate update after " + m.getName() + " was updated");
executorService.submit(this::updateCerts);
return null;
});
updateCerts();
Files.writeString(Paths.get(dataRoot, "self_uuid"), _selfUuid.toString());
Log.info("Self uuid is: " + _selfUuid.toString());
}
@@ -74,6 +98,41 @@ public class PersistentRemoteHostsService {
Log.info("Shutdown");
}
private JObject<PeerDirectory> getPeerDirectory() {
var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
got.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d == null) throw new IllegalStateException("Could not resolve peer directory!");
if (!(d instanceof PeerDirectory))
throw new IllegalStateException("Peer directory is of wrong type!");
return null;
});
return (JObject<PeerDirectory>) got;
}
private JObject<PersistentPeerInfo> getPeer(UUID uuid) {
var got = jObjectManager.get(uuid.toString()).orElseThrow(() -> new IllegalStateException("Peer " + uuid + " not found"));
got.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d == null) throw new IllegalStateException("Could not resolve peer " + uuid);
if (!(d instanceof PersistentPeerInfo))
throw new IllegalStateException("Peer " + uuid + " is of wrong type!");
return null;
});
return (JObject<PersistentPeerInfo>) got;
}
private List<PersistentPeerInfo> getPeersSnapshot() {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return d.getPeers().stream().map(u -> {
try {
return getPeer(u).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m2, d2) -> d2);
} catch (Exception e) {
Log.warn("Error making snapshot of peer " + u, e);
return null;
}
}).filter(Objects::nonNull).toList();
});
}
public UUID getSelfUuid() {
if (_selfUuid == null)
throw new IllegalStateException();
@@ -84,40 +143,43 @@ public class PersistentRemoteHostsService {
return _selfUuid.toString() + _persistentData.runReadLocked(d -> d.getSelfCounter().addAndGet(1)).toString();
}
public HostInfo getInfo(UUID name) {
return _persistentData.runReadLocked(data -> {
return data.getRemoteHosts().get(name);
public PersistentPeerInfo getInfo(UUID name) {
return getPeer(name).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
return d;
});
}
public List<HostInfo> getHosts() {
return _persistentData.runReadLocked(data -> {
return data.getRemoteHosts().values().stream().toList();
});
public List<PersistentPeerInfo> getHosts() {
return getPeersSnapshot().stream().filter(i -> !i.getUuid().equals(_selfUuid)).toList();
}
public boolean addHost(HostInfo hostInfo) {
if (hostInfo.getUuid().equals(_selfUuid)) return false;
boolean added = _persistentData.runWriteLocked(d -> {
return d.getRemoteHosts().put(hostInfo.getUuid(), hostInfo) == null;
public boolean addHost(PersistentPeerInfo persistentPeerInfo) {
if (persistentPeerInfo.getUuid().equals(_selfUuid)) return false;
boolean added = getPeerDirectory().runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
boolean addedInner = d.getPeers().add(persistentPeerInfo.getUuid());
if (addedInner) {
jObjectManager.put(persistentPeerInfo, Optional.of(m.getName()));
b.apply();
}
return addedInner;
});
if (added) {
_persistentData.runReadLocked(d -> {
peerTrustManager.reloadTrustManagerHosts(d.getRemoteHosts().values());
return null;
});
// FIXME: async
peerSyncClient.syncPeersAll();
}
if (added)
updateCerts();
return added;
}
public boolean existsHost(UUID uuid) {
return _persistentData.runReadLocked(d -> {
return d.getRemoteHosts().containsKey(uuid);
private void updateCerts() {
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
peerTrustManager.reloadTrustManagerHosts(getHosts());
return null;
});
}
public boolean existsHost(UUID uuid) {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().contains(uuid));
}
public KeyPair getSelfKeypair() {
return _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfKeyPair);
}

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.PingRequest;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.GetSelfInfoRequest;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerSyncClient;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PersistentPeerInfo;
import com.usatiuk.dhfs.storage.objects.repository.distributed.webapi.AvailablePeerInfo;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -35,9 +35,6 @@ public class RemoteHostManager {
@Inject
RpcClientFactory rpcClientFactory;
@Inject
PeerSyncClient peerSyncClient;
@ConfigProperty(name = "dhfs.objects.distributed.sync.ping.timeout")
long pingTimeout;
@@ -45,7 +42,10 @@ public class RemoteHostManager {
private final ConcurrentMap<UUID, TransientPeerState> _seenHostsButNotAdded = new ConcurrentHashMap<>();
boolean _initialized = false;
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
_initialized = true;
}
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
@@ -54,6 +54,7 @@ public class RemoteHostManager {
@Scheduled(every = "${dhfs.objects.distributed.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
public void tryConnectAll() {
if (!_initialized) return;
for (var host : persistentRemoteHostsService.getHosts()) {
try {
var shouldTry = _transientPeersState.runReadLocked(d -> {
@@ -85,7 +86,6 @@ public class RemoteHostManager {
return null;
});
Log.info("Connected to " + host);
peerSyncClient.syncPeersOne(host);
syncHandler.doInitialResync(host);
}
@@ -155,12 +155,12 @@ public class RemoteHostManager {
if (!persistentRemoteHostsService.existsHost(host)) {
_seenHostsButNotAdded.put(host, state);
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
// Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
return;
}
_transientPeersState.runWriteLocked(d -> {
Log.trace("Updating connection info for " + host + ": addr=" + addr + " port=" + port);
// Log.trace("Updating connection info for " + host + ": addr=" + addr + " port=" + port);
d.getStates().putIfAbsent(host, new TransientPeerState()); // FIXME:? set reachable here?
d.getStates().get(host).setAddr(addr);
d.getStates().get(host).setPort(port);
@@ -187,7 +187,7 @@ public class RemoteHostManager {
try {
persistentRemoteHostsService.addHost(
new HostInfo(UUID.fromString(info.getUuid()), CertificateTools.certFromBytes(info.getCert().toByteArray())));
new PersistentPeerInfo(UUID.fromString(info.getUuid()), CertificateTools.certFromBytes(info.getCert().toByteArray())));
Log.info("Added host: " + host.toString());
} catch (CertificateException e) {
throw new RuntimeException(e);

View File

@@ -5,6 +5,8 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PeerDirectory;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PersistentPeerInfo;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -47,7 +49,7 @@ public class RemoteObjectServiceClient {
.filter(entry -> entry.getValue().equals(ourVersion))
.map(Map.Entry::getKey).toList();
else
return persistentRemoteHostsService.getHosts().stream().map(HostInfo::getUuid).toList();
return persistentRemoteHostsService.getHosts().stream().map(PersistentPeerInfo::getUuid).toList();
});
if (targets.isEmpty())
@@ -99,7 +101,7 @@ public class RemoteObjectServiceClient {
if (obj.isEmpty()) continue;
try {
var header = obj.get().runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> Pair.of(m.toRpcHeader(), m.isSeen()));
var header = obj.get().runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> Pair.of(m.toRpcHeader(d), m.isSeen()));
if (!header.getRight())
obj.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.markSeen();

View File

@@ -70,7 +70,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var objs = jObjectManager.find("");
for (var obj : objs) {
var header = obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (meta, data) -> meta.toRpcHeader());
var header = obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (meta, data) -> meta.toRpcHeader(data));
builder.addHeader(header);
}
return Uni.createFrom().item(builder.build());

View File

@@ -54,7 +54,7 @@ public class RpcClientFactory {
boolean shouldTry = reachable && addr != null;
if (!shouldTry) {
Log.trace("Not trying " + target + ": " + "addr=" + Objects.toString(addr) + " reachable=" + reachable);
Log.trace("Not trying " + target + ": " + "addr=" + addr + " reachable=" + reachable);
continue;
}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.SerializationHelper;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -53,7 +55,7 @@ public class SyncHandler {
}
public void handleOneUpdate(UUID from, ObjectHeader header) {
JObject<?> found = jObjectManager.getOrPut(header.getName(), Optional.empty());
JObject<?> found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty());
var receivedTotalVer = header.getChangelog().getEntriesList()
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
@@ -107,7 +109,11 @@ public class SyncHandler {
md.getChangelog().clear();
md.getChangelog().putAll(receivedMap);
md.getChangelog().putIfAbsent(persistentRemoteHostsService.getSelfUuid(), 0L);
if (header.hasPushedData())
found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
return false;
} else if (data == null && header.hasPushedData()) {
found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
}
assert Objects.equals(receivedTotalVer, md.getOurVersion());
@@ -119,9 +125,21 @@ public class SyncHandler {
if (conflict) {
Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
JObjectData theirsData;
ObjectHeader theirsHeader;
if (header.hasPushedData()) {
theirsHeader = header;
theirsData = SerializationHelper.deserialize(header.getPushedData());
} else {
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
theirsData = SerializationHelper.deserialize(got.getRight());
theirsHeader = got.getLeft();
}
var resolverClass = found.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> found.getConflictResolver());
var resolver = conflictResolvers.select(resolverClass);
var result = resolver.get().resolve(from, found);
var result = resolver.get().resolve(from, theirsHeader, theirsData, found);
if (result.equals(ConflictResolver.ConflictResolutionResult.RESOLVED)) {
Log.info("Resolved conflict for " + from + " " + header.getName());
} else {

View File

@@ -101,8 +101,8 @@ public class LocalPeerDiscoveryBroadcaster {
continue;
}
Log.trace(getClass().getName() + "Broadcast sent to: " + broadcast.getHostAddress()
+ ", at: " + networkInterface.getDisplayName());
// Log.trace(getClass().getName() + "Broadcast sent to: " + broadcast.getHostAddress()
// + ", at: " + networkInterface.getDisplayName());
}
}

View File

@@ -0,0 +1,47 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;
/**
 * Singleton object listing every known peer of this node. Each entry is a
 * peer UUID; the corresponding {@link PersistentPeerInfo} objects are the
 * directory's children (see {@link #extractRefs()}).
 */
public class PeerDirectory extends JObjectData {
    // Well-known name of the single peer-directory object.
    public static final String PeerDirectoryObjName = "peer_directory";

    // Known peer ids; LinkedHashSet keeps a stable insertion order.
    private final Set<UUID> _peers = new LinkedHashSet<>();

    /**
     * @return the (mutable) set of known peer UUIDs
     */
    public Set<UUID> getPeers() {
        return _peers;
    }

    @Override
    public String getName() {
        return PeerDirectoryObjName;
    }

    @Override
    public Class<? extends ConflictResolver> getConflictResolver() {
        return PeerDirectoryConflictResolver.class;
    }

    @Override
    public Class<? extends JObjectData> getRefType() {
        return PersistentPeerInfo.class;
    }

    @Override
    public boolean pushResolution() {
        // Small object: ship its body together with the header on sync.
        return true;
    }

    @Override
    public Collection<String> extractRefs() {
        // Children are the per-peer info objects, keyed by the string UUID.
        return _peers.stream().map(peer -> peer.toString()).toList();
    }

    @Override
    public long estimateSize() {
        // A UUID is two longs = 16 bytes per entry.
        return 16L * _peers.size();
    }
}

View File

@@ -0,0 +1,75 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import java.util.*;
@ApplicationScoped
public class PeerDirectoryConflictResolver implements ConflictResolver {
    @Inject
    PersistentRemoteHostsService persistentRemoteHostsService;
    @Inject
    RemoteObjectServiceClient remoteObjectServiceClient;
    @Inject
    JObjectManager jObjectManager;

    /**
     * Resolves a conflict on the peer directory by merging the two peer sets:
     * the result is the union of our peers and the remote side's peers, with
     * the changelogs merged entry-wise (max version per host).
     *
     * @param conflictHost the peer we are in conflict with
     * @param theirsHeader the remote object header (carries their changelog)
     * @param theirsData   the remote object body; must be a {@link PeerDirectory}
     * @param ours         our local copy of the peer directory
     * @return {@link ConflictResolutionResult#RESOLVED} on success
     * @throws StatusRuntimeException if we have no local copy or lose a race
     */
    @Override
    public ConflictResolutionResult resolve(UUID conflictHost, ObjectHeader theirsHeader, JObjectData theirsData, JObject<?> ours) {
        // Verify the runtime type BEFORE casting: the previous code cast first,
        // so a mismatched type surfaced as ClassCastException and this error
        // path was unreachable.
        if (!theirsData.getClass().equals(PeerDirectory.class)) {
            Log.error("Object type mismatch!");
            throw new NotImplementedException();
        }
        var theirsDir = (PeerDirectory) theirsData;

        ours.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, oursDirU, bump, invalidate) -> {
            if (oursDirU == null)
                throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
            if (!(oursDirU instanceof PeerDirectory oursPD))
                throw new NotImplementedException("Type conflict for " + ours.getName() + ", directory was expected");

            // Union of both peer sets; LinkedHashSet keeps a deterministic order.
            LinkedHashSet<UUID> mergedChildren = new LinkedHashSet<>(oursPD.getPeers());
            mergedChildren.addAll(theirsDir.getPeers());

            // Merge changelogs, keeping the highest version seen per host.
            Map<UUID, Long> newChangelog = new LinkedHashMap<>(m.getChangelog());
            for (var entry : theirsHeader.getChangelog().getEntriesList()) {
                newChangelog.merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
            }

            // The union strictly grew iff the remote side knew peers we didn't.
            boolean wasChanged = oursPD.getPeers().size() != mergedChildren.size();

            if (m.getBestVersion() > newChangelog.values().stream().reduce(0L, Long::sum))
                throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));

            if (wasChanged) {
                newChangelog.merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);

                // Register newly discovered peers as children of the directory.
                // (getPeers() is already a Set — no need to copy it into a fresh
                // HashSet on every iteration as the old code did.)
                for (var child : mergedChildren) {
                    if (!oursPD.getPeers().contains(child)) {
                        jObjectManager.getOrPut(child.toString(), PersistentPeerInfo.class, Optional.of(oursPD.getName()));
                    }
                }
                oursPD.getPeers().addAll(mergedChildren);
            }

            m.setChangelog(newChangelog);
            return null;
        });
        return ConflictResolutionResult.RESOLVED;
    }
}

View File

@@ -1,63 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.*;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.security.cert.CertificateEncodingException;
import java.security.cert.CertificateException;
import java.util.UUID;
// Client side of the (legacy) peer-sync exchange: pushes our known-peer list
// to a remote host and adds any peers the remote reports back.
@ApplicationScoped
public class PeerSyncClient {
    @Inject
    RemoteHostManager remoteHostManager;
    @Inject
    RpcClientFactory rpcClientFactory;
    @Inject
    PersistentRemoteHostsService persistentRemoteHostsService;
    // Performs one bidirectional peer exchange with the given host:
    // sends every known peer plus our own uuid/certificate, then imports
    // all peers from the response.
    public void syncPeersOne(UUID host) {
        var ret = rpcClientFactory.withPeerSyncClient(host, client -> {
            var builder = SyncPeersData.newBuilder();
            builder.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
            // All peers we already know about.
            for (var h : persistentRemoteHostsService.getHosts()) {
                builder.addMyPeers(h.toPeerInfo());
            }
            // Plus an entry for ourselves, carrying our own certificate.
            try {
                builder.addMyPeers(PeerInfo.newBuilder().setUuid(persistentRemoteHostsService.getSelfUuid().toString())
                        .setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
                        .build());
            } catch (CertificateEncodingException e) {
                throw new RuntimeException(e);
            }
            return client.syncPeers(builder.build());
        });
        // Import every peer the remote reported; a bad certificate only skips
        // that one entry rather than aborting the whole exchange.
        for (var np : ret.getMyPeersList()) {
            try {
                persistentRemoteHostsService.addHost(
                        new HostInfo(UUID.fromString(np.getUuid()),
                                CertificateTools.certFromBytes(np.getCert().toByteArray())));
            } catch (CertificateException e) {
                Log.error("Error adding peer " + np.getUuid(), e);
            }
        }
    }
    // Best-effort peer exchange with every host we have seen so far;
    // failures are logged and do not stop the loop.
    public void syncPeersAll() {
        for (var h : remoteHostManager.getSeenHosts()) {
            try {
                syncPeersOne(h);
            } catch (Exception e) {
                Log.info("Failed syncing hosts with " + h, e);
            }
        }
    }
}

View File

@@ -4,55 +4,19 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.DhfsObjectPeerSyncGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.GetSelfInfoRequest;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.SyncPeersData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.CertificateTools;
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
import io.quarkus.grpc.GrpcService;
import io.quarkus.logging.Log;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import java.security.cert.CertificateEncodingException;
import java.security.cert.CertificateException;
import java.util.UUID;
@GrpcService
public class PeerSyncServer implements DhfsObjectPeerSyncGrpc {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Override
@Blocking
public Uni<SyncPeersData> syncPeers(SyncPeersData request) {
var builder = SyncPeersData.newBuilder();
builder.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var h : persistentRemoteHostsService.getHosts()) {
builder.addMyPeers(h.toPeerInfo());
}
try {
builder.addMyPeers(PeerInfo.newBuilder()
.setUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setCert(ByteString.copyFrom(persistentRemoteHostsService.getSelfCertificate().getEncoded()))
.build());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
}
for (var np : request.getMyPeersList()) {
try {
persistentRemoteHostsService.addHost(
new HostInfo(UUID.fromString(np.getUuid()),
CertificateTools.certFromBytes(np.getCert().toByteArray())));
} catch (CertificateException e) {
Log.error("Error adding peer " + np.getUuid(), e);
}
}
return Uni.createFrom().item(builder.build());
}
@Override
@Blocking
public Uni<PeerInfo> getSelfInfo(GetSelfInfoRequest request) {

View File

@@ -0,0 +1,56 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peersync;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.peersync.PeerInfo;
import com.usatiuk.dhfs.storage.files.conflicts.NotImplementedConflictResolver;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
/**
 * Immutable per-peer record: the peer's UUID and its X.509 certificate.
 * Stored as a child of the peer directory; named by the string UUID.
 */
public class PersistentPeerInfo extends JObjectData {
    private final UUID _uuid;
    private final X509Certificate _certificate;

    // Explicit constructor and getters replace the Lombok
    // @AllArgsConstructor/@Getter pair; the generated signatures are identical.
    public PersistentPeerInfo(UUID uuid, X509Certificate certificate) {
        _uuid = uuid;
        _certificate = certificate;
    }

    public UUID getUuid() {
        return _uuid;
    }

    public X509Certificate getCertificate() {
        return _certificate;
    }

    /**
     * Converts this record into its protobuf wire form.
     *
     * @throws RuntimeException if the certificate cannot be DER-encoded
     */
    public PeerInfo toPeerInfo() {
        try {
            var encodedCert = ByteString.copyFrom(_certificate.getEncoded());
            return PeerInfo.newBuilder()
                    .setUuid(_uuid.toString())
                    .setCert(encodedCert)
                    .build();
        } catch (CertificateEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String getName() {
        return _uuid.toString();
    }

    @Override
    public boolean pushResolution() {
        return true;
    }

    public boolean assumeUnique() {
        return true;
    }

    // FIXME: Maybe check the certs?
    @Override
    public Class<? extends ConflictResolver> getConflictResolver() {
        return NotImplementedConflictResolver.class;
    }

    @Override
    public Collection<String> extractRefs() {
        // Leaf object: no children.
        return List.of();
    }
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed.peertrust;
import com.usatiuk.dhfs.storage.objects.repository.distributed.HostInfo;
import com.usatiuk.dhfs.storage.objects.repository.distributed.peersync.PersistentPeerInfo;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.tuple.Pair;
@@ -34,8 +35,9 @@ public class PeerTrustManager implements X509TrustManager {
return trustManager.getAcceptedIssuers();
}
public void reloadTrustManagerHosts(Collection<HostInfo> hosts) {
public void reloadTrustManagerHosts(Collection<PersistentPeerInfo> hosts) {
try {
Log.info("Trying to reload trust manager: " + hosts.size() + " known hosts");
reloadTrustManager(hosts.stream().map(hostInfo ->
Pair.of(hostInfo.getUuid().toString(), hostInfo.getCertificate())).toList());
} catch (Exception e) {
@@ -54,7 +56,7 @@ public class PeerTrustManager implements X509TrustManager {
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ts);
TrustManager tms[] = tmf.getTrustManagers();
TrustManager[] tms = tmf.getTrustManagers();
for (var tm : tms) {
if (tm instanceof X509TrustManager) {
trustManager = (X509TrustManager) tm;

View File

@@ -7,8 +7,6 @@ option java_outer_classname = "DhfsObjectPeerSyncApi";
package dhfs.objects.peersync;
service DhfsObjectPeerSyncGrpc {
rpc SyncPeers (SyncPeersData) returns (SyncPeersData) {}
rpc GetSelfInfo (GetSelfInfoRequest) returns (PeerInfo) {}
}
@@ -19,10 +17,4 @@ message GetSelfInfoRequest {
message PeerInfo {
string uuid = 1;
bytes cert = 2;
}
message SyncPeersData {
string selfUuid = 1;
repeated PeerInfo my_peers = 2;
}
}

View File

@@ -34,6 +34,7 @@ message ObjectChangelog {
message ObjectHeader {
string name = 2;
ObjectChangelog changelog = 5;
optional bytes pushedData = 6;
}
message ApiObject {

View File

@@ -27,6 +27,7 @@ dhfs.objects.writeback.threads=2
dhfs.objects.writeback.nursery_limit=-1
dhfs.objects.deletion.delay=0
dhfs.objects.ref_verification=false
dhfs.objects.bump_verification=true
dhfs.files.use_hash_for_chunks=false
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.http.insecure-requests=enabled

View File

@@ -79,13 +79,13 @@ public class DhfsFuseIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c2uuid +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c1uuid +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);

View File

@@ -95,25 +95,25 @@ public class DhfsFusex3IT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c2uuid +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c1uuid +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c3uuid +
" --data '{\"uuid\":\"" + c3uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data " + c2uuid +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);