Server: parallel op sending

This commit is contained in:
2025-03-31 16:32:13 +02:00
parent 8351bec59a
commit 06335b4b99
3 changed files with 12 additions and 9 deletions

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
@@ -39,6 +40,7 @@ public class InvalidationQueueService {
@Inject
PersistentPeerDataService persistentPeerDataService;
private final DataLocker _locker = new DataLocker();
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -120,7 +122,7 @@ public class InvalidationQueueService {
continue;
}
try {
try (var lock = _locker.lock(e)) {
opPusher.doPush(e);
success++;
} catch (Exception ex) {

View File

@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=1
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false

View File

@@ -19,16 +19,17 @@ public class DataLocker {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
while (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
tag.wait(4000L);
if (!tag.released) {
System.out.println("Timeout waiting for lock: " + data);
System.exit(1);
throw new InterruptedException();
}
tag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
continue;
}