Server: parallel op sending

This commit is contained in:
2025-03-31 16:05:09 +02:00
parent 5c50d572d0
commit 0f8002dc2c
3 changed files with 32 additions and 5 deletions

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
@@ -15,6 +16,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
@@ -39,6 +41,8 @@ public class InvalidationQueueService {
@Inject
PersistentPeerDataService persistentPeerDataService;
private final DataLocker _pushLocker = new DataLocker();
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -120,7 +124,7 @@ public class InvalidationQueueService {
continue;
}
try {
try (var lock = _pushLocker.lock(Pair.of(e.peer(), e.key()))) {
opPusher.doPush(e);
success++;
} catch (Exception ex) {
@@ -168,11 +172,23 @@ public class InvalidationQueueService {
deferredInvalidationQueueService.defer(entry);
}
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
_queue.addNoDelay(entry);
else
deferredInvalidationQueueService.defer(entry);
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOne(entry);
}
public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOneNoDelay(entry);
}
void pushDeferredInvalidations(InvalidationQueueEntry entry) {
_queue.add(entry);
}

View File

@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=1
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false

View File

@@ -123,13 +123,24 @@ public class ResyncIT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(20000);
await().atMost(120, TimeUnit.SECONDS).until(() -> {
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
int val = 0;
try {
val = Integer.valueOf(foundWc2.getStdout().strip());
} catch (NumberFormatException ignored) {
}
return 400 == val;
});
await().atMost(120, TimeUnit.SECONDS).until(() -> {
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
var foundWc1 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
int val = 0;
try {
val = Integer.valueOf(foundWc1.getStdout().strip());
} catch (NumberFormatException ignored) {
}
return 400 == val;
});
}