mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
@@ -1,6 +1,5 @@
|
||||
package com.usatiuk.dhfs.repository.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.PeerManager;
|
||||
@@ -16,7 +15,6 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@@ -41,8 +39,6 @@ public class InvalidationQueueService {
|
||||
@Inject
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
private final DataLocker _pushLocker = new DataLocker();
|
||||
|
||||
private ExecutorService _executor;
|
||||
private volatile boolean _shutdown = false;
|
||||
|
||||
@@ -124,7 +120,7 @@ public class InvalidationQueueService {
|
||||
continue;
|
||||
}
|
||||
|
||||
try (var lock = _pushLocker.lock(Pair.of(e.peer(), e.key()))) {
|
||||
try {
|
||||
opPusher.doPush(e);
|
||||
success++;
|
||||
} catch (Exception ex) {
|
||||
|
||||
@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
|
||||
dhfs.objects.peerdiscovery.broadcast=true
|
||||
dhfs.objects.sync.timeout=30
|
||||
dhfs.objects.sync.ping.timeout=5
|
||||
dhfs.objects.invalidation.threads=4
|
||||
dhfs.objects.invalidation.threads=1
|
||||
dhfs.objects.invalidation.delay=1000
|
||||
dhfs.objects.reconnect_interval=5s
|
||||
dhfs.objects.write_log=false
|
||||
|
||||
@@ -123,24 +123,13 @@ public class ResyncIT {
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
Thread.sleep(20000);
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc2 = container2.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
int val = 0;
|
||||
try {
|
||||
val = Integer.valueOf(foundWc2.getStdout().strip());
|
||||
} catch (NumberFormatException ignored) {
|
||||
}
|
||||
return 400 == val;
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
await().atMost(120, TimeUnit.SECONDS).until(() -> {
|
||||
var foundWc1 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
int val = 0;
|
||||
try {
|
||||
val = Integer.valueOf(foundWc1.getStdout().strip());
|
||||
} catch (NumberFormatException ignored) {
|
||||
}
|
||||
return 400 == val;
|
||||
var foundWc2 = container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/fuse -type f | wc -l");
|
||||
return 400 == Integer.valueOf(foundWc2.getStdout().strip());
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user