use more configs

This commit is contained in:
2024-06-23 11:34:22 +02:00
parent cb9fc19f1a
commit 259ad7763e
5 changed files with 37 additions and 11 deletions

View File

@@ -10,6 +10,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,8 +26,13 @@ public class JObjectWriteback {
@Inject
JObjectManager jObjectManager;
@ConfigProperty(name = "dhfs.objects.writeback.delay")
Integer delay;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
Integer limit;
private final LinkedHashMap<String, Pair<Long, JObject<?>>> _objects = new LinkedHashMap<>();
private final LinkedHashSet<String> _toIgnore = new LinkedHashSet<>();
private Thread _writebackThread;
@@ -60,7 +66,7 @@ public class JObjectWriteback {
boolean wait = false;
while (!Thread.interrupted()) {
if (wait) {
Thread.sleep(500);
Thread.sleep(delay);
wait = false;
}
JObject<?> obj;
@@ -68,7 +74,7 @@ public class JObjectWriteback {
while (_objects.isEmpty())
_objects.wait();
if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < 500L) {
if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < delay) {
wait = true;
continue;
}
@@ -124,7 +130,7 @@ public class JObjectWriteback {
}
// FIXME: better logic
if (_objects.size() < 10000) {
if (_objects.size() < limit) {
_objects.put(name, Pair.of(System.currentTimeMillis(), object));
_objects.notifyAll();
return;

View File

@@ -7,6 +7,7 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@@ -18,6 +19,12 @@ public class InvalidationQueueService {
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@ConfigProperty(name = "dhfs.objects.distributed.invalidation.batch_size")
Integer batchSize;
@ConfigProperty(name = "dhfs.objects.distributed.invalidation.delay")
Integer delay;
private Map<UUID, SequencedSet<String>> _hostToInvObj = new LinkedHashMap<>();
private Thread _senderThread;
@@ -53,7 +60,7 @@ public class InvalidationQueueService {
private void sender() {
try {
while (!Thread.interrupted()) {
Thread.sleep(1000);
Thread.sleep(delay);
var data = pullAll();
String stats = "Sent invalidation: ";
for (var forHost : data.entrySet()) {
@@ -61,7 +68,7 @@ public class InvalidationQueueService {
while (!forHost.getValue().isEmpty()) {
ArrayList<String> chunk = new ArrayList<>();
while (chunk.size() < 1000 && !forHost.getValue().isEmpty()) {
while (chunk.size() < batchSize && !forHost.getValue().isEmpty()) {
chunk.add(forHost.getValue().removeFirst());
}

View File

@@ -40,7 +40,7 @@ public class RemoteHostManager {
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
}
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "${dhfs.objects.distributed.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {

View File

@@ -22,6 +22,12 @@ public class LocalPeerDiscoveryBroadcaster {
@ConfigProperty(name = "quarkus.http.port")
Integer ourPort;
@ConfigProperty(name = "dhfs.objects.distributed.peerdiscovery.port")
Integer broadcastPort;
@ConfigProperty(name = "dhfs.objects.distributed.peerdiscovery.interval")
Integer broadcastInterval;
private Thread _broadcasterThread;
private DatagramSocket _socket;
@@ -50,7 +56,7 @@ public class LocalPeerDiscoveryBroadcaster {
private void broadcast() {
try {
while (!Thread.interrupted()) {
Thread.sleep(10000L);
Thread.sleep(broadcastInterval);
try {
var sendData = PeerDiscoveryInfo.newBuilder()
@@ -62,7 +68,7 @@ public class LocalPeerDiscoveryBroadcaster {
DatagramPacket sendPacket
= new DatagramPacket(sendBytes, sendBytes.length,
InetAddress.getByName("255.255.255.255"), 42069);
InetAddress.getByName("255.255.255.255"), broadcastPort);
_socket.send(sendPacket);
@@ -85,7 +91,7 @@ public class LocalPeerDiscoveryBroadcaster {
}
try {
sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, 42069);
sendPacket = new DatagramPacket(sendBytes, sendBytes.length, broadcast, broadcastPort);
_socket.send(sendPacket);
} catch (Exception ignored) {
continue;

View File

@@ -1,5 +1,12 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.objects.distributed.peerdiscovery.port=42069
dhfs.objects.distributed.peerdiscovery.interval=10000
dhfs.objects.distributed.invalidation.batch_size=1000
dhfs.objects.distributed.invalidation.delay=100
dhfs.objects.distributed.reconnect_interval=1s
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.storage.files.target_chunk_size=1048576
dhfs.objects.writeback.delay=500
dhfs.objects.writeback.limit=10000