13 Commits

8 changed files with 91 additions and 47 deletions

View File

@@ -159,16 +159,13 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>
<junit.jupiter.execution.parallel.enabled>
true
false
</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>
concurrent
</junit.jupiter.execution.parallel.mode.default>
<junit.jupiter.execution.parallel.config.dynamic.factor>
0.5
</junit.jupiter.execution.parallel.config.dynamic.factor>
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
</systemPropertyVariables>

View File

@@ -66,7 +66,7 @@ public class DhfsImage implements Future<String> {
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.copy("/libs", "/libs")
.cmd("java", "-ea", "-Xmx128M",
.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1","-XX:+UseParallelGC",
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
@@ -75,7 +75,6 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=10",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",

View File

@@ -133,7 +133,7 @@ public class LazyFs {
errPiper.start();
try {
if (!startLatch.await(5, TimeUnit.SECONDS))
if (!startLatch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("StartLatch timed out");
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -156,14 +156,14 @@ public class LazyFs {
"op=\"write\"\n" +
"file=\"" + mdbPath() + "\"\n" +
"persist=[1,4]\n" +
"occurrence=10");
"occurrence=3");
}
/**
 * Starts LazyFs with a single {@code [[injection]]} section of type {@code "torn-op"}
 * targeting the file returned by {@code mdbPath()}.
 *
 * <p>The configuration text is handed to {@code start(String)} as-is.
 *
 * <p>NOTE(review): this method is named "TornSeq" but the injected fault type is
 * {@code "torn-op"} - confirm the naming is intentional.
 */
public void startTornSeq() {
    // The section must carry exactly one "occurrence" key: the text previously listed it twice
    // (10, then 3). Only the later value, 3, is kept.
    start("[[injection]]\n" +
            "type=\"torn-op\"\n" +
            "file=\"" + mdbPath() + "\"\n" +
            "occurrence=3\n" +
            "parts=3 #or parts_bytes=[4096,3600,1260]\n" +
            "persist=[1,3]");
}
@@ -173,7 +173,6 @@ public class LazyFs {
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
Log.info("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -182,11 +181,7 @@ public class LazyFs {
public void stop() {
try {
synchronized (this) {
if (fs == null) {
return;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
fs = null;
}
} catch (Exception e) {
throw new RuntimeException(e);

View File

@@ -163,10 +163,11 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -179,8 +180,16 @@ public class LazyFsIT {
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
// LazyFs can crash too early
Assumptions.assumeTrue(false);
}
executor.submit(() -> {
try {
@@ -196,7 +205,7 @@ public class LazyFsIT {
Thread.sleep(3000);
lazyFs1.crash();
}
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
@@ -232,10 +241,11 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -248,8 +258,17 @@ public class LazyFsIT {
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
// LazyFs can crash too early
Assumptions.assumeTrue(false);
}
executor.submit(() -> {
try {
@@ -265,7 +284,7 @@ public class LazyFsIT {
Thread.sleep(3000);
lazyFs1.crash();
}
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
@@ -297,12 +316,15 @@ public class LazyFsIT {
}
});
barrier.await();
Thread.sleep(3000);
Log.info("Killing");
lazyFs2.crash();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -310,15 +332,22 @@ public class LazyFsIT {
case TORN_OP -> lazyFs2.startTornOp();
case TORN_SEQ -> lazyFs2.startTornSeq();
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
container2.start();
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
// LazyFs can crash too early
Assumptions.assumeTrue(false);
}
var barrier2 = new CountDownLatch(1);
executor.submit(() -> {
try {
@@ -331,17 +360,17 @@ public class LazyFsIT {
});
barrier2.await();
Log.info("Killing");
Thread.sleep(3000);
if (crashType.equals(CrashType.CRASH)) {
Thread.sleep(2000);
lazyFs2.crash();
}
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
lazyFs2.start();
container2.start();
@@ -372,10 +401,12 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs2.crash();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -383,14 +414,22 @@ public class LazyFsIT {
case TORN_OP -> lazyFs2.startTornOp();
case TORN_SEQ -> lazyFs2.startTornSeq();
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
container2.start();
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
// LazyFs can crash too early
Assumptions.assumeTrue(false);
}
var barrier2 = new CountDownLatch(1);
executor.submit(() -> {
@@ -403,17 +442,17 @@ public class LazyFsIT {
}
});
barrier2.await();
Thread.sleep(3000);
Log.info("Killing");
if (crashType.equals(CrashType.CRASH)) {
Thread.sleep(2000);
lazyFs2.crash();
}
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
Log.info("Restart");
lazyFs2.start();
container2.start();

View File

@@ -444,12 +444,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
for (var e : removedChunks.entrySet()) {
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
}
for (var e : newChunks.entrySet()) {
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
@@ -535,12 +535,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
// file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
for (var e : removedChunks.entrySet()) {
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
}
for (var e : newChunks.entrySet()) {
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}

View File

@@ -81,6 +81,7 @@ public class WritebackObjectPersistentStore {
lastTxId = s.id();
}
_lastCommittedId.set(lastTxId);
_lastWrittenId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.repository;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddress;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -20,7 +21,7 @@ import java.util.concurrent.TimeUnit;
// TODO: Dedup this
@ApplicationScoped
public class RpcClientFactory {
public class RpcClientFactory implements PeerDisconnectedEventListener {
@ConfigProperty(name = "dhfs.objects.sync.timeout")
long syncTimeout;
@@ -79,8 +80,20 @@ public class RpcClientFactory {
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
}
public void dropCache() {
_objSyncCache = new ConcurrentHashMap<>();
/**
 * Evicts every cached stub whose key belongs to the given peer and calls
 * {@code shutdown()} on the channel of each evicted stub.
 *
 * @param peerId the peer that disconnected; cache keys are matched with
 *               {@code key.id().equals(peerId)}
 */
@Override
public void handlePeerDisconnected(PeerId peerId) {
    // Pass 1: collect the matching keys without touching the cache while its key set is walked.
    var staleKeys = new ArrayList<ObjSyncStubKey>();
    for (var key : _objSyncCache.keySet()) {
        if (!key.id().equals(peerId)) {
            continue;
        }
        staleKeys.add(key);
    }

    // Pass 2: evict each collected key and shut down the channel behind the stub that was removed.
    for (var key : staleKeys) {
        var evicted = _objSyncCache.remove(key);
        if (evicted == null) {
            // remove() returned nothing for this key, so there is no channel to shut down.
            continue;
        }
        var channel = (ManagedChannel) evicted.getChannel();
        channel.shutdown();
    }
}
@FunctionalInterface

View File

@@ -162,7 +162,7 @@ public class InvalidationQueueService {
commits.get(p).forEach(Runnable::run);
}
} catch (Exception e) {
Log.warnv("Failed to send invalidations, will retry", e);
Log.warn("Failed to send invalidations, will retry", e);
for (var inv : data) {
pushInvalidationToOne(inv);
}