mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
deferred invalidations
This commit is contained in:
@@ -0,0 +1,17 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.collections4.MultiValuedMap;
|
||||
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
public class DeferredInvalidationQueueData implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Getter
|
||||
private final MultiValuedMap<UUID, String> _deferredInvalidations = new HashSetValuedHashMap<>();
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.UUID;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DeferredInvalidationQueueService {
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@ConfigProperty(name = "dhfs.objects.root")
|
||||
String dataRoot;
|
||||
private static final String dataFileName = "hosts";
|
||||
|
||||
// FIXME: DB when?
|
||||
private DeferredInvalidationQueueData _persistentData = new DeferredInvalidationQueueData();
|
||||
|
||||
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
|
||||
remoteHostManager.registerConnectEventListener(this::returnForHost);
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
|
||||
Log.info("Saving deferred invalidations");
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
Log.info("Saved deferred invalidations");
|
||||
}
|
||||
|
||||
void returnForHost(UUID host) {
|
||||
synchronized (this) {
|
||||
var col = _persistentData.getDeferredInvalidations().get(host);
|
||||
for (var s : col) {
|
||||
Log.trace("Un-deferred invalidation to " + host + " of " + s);
|
||||
invalidationQueueService.pushDeferredInvalidations(host, s);
|
||||
}
|
||||
col.clear();
|
||||
}
|
||||
}
|
||||
|
||||
void defer(UUID host, String object) {
|
||||
synchronized (this) {
|
||||
Log.trace("Deferred invalidation to " + host + " of " + object);
|
||||
_persistentData.getDeferredInvalidations().put(host, object);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,11 +10,13 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ApplicationScoped
|
||||
public class InvalidationQueueService {
|
||||
@@ -30,13 +32,13 @@ public class InvalidationQueueService {
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@Inject
|
||||
DeferredInvalidationQueueService deferredInvalidationQueueService;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.invalidation.threads")
|
||||
int threads;
|
||||
|
||||
private record QueueEntry(UUID host, String obj) {
|
||||
}
|
||||
|
||||
private final HashSetDelayedBlockingQueue<QueueEntry> _queue;
|
||||
private final HashSetDelayedBlockingQueue<Pair<UUID, String>> _queue;
|
||||
private ExecutorService _executor;
|
||||
|
||||
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
|
||||
@@ -54,6 +56,9 @@ public class InvalidationQueueService {
|
||||
|
||||
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
|
||||
_executor.shutdownNow();
|
||||
if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
Log.error("Failed to shut down invalidation sender thread");
|
||||
}
|
||||
}
|
||||
|
||||
private void sender() {
|
||||
@@ -65,21 +70,26 @@ public class InvalidationQueueService {
|
||||
long success = 0;
|
||||
|
||||
for (var e : data) {
|
||||
if (!persistentRemoteHostsService.existsHost(e.host)) continue;
|
||||
if (!persistentRemoteHostsService.existsHost(e.getLeft())) continue;
|
||||
|
||||
if (!remoteHostManager.isReachable(e.getLeft())) {
|
||||
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
jObjectManager.get(e.obj).ifPresent(obj -> {
|
||||
remoteObjectServiceClient.notifyUpdate(obj, e.host);
|
||||
jObjectManager.get(e.getRight()).ifPresent(obj -> {
|
||||
remoteObjectServiceClient.notifyUpdate(obj, e.getLeft());
|
||||
});
|
||||
success++;
|
||||
} catch (DeletedObjectAccessException ignored) {
|
||||
} catch (Exception ex) {
|
||||
Log.info("Failed to send invalidation to " + e.host + ", will retry", ex);
|
||||
pushInvalidationToOne(e.host, e.obj);
|
||||
Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex);
|
||||
pushInvalidationToOne(e.getLeft(), e.getRight());
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
Log.info("Invalidation sender exiting");
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,16 +104,29 @@ public class InvalidationQueueService {
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
Log.info("Invalidation sender exiting");
|
||||
var data = _queue.close();
|
||||
for (var e : data)
|
||||
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
|
||||
Log.info("Invalidation sender exited");
|
||||
}
|
||||
|
||||
public void pushInvalidationToAll(String name) {
|
||||
var hosts = remoteHostManager.getSeenHosts();
|
||||
for (var h : hosts) {
|
||||
_queue.add(new QueueEntry(h, name));
|
||||
}
|
||||
var hosts = remoteHostManager.getAvailableHosts();
|
||||
for (var h : hosts)
|
||||
_queue.add(Pair.of(h, name));
|
||||
var unavailable = remoteHostManager.getUnavailableHosts();
|
||||
for (var u : unavailable)
|
||||
deferredInvalidationQueueService.defer(u, name);
|
||||
}
|
||||
|
||||
public void pushInvalidationToOne(UUID host, String name) {
|
||||
_queue.add(new QueueEntry(host, name));
|
||||
if (remoteHostManager.isReachable(host))
|
||||
_queue.add(Pair.of(host, name));
|
||||
else
|
||||
deferredInvalidationQueueService.returnForHost(host);
|
||||
}
|
||||
|
||||
protected void pushDeferredInvalidations(UUID host, String name) {
|
||||
_queue.add(Pair.of(host, name));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.security.KeyPair;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.HashSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@@ -24,4 +25,7 @@ public class PersistentRemoteHostsData implements Serializable {
|
||||
@Getter
|
||||
@Setter
|
||||
private KeyPair _selfKeyPair = null;
|
||||
|
||||
@Getter
|
||||
private final HashSet<UUID> _initialSyncDone = new HashSet<>();
|
||||
}
|
||||
|
||||
@@ -201,6 +201,7 @@ public class PersistentRemoteHostsService {
|
||||
boolean removed = getPeerDirectory().runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
|
||||
boolean removedInner = d.getPeers().remove(host);
|
||||
if (removedInner) {
|
||||
_persistentData.runWriteLocked(pd -> pd.getInitialSyncDone().remove(host));
|
||||
getPeer(host).runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (mp, dp, bp, vp) -> {
|
||||
mp.removeRef(m.getName());
|
||||
return null;
|
||||
@@ -240,4 +241,9 @@ public class PersistentRemoteHostsService {
|
||||
return _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfCertificate);
|
||||
}
|
||||
|
||||
// Returns true if host's initial sync wasn't done before, and marks it as done
|
||||
public boolean markInitialSyncDone(UUID connectedHost) {
|
||||
return _persistentData.runWriteLocked(d -> d.getInitialSyncDone().add(connectedHost));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,13 +17,15 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import java.io.IOException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteHostManager {
|
||||
private final TransientPeersState _transientPeersState = new TransientPeersState();
|
||||
private final ConcurrentMap<UUID, TransientPeerState> _seenHostsButNotAdded = new ConcurrentHashMap<>();
|
||||
// FIXME: Ideally not call them on every ping
|
||||
private final ArrayList<ConnectionEventListener> _connectedListeners = new ArrayList<>();
|
||||
private final ArrayList<ConnectionEventListener> _disconnectedListeners = new ArrayList<>();
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
@Inject
|
||||
@@ -34,9 +36,18 @@ public class RemoteHostManager {
|
||||
PeerSyncApiClientDynamic peerSyncApiClient;
|
||||
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
|
||||
long pingTimeout;
|
||||
private ExecutorService _heartbeatExecutor;
|
||||
boolean _initialized = false;
|
||||
|
||||
// Note: keep priority updated with below
|
||||
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
|
||||
_heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
// Note: newly added hosts aren't in _transientPeersState
|
||||
// but that's ok as they don't have initialSyncDone set
|
||||
for (var h : persistentRemoteHostsService.getHostsUuid())
|
||||
_transientPeersState.runWriteLocked(d -> d.get(h));
|
||||
|
||||
_initialized = true;
|
||||
}
|
||||
|
||||
@@ -47,54 +58,84 @@ public class RemoteHostManager {
|
||||
@Blocking
|
||||
public void tryConnectAll() {
|
||||
if (!_initialized) return;
|
||||
for (var host : persistentRemoteHostsService.getHosts()) {
|
||||
try {
|
||||
var shouldTry = _transientPeersState.runReadLocked(d -> {
|
||||
var s = d.getStates().get(host.getUuid());
|
||||
if (s == null) return true;
|
||||
return !s.getState().equals(TransientPeerState.ConnectionState.REACHABLE) && s.getAddr() != null;
|
||||
});
|
||||
if (shouldTry) {
|
||||
Log.info("Trying to connect to " + host.getUuid());
|
||||
if (pingCheck(host.getUuid())) {
|
||||
handleConnectionSuccess(host.getUuid());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed to connect to " + host.getUuid(), e);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
_heartbeatExecutor.invokeAll(persistentRemoteHostsService.getHostsUuid().stream()
|
||||
.<Callable<Void>>map(host -> () -> {
|
||||
try {
|
||||
Log.debug("Trying to connect to " + host);
|
||||
if (pingCheck(host))
|
||||
handleConnectionSuccess(host);
|
||||
else
|
||||
handleConnectionError(host);
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed to connect to " + host, e);
|
||||
}
|
||||
return null;
|
||||
}).toList(), 30, TimeUnit.SECONDS); //FIXME:
|
||||
} catch (InterruptedException iex) {
|
||||
Log.error("Heartbeat was interrupted");
|
||||
}
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ConnectionEventListener {
|
||||
void apply(UUID host);
|
||||
}
|
||||
|
||||
// Note: registrations should be completed with Priority < 350
|
||||
public void registerConnectEventListener(ConnectionEventListener listener) {
|
||||
synchronized (_connectedListeners) {
|
||||
_connectedListeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: registrations should be completed with Priority < 350
|
||||
public void registerDisconnectEventListener(ConnectionEventListener listener) {
|
||||
synchronized (_disconnectedListeners) {
|
||||
_disconnectedListeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
public void handleConnectionSuccess(UUID host) {
|
||||
if (_transientPeersState.runReadLocked(d -> d.getStates().getOrDefault(
|
||||
host, new TransientPeerState(TransientPeerState.ConnectionState.NOT_SEEN)
|
||||
)).getState().equals(TransientPeerState.ConnectionState.REACHABLE)) return;
|
||||
boolean wasReachable = isReachable(host);
|
||||
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState());
|
||||
var curState = d.getStates().get(host);
|
||||
curState.setState(TransientPeerState.ConnectionState.REACHABLE);
|
||||
d.get(host).setReachable(true);
|
||||
return null;
|
||||
});
|
||||
|
||||
for (var l : _connectedListeners) {
|
||||
l.apply(host);
|
||||
}
|
||||
|
||||
if (wasReachable) return;
|
||||
|
||||
Log.info("Connected to " + host);
|
||||
syncHandler.doInitialResync(host);
|
||||
|
||||
if (persistentRemoteHostsService.markInitialSyncDone(host))
|
||||
syncHandler.doInitialResync(host);
|
||||
}
|
||||
|
||||
public void handleConnectionError(UUID host) {
|
||||
Log.info("Lost connection to " + host);
|
||||
boolean wasReachable = isReachable(host);
|
||||
|
||||
if (wasReachable)
|
||||
Log.info("Lost connection to " + host);
|
||||
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState());
|
||||
var curState = d.getStates().get(host);
|
||||
curState.setState(TransientPeerState.ConnectionState.UNREACHABLE);
|
||||
d.get(host).setReachable(false);
|
||||
return null;
|
||||
});
|
||||
|
||||
for (var l : _disconnectedListeners) {
|
||||
l.apply(host);
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
private boolean pingCheck(UUID host) {
|
||||
TransientPeerState state = _transientPeersState.runReadLocked(s -> s.getStates().get(host));
|
||||
if (state == null) return false;
|
||||
TransientPeerState state = _transientPeersState.runReadLocked(s -> s.getCopy(host));
|
||||
|
||||
try {
|
||||
return rpcClientFactory.withObjSyncClient(host.toString(), state.getAddr(), state.getSecurePort(), pingTimeout, c -> {
|
||||
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).build());
|
||||
@@ -110,28 +151,22 @@ public class RemoteHostManager {
|
||||
}
|
||||
|
||||
public boolean isReachable(UUID host) {
|
||||
return _transientPeersState.runReadLocked(d -> {
|
||||
var res = d.getStates().get(host);
|
||||
return res.getState() == TransientPeerState.ConnectionState.REACHABLE;
|
||||
});
|
||||
return _transientPeersState.runReadLocked(d -> d.get(host).isReachable());
|
||||
}
|
||||
|
||||
public TransientPeerState getTransientState(UUID host) {
|
||||
return _transientPeersState.runReadLocked(d -> {
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState());
|
||||
return d.getStates().get(host);
|
||||
});
|
||||
return _transientPeersState.runReadLocked(d -> d.getCopy(host));
|
||||
}
|
||||
|
||||
public List<UUID> getAvailableHosts() {
|
||||
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
|
||||
.filter(e -> e.getValue().getState().equals(TransientPeerState.ConnectionState.REACHABLE))
|
||||
.filter(e -> e.getValue().isReachable())
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
public List<UUID> getSeenHosts() {
|
||||
public List<UUID> getUnavailableHosts() {
|
||||
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
|
||||
.filter(e -> !e.getValue().getState().equals(TransientPeerState.ConnectionState.NOT_SEEN))
|
||||
.filter(e -> !e.getValue().isReachable())
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
@@ -154,10 +189,9 @@ public class RemoteHostManager {
|
||||
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
// Log.trace("Updating connection info for " + host + ": addr=" + addr + " port=" + port);
|
||||
d.getStates().putIfAbsent(host, new TransientPeerState()); // FIXME:? set reachable here?
|
||||
d.getStates().get(host).setAddr(addr);
|
||||
d.getStates().get(host).setPort(port);
|
||||
d.getStates().get(host).setSecurePort(securePort);
|
||||
d.get(host).setAddr(addr);
|
||||
d.get(host).setPort(port);
|
||||
d.get(host).setSecurePort(securePort);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ public class RemoteObjectServiceClient {
|
||||
.filter(entry -> entry.getValue().equals(ourVersion))
|
||||
.map(Map.Entry::getKey).toList();
|
||||
else
|
||||
return persistentRemoteHostsService.getHosts().stream().map(PersistentPeerInfo::getUuid).toList();
|
||||
return persistentRemoteHostsService.getHostsUuid();
|
||||
});
|
||||
|
||||
if (targets.isEmpty())
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
|
||||
import io.grpc.ChannelCredentials;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.TlsChannelCredentials;
|
||||
import io.grpc.netty.NegotiationType;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -27,11 +26,9 @@ public class RpcChannelFactory {
|
||||
@Inject
|
||||
PeerTrustManager peerTrustManager;
|
||||
private ConcurrentMap<SecureChannelKey, ManagedChannel> _secureChannelCache = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<InsecureChannelKey, ManagedChannel> _insecureChannelCache = new ConcurrentHashMap<>();
|
||||
|
||||
void shutdown(@Observes @Priority(100000) ShutdownEvent event) {
|
||||
for (var c : _secureChannelCache.values()) c.shutdownNow();
|
||||
for (var i : _insecureChannelCache.values()) i.shutdownNow();
|
||||
}
|
||||
|
||||
private ChannelCredentials getChannelCredentials() {
|
||||
@@ -58,20 +55,10 @@ public class RpcChannelFactory {
|
||||
});
|
||||
}
|
||||
|
||||
ManagedChannel getInsecureChannel(String address, int port) {
|
||||
var key = new InsecureChannelKey(address, port);
|
||||
return _insecureChannelCache.computeIfAbsent(key, (k) -> {
|
||||
return NettyChannelBuilder.forAddress(address, port).negotiationType(NegotiationType.PLAINTEXT).idleTimeout(10, TimeUnit.SECONDS).usePlaintext().build();
|
||||
});
|
||||
}
|
||||
|
||||
public void dropCache() {
|
||||
var oldS = _secureChannelCache;
|
||||
var oldI = _insecureChannelCache;
|
||||
_secureChannelCache = new ConcurrentHashMap<>();
|
||||
_insecureChannelCache = new ConcurrentHashMap<>();
|
||||
oldS.values().forEach(ManagedChannel::shutdown);
|
||||
oldI.values().forEach(ManagedChannel::shutdown);
|
||||
}
|
||||
|
||||
private record SecureChannelKey(String host, String address, int port) {
|
||||
|
||||
@@ -36,38 +36,27 @@ public class RpcClientFactory {
|
||||
var shuffledList = new ArrayList<>(targets);
|
||||
Collections.shuffle(shuffledList);
|
||||
for (UUID target : shuffledList) {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
|
||||
boolean reachable = remoteHostManager.isReachable(target);
|
||||
var addr = hostinfo.getAddr();
|
||||
boolean shouldTry = reachable && addr != null;
|
||||
|
||||
if (!shouldTry) {
|
||||
Log.trace("Not trying " + target + ": " + "addr=" + addr + " reachable=" + reachable);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
return withObjSyncClient(target, fn);
|
||||
} catch (StatusRuntimeException e) {
|
||||
if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode())) {
|
||||
Log.info("Host " + target + " is unreachable: " + e.getMessage());
|
||||
remoteHostManager.handleConnectionError(target);
|
||||
} else {
|
||||
Log.error("When calling " + target, e);
|
||||
continue;
|
||||
}
|
||||
if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode()))
|
||||
Log.trace("Host " + target + " is unreachable: " + e.getMessage());
|
||||
else
|
||||
Log.warn("When calling " + target + " " + e.getMessage());
|
||||
} catch (Exception e) {
|
||||
Log.error("When calling " + target, e);
|
||||
continue;
|
||||
Log.warn("When calling " + target + " " + e.getMessage());
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("No reachable targets!");
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("No reachable targets!"));
|
||||
}
|
||||
|
||||
public <R> R withObjSyncClient(UUID target, ObjectSyncClientFunction<R> fn) {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
if (hostinfo.getAddr() == null) throw new IllegalStateException("Address for " + target + " not yet known");
|
||||
boolean reachable = remoteHostManager.isReachable(target);
|
||||
|
||||
if (hostinfo.getAddr() == null || !reachable)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Address for " + target + " not yet known"));
|
||||
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,8 @@ public class SyncHandler {
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
public void doInitialResync(UUID host) {
|
||||
Log.info("Doing initial resync for " + host);
|
||||
|
||||
remoteObjectServiceClient.getIndex(host);
|
||||
// Push our index to the other peer too, as they might not request it if
|
||||
// they didn't thing we were disconnected
|
||||
|
||||
@@ -10,7 +10,7 @@ import lombok.Setter;
|
||||
public class TransientPeerState {
|
||||
@Getter
|
||||
@Setter
|
||||
private ConnectionState _state = ConnectionState.NOT_SEEN;
|
||||
private boolean _reachable = false;
|
||||
@Getter
|
||||
@Setter
|
||||
private String _addr;
|
||||
@@ -21,13 +21,14 @@ public class TransientPeerState {
|
||||
@Setter
|
||||
private int _securePort;
|
||||
|
||||
public TransientPeerState(ConnectionState connectionState) {
|
||||
_state = connectionState;
|
||||
public TransientPeerState(boolean reachable) {
|
||||
_reachable = reachable;
|
||||
}
|
||||
|
||||
public enum ConnectionState {
|
||||
NOT_SEEN,
|
||||
REACHABLE,
|
||||
UNREACHABLE
|
||||
public TransientPeerState(TransientPeerState source) {
|
||||
_reachable = source._reachable;
|
||||
_addr = source._addr;
|
||||
_port = source._port;
|
||||
_securePort = source._securePort;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,4 +11,12 @@ public class TransientPeersStateData {
|
||||
|
||||
@Getter
|
||||
private final Map<UUID, TransientPeerState> _states = new LinkedHashMap<>();
|
||||
|
||||
TransientPeerState get(UUID host) {
|
||||
return _states.computeIfAbsent(host, k -> new TransientPeerState());
|
||||
}
|
||||
|
||||
TransientPeerState getCopy(UUID host) {
|
||||
return new TransientPeerState(get(host));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.DockerClientFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
@@ -120,6 +121,51 @@ public class DhfsFuseIT {
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_data/dhfs_fuse_root/testf2").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("newfile\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
|
||||
Assertions.assertEquals("newfile\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
@@ -162,7 +208,7 @@ public class DhfsFuseIT {
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user