code cleanup

This commit is contained in:
2024-07-10 18:13:57 +02:00
parent 9138b3df43
commit f67a0e1809
31 changed files with 229 additions and 310 deletions

View File

@@ -1,13 +1,13 @@
package com.usatiuk.dhfs.files.conflicts;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.files.objects.Directory;
import com.usatiuk.dhfs.files.objects.FsNode;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.files.conflicts;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.commons.lang3.NotImplementedException;

View File

@@ -36,6 +36,10 @@ public class ChunkData extends JObjectData {
this._hash = name;
}
public static String getNameFromHash(String hash) {
return "data_" + hash;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -59,10 +63,6 @@ public class ChunkData extends JObjectData {
return NoOpConflictResolver.class;
}
public static String getNameFromHash(String hash) {
return "data_" + hash;
}
@Override
public Collection<String> extractRefs() {
return List.of();

View File

@@ -27,6 +27,10 @@ public class ChunkInfo extends JObjectData {
this._size = size;
}
public static String getNameFromHash(String hash) {
return "info_" + hash;
}
@Override
public String getName() {
return getNameFromHash(_hash);
@@ -37,10 +41,6 @@ public class ChunkInfo extends JObjectData {
return NoOpConflictResolver.class;
}
public static String getNameFromHash(String hash) {
return "info_" + hash;
}
@Override
public Class<? extends JObjectData> getRefType() {
return ChunkData.class;

View File

@@ -12,6 +12,9 @@ import java.util.*;
public class Directory extends FsNode {
@Serial
private static final long serialVersionUID = 1;
@Getter
@Setter
private Map<String, UUID> _children = new TreeMap<>();
public Directory(UUID uuid) {
super(uuid);
@@ -21,10 +24,6 @@ public class Directory extends FsNode {
super(uuid, mode);
}
@Getter
@Setter
private Map<String, UUID> _children = new TreeMap<>();
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return DirectoryConflictResolver.class;
@@ -46,6 +45,7 @@ public class Directory extends FsNode {
_children.put(name, uuid);
return true;
}
@Override
public Class<? extends JObjectData> getRefType() {
return FsNode.class;

View File

@@ -14,6 +14,10 @@ import java.util.UUID;
public class File extends FsNode {
@Serial
private static final long serialVersionUID = 1;
@Getter
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
@Getter
private final UUID _parent;
public File(UUID uuid, long mode, UUID parent) {
super(uuid, mode);
@@ -35,12 +39,6 @@ public class File extends FsNode {
return _chunks.values().stream().map(ChunkInfo::getNameFromHash).toList();
}
@Getter
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
@Getter
private final UUID _parent;
@Override
public long estimateSize() {
return _chunks.size() * 16L;

View File

@@ -15,6 +15,15 @@ public abstract class FsNode extends JObjectData {
@Getter
final UUID _uuid;
@Getter
@Setter
private long _mode;
@Getter
@Setter
private long _ctime;
@Getter
@Setter
private long _mtime;
protected FsNode(UUID uuid) {
this._uuid = uuid;
@@ -29,7 +38,6 @@ public abstract class FsNode extends JObjectData {
this._mtime = this._ctime;
}
@Override
public String getName() {
return _uuid.toString();
@@ -39,16 +47,4 @@ public abstract class FsNode extends JObjectData {
public Class<? extends ConflictResolver> getConflictResolver() {
return NotImplementedConflictResolver.class;
}
@Getter
@Setter
private long _mode;
@Getter
@Setter
private long _ctime;
@Getter
@Setter
private long _mtime;
}

View File

@@ -7,19 +7,30 @@ import java.util.Optional;
public interface DhfsFileService {
Optional<String> open(String name);
Optional<String> create(String name, long mode);
Optional<String> mkdir(String name, long mode);
Optional<FsNode> getattr(String name);
Boolean chmod(String name, long mode);
Boolean rmdir(String name);
Boolean unlink(String name);
Boolean rename(String from, String to);
Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs);
Iterable<String> readDir(String name);
Long size(String f);
Optional<ByteString> read(String fileUuid, long offset, int length);
Long write(String fileUuid, long offset, byte[] data);
Boolean truncate(String fileUuid, long length);
}

View File

@@ -6,16 +6,15 @@ import jnr.ffi.Pointer;
import java.nio.ByteBuffer;
public class JnrPtrByteOutput extends ByteOutput {
private final Pointer _backing;
private final long _size;
private long _pos;
public JnrPtrByteOutput(Pointer backing, long size) {
_backing = backing;
_size = size;
_pos = 0;
}
private final Pointer _backing;
private final long _size;
private long _pos;
@Override
public void write(byte value) {
throw new UnsupportedOperationException();

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.objects.jrepository;
public class DeletedObjectAccessException extends RuntimeException {
public class DeletedObjectAccessException extends RuntimeException {
}

View File

@@ -12,6 +12,29 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JObject<T extends JObjectData> implements Serializable, Comparable<JObject<?>> {
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private final AtomicReference<T> _dataPart = new AtomicReference<>();
// Create a new object
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, false, obj.getClass());
_dataPart.set(obj);
_metaPart.bumpVersion(selfUuid);
}
// Create an object from existing metadata
protected JObject(JObjectResolver resolver, ObjectMetadata objectMetadata) {
_resolver = resolver;
_metaPart = objectMetadata;
}
static public void rwLockAll(List<JObject<?>> objects) {
objects.stream().sorted(Comparator.comparingInt(System::identityHashCode)).forEach(JObject::rwLock);
}
@Override
public int compareTo(JObject<?> o) {
return getName().compareTo(o.getName());
@@ -30,20 +53,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
return Objects.hashCode(_metaPart.getName());
}
// Create a new object
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, false, obj.getClass());
_dataPart.set(obj);
_metaPart.bumpVersion(selfUuid);
}
// Create an object from existing metadata
protected JObject(JObjectResolver resolver, ObjectMetadata objectMetadata) {
_resolver = resolver;
_metaPart = objectMetadata;
}
public Class<? extends JObjectData> getKnownClass() {
return _metaPart.getKnownClass();
}
@@ -52,16 +61,10 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_metaPart.narrowClass(klass);
}
public String getName() {
return _metaPart.getName();
}
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private final AtomicReference<T> _dataPart = new AtomicReference<>();
public T getData() {
assertRWLock();
return _dataPart.get();
@@ -93,21 +96,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_metaPart.markSeen();
}
@FunctionalInterface
public interface VoidFn {
void apply();
}
@FunctionalInterface
public interface ObjectFnRead<T, R> {
R apply(ObjectMetadata meta, @Nullable T data);
}
@FunctionalInterface
public interface ObjectFnWrite<T, R> {
R apply(ObjectMetadata indexData, @Nullable T data, VoidFn bump, VoidFn invalidate);
}
private void hydrateRefs() {
_resolver.hydrateRefs(this);
}
@@ -169,12 +157,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
verifyRefs();
}
public enum ResolutionStrategy {
NO_RESOLUTION,
LOCAL_ONLY,
REMOTE
}
public <R> R runReadLocked(ResolutionStrategy resolutionStrategy, ObjectFnRead<T, R> fn) {
tryResolve(resolutionStrategy);
@@ -303,12 +285,29 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_metaPart.setSavedRefs(Collections.emptySet());
}
static public void rwLockAll(List<JObject<?>> objects) {
objects.stream().sorted(Comparator.comparingInt(System::identityHashCode)).forEach(JObject::rwLock);
}
public void assertRWLock() {
if (!_lock.isWriteLockedByCurrentThread())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}
public enum ResolutionStrategy {
NO_RESOLUTION,
LOCAL_ONLY,
REMOTE
}
@FunctionalInterface
public interface VoidFn {
void apply();
}
@FunctionalInterface
public interface ObjectFnRead<T, R> {
R apply(ObjectMetadata meta, @Nullable T data);
}
@FunctionalInterface
public interface ObjectFnWrite<T, R> {
R apply(ObjectMetadata indexData, @Nullable T data, VoidFn bump, VoidFn invalidate);
}
}

View File

@@ -22,28 +22,14 @@ import java.util.Optional;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectResolver jObjectResolver;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
private static class NamedSoftReference extends SoftReference<JObject<?>> {
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
super(target, q);
this._key = target.getName();
}
@Getter
final String _key;
}
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectResolver jObjectResolver;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
private Thread _refCleanupThread;
@Startup
@@ -206,4 +192,14 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
}
private static class NamedSoftReference extends SoftReference<JObject<?>> {
@Getter
final String _key;
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
super(target, q);
this._key = target.getName();
}
}
}

View File

@@ -18,27 +18,22 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectRefProcessor {
private Thread _refProcessorThread;
private final LinkedHashMap<String, Long> _candidates = new LinkedHashMap<>();
private final HashSet<String> _movablesInProcessing = new HashSet<>();
@ConfigProperty(name = "dhfs.objects.deletion.delay")
Long deletionDelay;
@Inject
JObjectManager jObjectManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
AutoSyncProcessor autoSyncProcessor;
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
int moveProcessorThreads;
ExecutorService _movableProcessorExecutorService;
private Thread _refProcessorThread;
@Startup
void init() {
@@ -55,8 +50,6 @@ public class JObjectRefProcessor {
_refProcessorThread.join();
}
private final LinkedHashMap<String, Long> _candidates = new LinkedHashMap<>();
public void putDeletionCandidate(String name) {
synchronized (_movablesInProcessing) {
if (_movablesInProcessing.contains(name)) return;
@@ -69,8 +62,6 @@ public class JObjectRefProcessor {
}
}
private final HashSet<String> _movablesInProcessing = new HashSet<>();
private void asyncProcessMovable(JObject<?> obj) {
synchronized (_movablesInProcessing) {
if (_movablesInProcessing.contains(obj.getName())) return;

View File

@@ -23,44 +23,30 @@ import java.util.stream.Stream;
@Singleton
public class JObjectResolver {
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JObjectWriteback jObjectWriteback;
@Inject
JObjectManager jobjectManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
JObjectRefProcessor jObjectRefProcessor;
private final MultiValuedMap<Class<? extends JObjectData>, WriteListenerFn<?>> _writeListeners
= new ArrayListValuedHashMap<>();
private final MultiValuedMap<Class<? extends JObjectData>, WriteListenerFn<?>> _metaWriteListeners
= new ArrayListValuedHashMap<>();
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JObjectWriteback jObjectWriteback;
@Inject
JObjectManager jobjectManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
JObjectRefProcessor jObjectRefProcessor;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@ConfigProperty(name = "dhfs.objects.bump_verification")
boolean bumpVerification;
@FunctionalInterface
public interface WriteListenerFn<T extends JObjectData> {
void apply(JObject<T> obj);
}
public <T extends JObjectData> void registerWriteListener(Class<T> klass, WriteListenerFn<T> fn) {
_writeListeners.put(klass, fn);
}
@@ -221,4 +207,9 @@ public class JObjectResolver {
throw new IllegalStateException("Object " + r + " is not referenced by " + self.getName() + " but should be");
}
}
@FunctionalInterface
public interface WriteListenerFn<T extends JObjectData> {
void apply(JObject<T> obj);
}
}

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
@@ -24,38 +24,27 @@ import java.util.concurrent.atomic.AtomicLong;
@Singleton
public class JObjectWriteback {
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectManager jObjectManager;
@Inject
JObjectSizeEstimator jObjectSizeEstimator;
@ConfigProperty(name = "dhfs.objects.writeback.delay")
long promotionDelay;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.writeback.nursery_limit")
int nurseryLimit;
@ConfigProperty(name = "dhfs.objects.writeback.threads")
int writebackThreads;
AtomicLong _currentSize = new AtomicLong(0);
private final LinkedHashMap<JObject<?>, Pair<Long, Long>> _nursery = new LinkedHashMap<>();
// FIXME: Kind of a hack
private final OrderedBidiMap<Pair<Long, String>, JObject<?>> _writeQueue = new TreeBidiMap<>();
private Thread _promotionThread;
private ExecutorService _writebackExecutor;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectManager jObjectManager;
@Inject
JObjectSizeEstimator jObjectSizeEstimator;
@ConfigProperty(name = "dhfs.objects.writeback.delay")
long promotionDelay;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.writeback.nursery_limit")
int nurseryLimit;
@ConfigProperty(name = "dhfs.objects.writeback.threads")
int writebackThreads;
AtomicLong _currentSize = new AtomicLong(0);
boolean overload = false;
private Thread _promotionThread;
private ExecutorService _writebackExecutor;
@Startup
void init() {

View File

@@ -22,6 +22,25 @@ import java.util.concurrent.atomic.AtomicReference;
public class ObjectMetadata implements Serializable {
@Serial
private static final long serialVersionUID = 1;
@Getter
private final String _name;
@Getter
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
private final AtomicReference<Class<? extends JObjectData>> _knownClass = new AtomicReference<>();
private final AtomicBoolean _seen = new AtomicBoolean(false);
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@Getter
private final HashSet<UUID> _confirmedDeletes = new HashSet<>();
private final Set<String> _referrers = new ConcurrentSkipListSet<>();
@Getter
@Setter
private Map<UUID, Long> _changelog = new LinkedHashMap<>();
@Getter
@Setter
private Set<String> _savedRefs = Collections.emptySet();
@Getter
private boolean _locked = false;
private transient AtomicBoolean _written = new AtomicBoolean(true);
public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
_name = name;
@@ -29,34 +48,6 @@ public class ObjectMetadata implements Serializable {
_knownClass.set(knownClass);
}
@Getter
private final String _name;
@Getter
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
@Getter
@Setter
private Map<UUID, Long> _changelog = new LinkedHashMap<>();
@Getter
@Setter
private Set<String> _savedRefs = Collections.emptySet();
private final AtomicReference<Class<? extends JObjectData>> _knownClass = new AtomicReference<>();
@Getter
private boolean _locked = false;
private transient AtomicBoolean _written = new AtomicBoolean(true);
private final AtomicBoolean _seen = new AtomicBoolean(false);
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@Getter
private final HashSet<UUID> _confirmedDeletes = new HashSet<>();
public Class<? extends JObjectData> getKnownClass() {
return _knownClass.get();
}
@@ -107,8 +98,6 @@ public class ObjectMetadata implements Serializable {
_written.set(true);
}
private final Set<String> _referrers = new ConcurrentSkipListSet<>();
public boolean isReferred() {
return !_referrers.isEmpty();
}

View File

@@ -6,11 +6,11 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import java.util.UUID;
public interface ConflictResolver {
ConflictResolutionResult
resolve(UUID conflictHost, ObjectHeader conflictHeader, JObjectData conflictData, JObject<?> conflictSource);
enum ConflictResolutionResult {
RESOLVED,
FAILED
}
ConflictResolutionResult
resolve(UUID conflictHost, ObjectHeader conflictHeader, JObjectData conflictData, JObject<?> conflictSource);
}

View File

@@ -1,9 +1,4 @@
package com.usatiuk.dhfs.objects.repository;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
public class InvalidationQueue {
}

View File

@@ -12,11 +12,6 @@ public class PersistentRemoteHosts implements Serializable {
private final PersistentRemoteHostsData _data = new PersistentRemoteHostsData();
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@FunctionalInterface
public interface PersistentRemoteHostsFn<R> {
R apply(PersistentRemoteHostsData hostsData);
}
public <R> R runReadLocked(PersistentRemoteHostsFn<R> fn) {
_lock.readLock().lock();
try {
@@ -34,4 +29,9 @@ public class PersistentRemoteHosts implements Serializable {
_lock.writeLock().unlock();
}
}
@FunctionalInterface
public interface PersistentRemoteHostsFn<R> {
R apply(PersistentRemoteHostsData hostsData);
}
}

View File

@@ -16,15 +16,12 @@ public class PersistentRemoteHostsData implements Serializable {
@Getter
private final UUID _selfUuid = UUID.randomUUID();
@Getter
private final AtomicLong _selfCounter = new AtomicLong();
@Getter
@Setter
private X509Certificate _selfCertificate = null;
@Getter
@Setter
private KeyPair _selfKeyPair = null;
@Getter
private final AtomicLong _selfCounter = new AtomicLong();
}

View File

@@ -32,29 +32,21 @@ import java.util.concurrent.ExecutorService;
@ApplicationScoped
public class PersistentRemoteHostsService {
final String dataFileName = "hosts";
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
@Inject
PeerTrustManager peerTrustManager;
@Inject
JObjectManager jObjectManager;
@Inject
JObjectResolver jObjectResolver;
@Inject
ExecutorService executorService;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
RpcClientFactory rpcClientFactory;
final String dataFileName = "hosts";
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
private UUID _selfUuid;

View File

@@ -25,22 +25,16 @@ import java.util.concurrent.ConcurrentMap;
@ApplicationScoped
public class RemoteHostManager {
private final TransientPeersState _transientPeersState = new TransientPeersState();
private final ConcurrentMap<UUID, TransientPeerState> _seenHostsButNotAdded = new ConcurrentHashMap<>();
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
SyncHandler syncHandler;
@Inject
RpcClientFactory rpcClientFactory;
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
long pingTimeout;
private final TransientPeersState _transientPeersState = new TransientPeersState();
private final ConcurrentMap<UUID, TransientPeerState> _seenHostsButNotAdded = new ConcurrentHashMap<>();
boolean _initialized = false;
void init(@Observes @Priority(350) StartupEvent event) throws IOException {

View File

@@ -26,21 +26,14 @@ public class RpcChannelFactory {
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
PeerTrustManager peerTrustManager;
private ConcurrentMap<SecureChannelKey, ManagedChannel> _secureChannelCache = new ConcurrentHashMap<>();
private ConcurrentMap<InsecureChannelKey, ManagedChannel> _insecureChannelCache = new ConcurrentHashMap<>();
void shutdown(@Observes @Priority(100000) ShutdownEvent event) {
for (var c : _secureChannelCache.values()) c.shutdownNow();
for (var i : _insecureChannelCache.values()) i.shutdownNow();
}
private record SecureChannelKey(String host, String address, int port) {
}
private record InsecureChannelKey(String address, int port) {
}
private ConcurrentMap<SecureChannelKey, ManagedChannel> _secureChannelCache = new ConcurrentHashMap<>();
private ConcurrentMap<InsecureChannelKey, ManagedChannel> _insecureChannelCache = new ConcurrentHashMap<>();
private ChannelCredentials getChannelCredentials() {
try {
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
@@ -80,4 +73,10 @@ public class RpcChannelFactory {
oldS.values().forEach(ManagedChannel::shutdown);
oldI.values().forEach(ManagedChannel::shutdown);
}
private record SecureChannelKey(String host, String address, int port) {
}
private record InsecureChannelKey(String address, int port) {
}
}

View File

@@ -30,23 +30,10 @@ public class RpcClientFactory {
@Inject
RpcChannelFactory rpcChannelFactory;
private record ObjSyncStubKey(String host, String address, int port) {
}
private record PeerSyncStubKey(String address, int port) {
}
// FIXME: Leaks!
private ConcurrentMap<ObjSyncStubKey, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub> _objSyncCache = new ConcurrentHashMap<>();
private ConcurrentMap<PeerSyncStubKey, DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub> _peerSyncCache = new ConcurrentHashMap<>();
@FunctionalInterface
public interface ObjectSyncClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
public <R> R withObjSyncClient(Collection<UUID> targets, ObjectSyncClientFunction<R> fn) {
var shuffledList = new ArrayList<>(targets);
Collections.shuffle(shuffledList);
@@ -80,14 +67,12 @@ public class RpcClientFactory {
throw new IllegalStateException("No reachable targets!");
}
public <R> R withObjSyncClient(UUID target, ObjectSyncClientFunction<R> fn) {
var hostinfo = remoteHostManager.getTransientState(target);
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
}
public <R> R withObjSyncClient(String host, String addr, int port, long timeout, ObjectSyncClientFunction<R> fn) {
var key = new ObjSyncStubKey(host, addr, port);
var stub = _objSyncCache.computeIfAbsent(key, (k) -> {
@@ -100,11 +85,6 @@ public class RpcClientFactory {
return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
}
@FunctionalInterface
public interface PeerSyncClientFunction<R> {
R apply(DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub client);
}
public <R> R withPeerSyncClient(UUID target, PeerSyncClientFunction<R> fn) {
var hostinfo = remoteHostManager.getTransientState(target);
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
@@ -128,4 +108,20 @@ public class RpcClientFactory {
_peerSyncCache = new ConcurrentHashMap<>();
}
@FunctionalInterface
public interface ObjectSyncClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
@FunctionalInterface
public interface PeerSyncClientFunction<R> {
R apply(DhfsObjectPeerSyncGrpcGrpc.DhfsObjectPeerSyncGrpcBlockingStub client);
}
private record ObjSyncStubKey(String host, String address, int port) {
}
private record PeerSyncStubKey(String address, int port) {
}
}

View File

@@ -17,24 +17,16 @@ import java.util.stream.Stream;
@ApplicationScoped
public class SyncHandler {
protected static class OutdatedUpdateException extends RuntimeException {
}
@Inject
JObjectManager jObjectManager;
@Inject
RemoteHostManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
Instance<ConflictResolver> conflictResolvers;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@@ -166,4 +158,7 @@ public class SyncHandler {
}
return builder.build();
}
protected static class OutdatedUpdateException extends RuntimeException {
}
}

View File

@@ -8,6 +8,19 @@ import lombok.Setter;
@AllArgsConstructor
@NoArgsConstructor
public class TransientPeerState {
@Getter
@Setter
private ConnectionState _state = ConnectionState.NOT_SEEN;
@Getter
@Setter
private String _addr;
@Getter
@Setter
private int _port;
@Getter
@Setter
private int _securePort;
public TransientPeerState(ConnectionState connectionState) {
_state = connectionState;
}
@@ -17,20 +30,4 @@ public class TransientPeerState {
REACHABLE,
UNREACHABLE
}
@Getter
@Setter
private ConnectionState _state = ConnectionState.NOT_SEEN;
@Getter
@Setter
private String _addr;
@Getter
@Setter
private int _port;
@Getter
@Setter
private int _securePort;
}

View File

@@ -7,11 +7,6 @@ public class TransientPeersState {
private final TransientPeersStateData _data = new TransientPeersStateData();
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@FunctionalInterface
public interface TransientPeersStaten<R> {
R apply(TransientPeersStateData hostsData);
}
public <R> R runReadLocked(TransientPeersStaten<R> fn) {
_lock.readLock().lock();
try {
@@ -29,4 +24,9 @@ public class TransientPeersState {
_lock.writeLock().unlock();
}
}
@FunctionalInterface
public interface TransientPeersStaten<R> {
R apply(TransientPeersStateData hostsData);
}
}

View File

@@ -19,24 +19,18 @@ import java.util.concurrent.Executors;
@ApplicationScoped
public class AutoSyncProcessor {
private final SequencedSet<String> _pending = new LinkedHashSet<>();
@Inject
JObjectResolver jObjectResolver;
@Inject
JObjectManager jObjectManager;
@ConfigProperty(name = "dhfs.objects.autosync.threads")
int autosyncThreads;
@ConfigProperty(name = "dhfs.objects.autosync.download-all")
boolean downloadAll;
private ExecutorService _autosyncExcecutor;
@Inject
ExecutorService executorService;
private final SequencedSet<String> _pending = new LinkedHashSet<>();
private ExecutorService _autosyncExcecutor;
@Startup
void init() {

View File

@@ -13,14 +13,12 @@ import java.util.UUID;
@PushResolution
public class PeerDirectory extends JObjectData {
public static final String PeerDirectoryObjName = "peer_directory";
@Serial
private static final long serialVersionUID = 1;
@Getter
private final Set<UUID> _peers = new LinkedHashSet<>();
public static final String PeerDirectoryObjName = "peer_directory";
@Override
public String getName() {
return PeerDirectoryObjName;

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import io.grpc.Status;

View File

@@ -8,11 +8,14 @@ import java.util.List;
public interface ObjectPersistentStore {
@Nonnull
List<String> findObjects(String prefix);
@Nonnull
Boolean existsObject(String name);
@Nonnull
ByteString readObject(String name);
void writeObject(String name, ByteString data);
void deleteObject(String name);
}