mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
13 Commits
367eedd540
...
060ab1767d
| Author | SHA1 | Date | |
|---|---|---|---|
| 060ab1767d | |||
| 89d87095c8 | |||
| 7425c1f312 | |||
| 428eca325f | |||
| 005bc35496 | |||
| 6685575ca5 | |||
| 1ae813aacd | |||
| e81671251a | |||
| add26bb156 | |||
| 4060045f15 | |||
| 75b484d5b2 | |||
| 1d9dc8ed4d | |||
| 7a85704862 |
@@ -159,16 +159,13 @@
|
|||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-failsafe-plugin</artifactId>
|
<artifactId>maven-failsafe-plugin</artifactId>
|
||||||
<configuration>
|
<configuration>
|
||||||
|
<forkCount>1C</forkCount>
|
||||||
|
<reuseForks>false</reuseForks>
|
||||||
|
<parallel>classes</parallel>
|
||||||
<systemPropertyVariables>
|
<systemPropertyVariables>
|
||||||
<junit.jupiter.execution.parallel.enabled>
|
<junit.jupiter.execution.parallel.enabled>
|
||||||
true
|
false
|
||||||
</junit.jupiter.execution.parallel.enabled>
|
</junit.jupiter.execution.parallel.enabled>
|
||||||
<junit.jupiter.execution.parallel.mode.default>
|
|
||||||
concurrent
|
|
||||||
</junit.jupiter.execution.parallel.mode.default>
|
|
||||||
<junit.jupiter.execution.parallel.config.dynamic.factor>
|
|
||||||
0.5
|
|
||||||
</junit.jupiter.execution.parallel.config.dynamic.factor>
|
|
||||||
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
||||||
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
||||||
</systemPropertyVariables>
|
</systemPropertyVariables>
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ public class DhfsImage implements Future<String> {
|
|||||||
.run("apt update && apt install -y libfuse2 curl gcc")
|
.run("apt update && apt install -y libfuse2 curl gcc")
|
||||||
.copy("/app", "/app")
|
.copy("/app", "/app")
|
||||||
.copy("/libs", "/libs")
|
.copy("/libs", "/libs")
|
||||||
.cmd("java", "-ea", "-Xmx128M",
|
.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1","-XX:+UseParallelGC",
|
||||||
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
|
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
|
||||||
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
|
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
|
||||||
"--add-opens=java.base/java.nio=ALL-UNNAMED",
|
"--add-opens=java.base/java.nio=ALL-UNNAMED",
|
||||||
@@ -75,7 +75,6 @@ public class DhfsImage implements Future<String> {
|
|||||||
"-Ddhfs.objects.deletion.delay=0",
|
"-Ddhfs.objects.deletion.delay=0",
|
||||||
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
|
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
|
||||||
"-Ddhfs.objects.ref_verification=true",
|
"-Ddhfs.objects.ref_verification=true",
|
||||||
"-Ddhfs.objects.write_log=true",
|
|
||||||
"-Ddhfs.objects.sync.timeout=10",
|
"-Ddhfs.objects.sync.timeout=10",
|
||||||
"-Ddhfs.objects.sync.ping.timeout=5",
|
"-Ddhfs.objects.sync.ping.timeout=5",
|
||||||
"-Ddhfs.objects.reconnect_interval=1s",
|
"-Ddhfs.objects.reconnect_interval=1s",
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ public class LazyFs {
|
|||||||
errPiper.start();
|
errPiper.start();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!startLatch.await(5, TimeUnit.SECONDS))
|
if (!startLatch.await(30, TimeUnit.SECONDS))
|
||||||
throw new RuntimeException("StartLatch timed out");
|
throw new RuntimeException("StartLatch timed out");
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
@@ -156,14 +156,14 @@ public class LazyFs {
|
|||||||
"op=\"write\"\n" +
|
"op=\"write\"\n" +
|
||||||
"file=\"" + mdbPath() + "\"\n" +
|
"file=\"" + mdbPath() + "\"\n" +
|
||||||
"persist=[1,4]\n" +
|
"persist=[1,4]\n" +
|
||||||
"occurrence=10");
|
"occurrence=3");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startTornSeq() {
|
public void startTornSeq() {
|
||||||
start("[[injection]]\n" +
|
start("[[injection]]\n" +
|
||||||
"type=\"torn-op\"\n" +
|
"type=\"torn-op\"\n" +
|
||||||
"file=\"" + mdbPath() + "\"\n" +
|
"file=\"" + mdbPath() + "\"\n" +
|
||||||
"occurrence=10\n" +
|
"occurrence=3\n" +
|
||||||
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
|
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
|
||||||
"persist=[1,3]");
|
"persist=[1,3]");
|
||||||
}
|
}
|
||||||
@@ -173,7 +173,6 @@ public class LazyFs {
|
|||||||
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
|
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
|
||||||
Log.info("Running command: " + cmd);
|
Log.info("Running command: " + cmd);
|
||||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
|
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
|
||||||
stop();
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
@@ -182,11 +181,7 @@ public class LazyFs {
|
|||||||
public void stop() {
|
public void stop() {
|
||||||
try {
|
try {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (fs == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
|
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
|
||||||
fs = null;
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
|||||||
@@ -163,10 +163,11 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
|
lazyFs1.stop();
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
switch (crashType) {
|
switch (crashType) {
|
||||||
@@ -179,8 +180,16 @@ public class LazyFsIT {
|
|||||||
waitingConsumer1 = new WaitingConsumer();
|
waitingConsumer1 = new WaitingConsumer();
|
||||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||||
|
if (crashType.equals(CrashType.CRASH))
|
||||||
|
throw e;
|
||||||
|
// LazyFs can crash too early
|
||||||
|
Assumptions.assumeTrue(false);
|
||||||
|
}
|
||||||
|
|
||||||
executor.submit(() -> {
|
executor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
@@ -196,7 +205,7 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
}
|
}
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
lazyFs1.stop();
|
lazyFs1.stop();
|
||||||
@@ -232,10 +241,11 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
|
lazyFs1.stop();
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
switch (crashType) {
|
switch (crashType) {
|
||||||
@@ -248,8 +258,17 @@ public class LazyFsIT {
|
|||||||
waitingConsumer1 = new WaitingConsumer();
|
waitingConsumer1 = new WaitingConsumer();
|
||||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
|
||||||
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||||
|
if (crashType.equals(CrashType.CRASH))
|
||||||
|
throw e;
|
||||||
|
// LazyFs can crash too early
|
||||||
|
|
||||||
|
Assumptions.assumeTrue(false);
|
||||||
|
}
|
||||||
|
|
||||||
executor.submit(() -> {
|
executor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
@@ -265,7 +284,7 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
}
|
}
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
lazyFs1.stop();
|
lazyFs1.stop();
|
||||||
@@ -297,12 +316,15 @@ public class LazyFsIT {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
barrier.await();
|
barrier.await();
|
||||||
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||||
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
|
lazyFs2.stop();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
switch (crashType) {
|
switch (crashType) {
|
||||||
@@ -310,15 +332,22 @@ public class LazyFsIT {
|
|||||||
case TORN_OP -> lazyFs2.startTornOp();
|
case TORN_OP -> lazyFs2.startTornOp();
|
||||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||||
}
|
}
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
|
||||||
container2.start();
|
container2.start();
|
||||||
|
|
||||||
waitingConsumer2 = new WaitingConsumer();
|
waitingConsumer2 = new WaitingConsumer();
|
||||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||||
|
if (crashType.equals(CrashType.CRASH))
|
||||||
|
throw e;
|
||||||
|
// LazyFs can crash too early
|
||||||
|
|
||||||
|
Assumptions.assumeTrue(false);
|
||||||
|
}
|
||||||
var barrier2 = new CountDownLatch(1);
|
var barrier2 = new CountDownLatch(1);
|
||||||
executor.submit(() -> {
|
executor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
@@ -331,17 +360,17 @@ public class LazyFsIT {
|
|||||||
});
|
});
|
||||||
barrier2.await();
|
barrier2.await();
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
|
Thread.sleep(3000);
|
||||||
if (crashType.equals(CrashType.CRASH)) {
|
if (crashType.equals(CrashType.CRASH)) {
|
||||||
Thread.sleep(2000);
|
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
}
|
}
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||||
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
lazyFs2.stop();
|
lazyFs2.stop();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
|
||||||
lazyFs2.start();
|
lazyFs2.start();
|
||||||
container2.start();
|
container2.start();
|
||||||
|
|
||||||
@@ -372,10 +401,12 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||||
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
|
lazyFs2.stop();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
switch (crashType) {
|
switch (crashType) {
|
||||||
@@ -383,14 +414,22 @@ public class LazyFsIT {
|
|||||||
case TORN_OP -> lazyFs2.startTornOp();
|
case TORN_OP -> lazyFs2.startTornOp();
|
||||||
case TORN_SEQ -> lazyFs2.startTornSeq();
|
case TORN_SEQ -> lazyFs2.startTornSeq();
|
||||||
}
|
}
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
|
||||||
container2.start();
|
container2.start();
|
||||||
|
|
||||||
waitingConsumer2 = new WaitingConsumer();
|
waitingConsumer2 = new WaitingConsumer();
|
||||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
Log.info("Failed to connect: " + testInfo.getDisplayName());
|
||||||
|
if (crashType.equals(CrashType.CRASH))
|
||||||
|
throw e;
|
||||||
|
// LazyFs can crash too early
|
||||||
|
|
||||||
|
Assumptions.assumeTrue(false);
|
||||||
|
}
|
||||||
|
|
||||||
var barrier2 = new CountDownLatch(1);
|
var barrier2 = new CountDownLatch(1);
|
||||||
executor.submit(() -> {
|
executor.submit(() -> {
|
||||||
@@ -403,17 +442,17 @@ public class LazyFsIT {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
barrier2.await();
|
barrier2.await();
|
||||||
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
if (crashType.equals(CrashType.CRASH)) {
|
if (crashType.equals(CrashType.CRASH)) {
|
||||||
Thread.sleep(2000);
|
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
}
|
}
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||||
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
lazyFs2.stop();
|
lazyFs2.stop();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
|
||||||
Log.info("Restart");
|
Log.info("Restart");
|
||||||
lazyFs2.start();
|
lazyFs2.start();
|
||||||
container2.start();
|
container2.start();
|
||||||
|
|||||||
@@ -444,12 +444,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (var e : removedChunks.entrySet()) {
|
for (var e : removedChunks.entrySet()) {
|
||||||
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||||
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (var e : newChunks.entrySet()) {
|
for (var e : newChunks.entrySet()) {
|
||||||
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -535,12 +535,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
// file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
|
// file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
|
||||||
|
|
||||||
for (var e : removedChunks.entrySet()) {
|
for (var e : removedChunks.entrySet()) {
|
||||||
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
|
||||||
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (var e : newChunks.entrySet()) {
|
for (var e : newChunks.entrySet()) {
|
||||||
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
|
||||||
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ public class WritebackObjectPersistentStore {
|
|||||||
lastTxId = s.id();
|
lastTxId = s.id();
|
||||||
}
|
}
|
||||||
_lastCommittedId.set(lastTxId);
|
_lastCommittedId.set(lastTxId);
|
||||||
|
_lastWrittenId.set(lastTxId);
|
||||||
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
|
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
|
||||||
_ready = true;
|
_ready = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.repository;
|
|||||||
import com.usatiuk.dhfs.PeerId;
|
import com.usatiuk.dhfs.PeerId;
|
||||||
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
|
||||||
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
|
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -20,7 +21,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
|
|
||||||
// TODO: Dedup this
|
// TODO: Dedup this
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class RpcClientFactory {
|
public class RpcClientFactory implements PeerDisconnectedEventListener {
|
||||||
@ConfigProperty(name = "dhfs.objects.sync.timeout")
|
@ConfigProperty(name = "dhfs.objects.sync.timeout")
|
||||||
long syncTimeout;
|
long syncTimeout;
|
||||||
|
|
||||||
@@ -79,8 +80,20 @@ public class RpcClientFactory {
|
|||||||
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void dropCache() {
|
@Override
|
||||||
_objSyncCache = new ConcurrentHashMap<>();
|
public void handlePeerDisconnected(PeerId peerId) {
|
||||||
|
ArrayList<ObjSyncStubKey> toRemove = new ArrayList<>();
|
||||||
|
for (var objSyncStubKey : _objSyncCache.keySet()) {
|
||||||
|
if (objSyncStubKey.id().equals(peerId)) {
|
||||||
|
toRemove.add(objSyncStubKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (var objSyncStubKey : toRemove) {
|
||||||
|
var stub = _objSyncCache.remove(objSyncStubKey);
|
||||||
|
if (stub != null) {
|
||||||
|
((ManagedChannel) stub.getChannel()).shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
|
|||||||
@@ -162,7 +162,7 @@ public class InvalidationQueueService {
|
|||||||
commits.get(p).forEach(Runnable::run);
|
commits.get(p).forEach(Runnable::run);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Log.warnv("Failed to send invalidations, will retry", e);
|
Log.warn("Failed to send invalidations, will retry", e);
|
||||||
for (var inv : data) {
|
for (var inv : data) {
|
||||||
pushInvalidationToOne(inv);
|
pushInvalidationToOne(inv);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user