mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
non-blocking index updates
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -39,6 +38,9 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
|
||||
|
||||
}
|
||||
@@ -94,7 +96,7 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
});
|
||||
// FIXME: Race?
|
||||
try {
|
||||
syncHandler.notifyUpdateAll(name);
|
||||
invalidationQueueService.pushInvalidationToAll(name);
|
||||
} catch (Exception e) {
|
||||
Log.error("Error when notifying remote update:");
|
||||
Log.error(e);
|
||||
|
||||
@@ -2,7 +2,34 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
@ApplicationScoped
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class InvalidationQueue {
|
||||
private final InvalidationQueueData _data = new InvalidationQueueData();
|
||||
private final ReadWriteLock _dataLock = new ReentrantReadWriteLock();
|
||||
|
||||
@FunctionalInterface
|
||||
public interface InvalidationQueueDataFn<R> {
|
||||
R apply(InvalidationQueueData data);
|
||||
}
|
||||
|
||||
public <R> R runReadLocked(InvalidationQueueDataFn<R> fn) {
|
||||
_dataLock.readLock().lock();
|
||||
try {
|
||||
return fn.apply(_data);
|
||||
} finally {
|
||||
_dataLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public <R> R runWriteLocked(InvalidationQueueDataFn<R> fn) {
|
||||
_dataLock.writeLock().lock();
|
||||
try {
|
||||
return fn.apply(_data);
|
||||
} finally {
|
||||
_dataLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class InvalidationQueueData {
|
||||
@Getter
|
||||
private Map<String, Set<String>> _hostToInvObj = new LinkedHashMap<>();
|
||||
|
||||
public Set<String> getSetForHost(String host) {
|
||||
return _hostToInvObj.computeIfAbsent(host, k -> new LinkedHashSet<>());
|
||||
}
|
||||
|
||||
public Map<String, Set<String>> pullAll() {
|
||||
var ret = _hostToInvObj;
|
||||
_hostToInvObj = new LinkedHashMap<>();
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class InvalidationQueueService {
|
||||
private final InvalidationQueue _data = new InvalidationQueue();
|
||||
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||
@RunOnVirtualThread
|
||||
public void trySend() {
|
||||
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
|
||||
for (var forHost : data.entrySet()) {
|
||||
for (var obj : forHost.getValue()) {
|
||||
try {
|
||||
remoteObjectServiceClient.notifyUpdate(forHost.getKey(), obj);
|
||||
} catch (Exception e) {
|
||||
Log.info("Failed to send invalidation to " + forHost.getKey() + " of " + obj + ": " + e.getMessage() + " will retry");
|
||||
pushInvalidationToOne(forHost.getKey(), obj);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void pushInvalidationToAll(String name) {
|
||||
_data.runWriteLocked(d -> {
|
||||
for (var h : remoteHostManager.getSeenHosts()) {
|
||||
d.getSetForHost(h).add(name);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void pushInvalidationToOne(String host, String name) {
|
||||
_data.runWriteLocked(d -> {
|
||||
d.getSetForHost(host).add(name);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
@@ -37,7 +38,8 @@ public class RemoteHostManager {
|
||||
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
|
||||
}
|
||||
|
||||
@Scheduled(every = "10s")
|
||||
@Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||
@RunOnVirtualThread
|
||||
public void tryConnectAll() {
|
||||
for (var host : persistentRemoteHostsService.getHosts()) {
|
||||
var shouldTry = _transientPeersState.runReadLocked(d -> {
|
||||
@@ -170,4 +172,11 @@ public class RemoteHostManager {
|
||||
.filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE))
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
public List<String> getSeenHosts() {
|
||||
return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream()
|
||||
.filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN))
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -41,6 +41,9 @@ public class SyncHandler {
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
void init(@Observes @Priority(340) StartupEvent event) throws IOException {
|
||||
remoteHostManager.addConnectionSuccessHandler((host) -> {
|
||||
doInitialResync(host);
|
||||
@@ -64,7 +67,7 @@ public class SyncHandler {
|
||||
toPush.add(name);
|
||||
});
|
||||
for (String name : toPush) {
|
||||
remoteObjectServiceClient.notifyUpdate(host, name);
|
||||
invalidationQueueService.pushInvalidationToOne(host, name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,22 +124,4 @@ public class SyncHandler {
|
||||
|
||||
return IndexUpdateReply.newBuilder().build();
|
||||
}
|
||||
|
||||
public void notifyUpdateAll(String name) {
|
||||
for (var host : remoteHostManager.getAvailableHosts())
|
||||
remoteHostManager.withClient(host, client -> {
|
||||
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
|
||||
Log.error("Race when trying to notify update");
|
||||
return new NotImplementedException();
|
||||
});
|
||||
|
||||
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
|
||||
|
||||
client.indexUpdate(builder.setHeader(
|
||||
meta.runReadLocked(ObjectMetaData::toRpcHeader)
|
||||
).build());
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user