From 259ad7763e6f12f07ec7dd17dd95d842970d78bf Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 23 Jun 2024 11:34:22 +0200 Subject: [PATCH] use more configs --- .../objects/jrepository/JObjectWriteback.java | 14 ++++++++++---- .../distributed/InvalidationQueueService.java | 11 +++++++++-- .../repository/distributed/RemoteHostManager.java | 2 +- .../LocalPeerDiscoveryBroadcaster.java | 12 +++++++++--- server/src/main/resources/application.properties | 9 ++++++++- 5 files changed, 37 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java index c79468ad..9ec402ab 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java @@ -10,6 +10,7 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.ArrayList; import java.util.Collection; @@ -25,8 +26,13 @@ public class JObjectWriteback { @Inject JObjectManager jObjectManager; + @ConfigProperty(name = "dhfs.objects.writeback.delay") + Integer delay; + + @ConfigProperty(name = "dhfs.objects.writeback.limit") + Integer limit; + private final LinkedHashMap>> _objects = new LinkedHashMap<>(); - private final LinkedHashSet _toIgnore = new LinkedHashSet<>(); private Thread _writebackThread; @@ -60,7 +66,7 @@ public class JObjectWriteback { boolean wait = false; while (!Thread.interrupted()) { if (wait) { - Thread.sleep(500); + Thread.sleep(delay); wait = false; } JObject obj; @@ -68,7 +74,7 @@ public class JObjectWriteback { while (_objects.isEmpty()) _objects.wait(); - if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < 500L) { + if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < delay) { wait = true; continue; } @@ -124,7 +130,7 @@ public class JObjectWriteback { } // FIXME: better logic - if (_objects.size() < 10000) { + if (_objects.size() < limit) { _objects.put(name, Pair.of(System.currentTimeMillis(), object)); _objects.notifyAll(); return; diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java index 81628916..6d60380a 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java @@ -7,6 +7,7 @@ import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; import java.util.*; @@ -18,6 +19,12 @@ public class InvalidationQueueService { @Inject RemoteObjectServiceClient remoteObjectServiceClient; + @ConfigProperty(name = "dhfs.objects.distributed.invalidation.batch_size") + Integer batchSize; + + @ConfigProperty(name = "dhfs.objects.distributed.invalidation.delay") + Integer delay; + private Map> _hostToInvObj = new LinkedHashMap<>(); private Thread _senderThread; @@ -53,7 +60,7 @@ public class InvalidationQueueService { private void sender() { try { while (!Thread.interrupted()) { - Thread.sleep(1000); + Thread.sleep(delay); var data = pullAll(); String stats = "Sent invalidation: "; for (var forHost : data.entrySet()) { @@ -61,7 +68,7 @@ public class InvalidationQueueService { while (!forHost.getValue().isEmpty()) { ArrayList chunk = new ArrayList<>(); - while (chunk.size() < 1000 && !forHost.getValue().isEmpty()) { + while (chunk.size() < batchSize && !forHost.getValue().isEmpty()) { chunk.add(forHost.getValue().removeFirst()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index d411d283..462bb3e9 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -40,7 +40,7 @@ public class RemoteHostManager { void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException { } - @Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @Scheduled(every = "${dhfs.objects.distributed.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Blocking public void tryConnectAll() { for (var host : persistentRemoteHostsService.getHosts()) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peerdiscovery/LocalPeerDiscoveryBroadcaster.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peerdiscovery/LocalPeerDiscoveryBroadcaster.java index 205c9a9b..997e6ba0 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peerdiscovery/LocalPeerDiscoveryBroadcaster.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/peerdiscovery/LocalPeerDiscoveryBroadcaster.java @@ -22,6 +22,12 @@ public class LocalPeerDiscoveryBroadcaster { @ConfigProperty(name = "quarkus.http.port") Integer ourPort; + @ConfigProperty(name = "dhfs.objects.distributed.peerdiscovery.port") + Integer broadcastPort; + + @ConfigProperty(name = "dhfs.objects.distributed.peerdiscovery.interval") + Integer broadcastInterval; + private Thread _broadcasterThread; private DatagramSocket _socket; @@ -50,7 +56,7 @@ public class LocalPeerDiscoveryBroadcaster { private void broadcast() { try { while (!Thread.interrupted()) { - Thread.sleep(10000L); + Thread.sleep(broadcastInterval); try { var sendData = PeerDiscoveryInfo.newBuilder() @@ -62,7 +68,7 @@ public class LocalPeerDiscoveryBroadcaster { DatagramPacket sendPacket = new DatagramPacket(sendBytes, sendBytes.length, - InetAddress.getByName("255.255.255.255"), 42069); + InetAddress.getByName("255.255.255.255"), broadcastPort); _socket.send(sendPacket); @@ -85,7 +91,7 @@ public class LocalPeerDiscoveryBroadcaster { } try { - sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, 42069); + sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, broadcastPort); _socket.send(sendPacket); } catch (Exception ignored) { continue; diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 1b99d73f..8c0bccef 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -1,5 +1,12 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d +dhfs.objects.distributed.peerdiscovery.port=42069 +dhfs.objects.distributed.peerdiscovery.interval=10000 +dhfs.objects.distributed.invalidation.batch_size=1000 +dhfs.objects.distributed.invalidation.delay=100 +dhfs.objects.distributed.reconnect_interval=1s dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root -dhfs.storage.files.target_chunk_size=1048576 \ No newline at end of file +dhfs.storage.files.target_chunk_size=1048576 +dhfs.objects.writeback.delay=500 +dhfs.objects.writeback.limit=10000