diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java index 4451c9be..828e7e81 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java @@ -1,6 +1,5 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; -import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import io.quarkus.logging.Log; @@ -39,6 +38,9 @@ public class DistributedObjectRepository implements ObjectRepository { @Inject SyncHandler syncHandler; + @Inject + InvalidationQueueService invalidationQueueService; + void init(@Observes @Priority(400) StartupEvent event) throws IOException { } @@ -94,7 +96,7 @@ public class DistributedObjectRepository implements ObjectRepository { }); // FIXME: Race? try { - syncHandler.notifyUpdateAll(name); + invalidationQueueService.pushInvalidationToAll(name); } catch (Exception e) { Log.error("Error when notifying remote update:"); Log.error(e); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueue.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueue.java index 74cee895..6ab7fea6 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueue.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueue.java @@ -2,7 +2,34 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; import jakarta.enterprise.context.ApplicationScoped; -@ApplicationScoped +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + public class InvalidationQueue { + private final InvalidationQueueData _data = new InvalidationQueueData(); + private final ReadWriteLock _dataLock = new ReentrantReadWriteLock(); + + @FunctionalInterface + public interface InvalidationQueueDataFn { + R apply(InvalidationQueueData data); + } + + public R runReadLocked(InvalidationQueueDataFn fn) { + _dataLock.readLock().lock(); + try { + return fn.apply(_data); + } finally { + _dataLock.readLock().unlock(); + } + } + + public R runWriteLocked(InvalidationQueueDataFn fn) { + _dataLock.writeLock().lock(); + try { + return fn.apply(_data); + } finally { + _dataLock.writeLock().unlock(); + } + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueData.java new file mode 100644 index 00000000..c88e7bea --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueData.java @@ -0,0 +1,23 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import lombok.Getter; + +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +public class InvalidationQueueData { + @Getter + private Map> _hostToInvObj = new LinkedHashMap<>(); + + public Set getSetForHost(String host) { + return _hostToInvObj.computeIfAbsent(host, k -> new LinkedHashSet<>()); + } + + public Map> pullAll() { + var ret = _hostToInvObj; + _hostToInvObj = new LinkedHashMap<>(); + return ret; + } +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java new file mode 100644 index 00000000..74a86cac --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java @@ -0,0 +1,50 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import io.quarkus.logging.Log; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.RunOnVirtualThread; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class InvalidationQueueService { + private final InvalidationQueue _data = new InvalidationQueue(); + + @Inject + RemoteHostManager remoteHostManager; + + @Inject + RemoteObjectServiceClient remoteObjectServiceClient; + + @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @RunOnVirtualThread + public void trySend() { + var data = _data.runReadLocked(InvalidationQueueData::pullAll); + for (var forHost : data.entrySet()) { + for (var obj : forHost.getValue()) { + try { + remoteObjectServiceClient.notifyUpdate(forHost.getKey(), obj); + } catch (Exception e) { + Log.info("Failed to send invalidation to " + forHost.getKey() + " of " + obj + ": " + e.getMessage() + " will retry"); + pushInvalidationToOne(forHost.getKey(), obj); + } + } + } + } + + public void pushInvalidationToAll(String name) { + _data.runWriteLocked(d -> { + for (var h : remoteHostManager.getSeenHosts()) { + d.getSetForHost(h).add(name); + } + return null; + }); + } + + public void pushInvalidationToOne(String host, String name) { + _data.runWriteLocked(d -> { + d.getSetForHost(host).add(name); + return null; + }); + } +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index d26c13a5..b001683f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -9,6 +9,7 @@ import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.RunOnVirtualThread; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; @@ -37,7 +38,8 @@ public class RemoteHostManager { void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException { } - @Scheduled(every = "10s") + @Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @RunOnVirtualThread public void tryConnectAll() { for (var host : persistentRemoteHostsService.getHosts()) { var shouldTry = _transientPeersState.runReadLocked(d -> { @@ -170,4 +172,11 @@ public class RemoteHostManager { .filter(e -> e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.REACHABLE)) .map(Map.Entry::getKey).toList()); } + + public List getSeenHosts() { + return _transientPeersState.runReadLocked(d -> d.getStates().entrySet().stream() + .filter(e -> !e.getValue().getState().equals(TransientPeersStateData.TransientPeerState.ConnectionState.NOT_SEEN)) + .map(Map.Entry::getKey).toList()); + } + } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index f7037f79..cdebba8d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -41,6 +41,9 @@ public class SyncHandler { @Inject RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + InvalidationQueueService invalidationQueueService; + void init(@Observes @Priority(340) StartupEvent event) throws IOException { remoteHostManager.addConnectionSuccessHandler((host) -> { doInitialResync(host); @@ -64,7 +67,7 @@ public class SyncHandler { toPush.add(name); }); for (String name : toPush) { - remoteObjectServiceClient.notifyUpdate(host, name); + invalidationQueueService.pushInvalidationToOne(host, name); } } @@ -121,22 +124,4 @@ public class SyncHandler { return IndexUpdateReply.newBuilder().build(); } - - public void notifyUpdateAll(String name) { - for (var host : remoteHostManager.getAvailableHosts()) - remoteHostManager.withClient(host, client -> { - var meta = objectIndexService.getMeta(name).orElseThrow(() -> { - Log.error("Race when trying to notify update"); - return new NotImplementedException(); - }); - - var builder = IndexUpdatePush.newBuilder().setSelfname(selfname); - - client.indexUpdate(builder.setHeader( - meta.runReadLocked(ObjectMetaData::toRpcHeader) - ).build()); - return null; - }); - } - }