mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: remove periodic push ops
just send them as an "ack" to other ops
This commit is contained in:
@@ -220,14 +220,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return true;
|
||||
}
|
||||
|
||||
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
|
||||
public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
|
||||
_storage.rLock();
|
||||
try {
|
||||
// TODO: Ideally no point in this separate locking?
|
||||
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
|
||||
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
|
||||
if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
|
||||
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return;
|
||||
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
|
||||
} finally {
|
||||
_storage.rUnlock();
|
||||
}
|
||||
@@ -239,6 +239,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
} finally {
|
||||
_storage.rwUnlock();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
|
||||
@@ -161,10 +161,9 @@ public class JKleppmannTreeManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void acceptExternalOp(UUID from, Op op) {
|
||||
public boolean acceptExternalOp(UUID from, Op op) {
|
||||
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
|
||||
_tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
|
||||
return;
|
||||
return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
|
||||
}
|
||||
|
||||
if (!(op instanceof JKleppmannTreeOpWrapper jop))
|
||||
@@ -222,6 +221,7 @@ public class JKleppmannTreeManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -14,7 +14,7 @@ public interface OpObject {
|
||||
|
||||
void pushBootstrap(UUID host);
|
||||
|
||||
void acceptExternalOp(UUID from, Op op);
|
||||
boolean acceptExternalOp(UUID from, Op op);
|
||||
|
||||
Op getPeriodicPushOp();
|
||||
|
||||
|
||||
@@ -3,8 +3,6 @@ package com.usatiuk.dhfs.objects.repository.opsupport;
|
||||
import com.usatiuk.dhfs.objects.repository.PeerManager;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -32,8 +30,11 @@ public class OpObjectRegistry {
|
||||
if (got == null)
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Queue with id " + objId + " not registered"));
|
||||
got.addToTx();
|
||||
boolean push = false;
|
||||
for (Op op : ops)
|
||||
got.acceptExternalOp(from, op);
|
||||
push |= got.acceptExternalOp(from, op);
|
||||
if (push)
|
||||
opSender.push(got);
|
||||
}
|
||||
|
||||
public void pushBootstrapData(UUID host) {
|
||||
@@ -43,13 +44,4 @@ public class OpObjectRegistry {
|
||||
o.pushBootstrap(host);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Scheduled(every = "${dhfs.objects.periodic-push-op-interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||
@Blocking
|
||||
void periodicPush() {
|
||||
for (var obj : _objects.values()) {
|
||||
opSender.push(obj);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,9 +186,11 @@ public class DhfsFuseIT {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
|
||||
Log.info("Deleting");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() ->
|
||||
0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Log.info("Deleted");
|
||||
|
||||
// FIXME?
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
@@ -295,12 +297,13 @@ public class DhfsFuseIT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> cat.getStdout().contains("test1") && cat.getStdout().contains("test2"));
|
||||
// Assertions.assertTrue(ls.getStdout().chars().filter(c -> c == '\n').count() >= 2);
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
return cat.getStdout().contains("test1") && cat.getStdout().contains("test2");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -35,7 +35,8 @@ public class DhfsFusex3IT {
|
||||
String c2uuid;
|
||||
String c3uuid;
|
||||
|
||||
long emptyFileCount;
|
||||
// This calculation is somewhat racy, so keep it hardcoded for now
|
||||
long emptyFileCount = 9;
|
||||
|
||||
@BeforeEach
|
||||
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
|
||||
@@ -114,8 +115,6 @@ public class DhfsFusex3IT {
|
||||
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
emptyFileCount = Integer.valueOf(container1.execInContainer("/bin/sh", "-c", "find /root/dhfs_default/data/objs -type f | wc -l").getStdout().strip());
|
||||
}
|
||||
|
||||
private boolean checkEmpty() throws IOException, InterruptedException {
|
||||
|
||||
Reference in New Issue
Block a user