delay canDelete retries

This commit is contained in:
2024-07-12 21:29:59 +02:00
parent 22cd7ceda9
commit 83f5cabd48
7 changed files with 129 additions and 14 deletions

View File

@@ -23,6 +23,7 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectRefProcessor {
private final HashSetDelayedBlockingQueue<String> _candidates;
private final HashSetDelayedBlockingQueue<String> _canDeleteRetries;
private final HashSet<String> _movablesInProcessing = new HashSet<>();
@Inject
JObjectManager jObjectManager;
@@ -34,11 +35,15 @@ public class JObjectRefProcessor {
AutoSyncProcessor autoSyncProcessor;
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
int moveProcessorThreads;
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay")
long canDeleteRetryDelay;
ExecutorService _movableProcessorExecutorService;
private Thread _refProcessorThread;
public JObjectRefProcessor(@ConfigProperty(name = "dhfs.objects.deletion.delay") long deletionDelay) {
public JObjectRefProcessor(@ConfigProperty(name = "dhfs.objects.deletion.delay") long deletionDelay,
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay") long canDeleteRetryDelay) {
_candidates = new HashSetDelayedBlockingQueue<>(deletionDelay);
_canDeleteRetries = new HashSetDelayedBlockingQueue<>(canDeleteRetryDelay);
}
@Startup
@@ -71,6 +76,7 @@ public class JObjectRefProcessor {
}
_movableProcessorExecutorService.submit(() -> {
boolean delay = false;
try {
var knownHosts = persistentRemoteHostsService.getHostsUuid();
List<UUID> missing = new ArrayList<>();
@@ -82,10 +88,19 @@ public class JObjectRefProcessor {
});
var ret = remoteObjectServiceClient.canDelete(missing, obj.getName(), ourReferrers);
long ok = 0;
for (var r : ret) {
if (!r.getDeletionCandidate())
for (var rr : r.getReferrersList())
autoSyncProcessor.add(rr);
else
ok++;
}
if (ok != missing.size()) {
Log.trace("Delaying deletion check of " + obj.getName());
delay = true;
}
obj.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
@@ -95,11 +110,14 @@ public class JObjectRefProcessor {
return null;
});
} catch (Exception e) {
Log.error("When processing deletion of movable object " + obj.getName(), e);
Log.warn("When processing deletion of movable object " + obj.getName(), e);
} finally {
synchronized (_movablesInProcessing) {
_movablesInProcessing.remove(obj.getName());
putDeletionCandidate(obj.getName());
if (!delay)
_candidates.add(obj.getName());
else
_canDeleteRetries.add(obj.getName());
}
}
});
@@ -123,7 +141,13 @@ public class JObjectRefProcessor {
private void refProcessorThread() {
try {
while (!Thread.interrupted()) {
String next = _candidates.get();
String next = null;
while (next == null) {
next = _canDeleteRetries.tryGet();
if (next == null)
next = _candidates.get(canDeleteRetryDelay);
}
var got = jObjectManager.get(next);
if (got.isEmpty()) continue;

View File

@@ -5,7 +5,6 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -126,6 +125,7 @@ public class RemoteObjectServiceClient {
public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
ConcurrentLinkedDeque<CanDeleteReply> results = new ConcurrentLinkedDeque<>();
Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
try {
executor.invokeAll(targets.stream().<Callable<Void>>map(h -> () -> {

View File

@@ -1,5 +1,7 @@
package com.usatiuk.utils;
import jakarta.annotation.Nullable;
import java.util.*;
public class HashSetDelayedBlockingQueue<T> {
@@ -74,7 +76,8 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
public T get() throws InterruptedException {
public T get(Long timeout) throws InterruptedException {
long startedWaiting = System.currentTimeMillis();
try {
synchronized (this) {
_waiting.add(Thread.currentThread());
@@ -82,7 +85,13 @@ public class HashSetDelayedBlockingQueue<T> {
while (!Thread.interrupted()) {
long sleep;
synchronized (this) {
while (_set.isEmpty()) this.wait();
while (_set.isEmpty()) {
if (timeout == null) this.wait();
else {
this.wait(timeout);
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
}
}
var curTime = System.currentTimeMillis();
@@ -101,6 +110,10 @@ public class HashSetDelayedBlockingQueue<T> {
throw new InterruptedException();
}
public T get() throws InterruptedException {
return get(null);
}
public T getNoDelay() throws InterruptedException {
synchronized (this) {
while (_set.isEmpty()) this.wait();
@@ -109,6 +122,19 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
@Nullable
public T tryGet() throws InterruptedException {
synchronized (this) {
if (_set.isEmpty()) return null;
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) return null;
else return _set.pollFirstEntry().getValue()._el;
}
}
public Collection<T> getAll() {
ArrayList<T> out = new ArrayList<>();

View File

@@ -26,6 +26,7 @@ dhfs.objects.writeback.watermark-high=0.6
dhfs.objects.writeback.watermark-low=0.4
dhfs.objects.writeback.threads=4
dhfs.objects.deletion.delay=0
dhfs.objects.deletion.can-delete-retry-delay=1000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=2

View File

@@ -33,7 +33,7 @@ public class DhfsFuseIT {
String c2uuid;
@BeforeEach
void setup() throws IOException, InterruptedException, TimeoutException {
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
String buildPath = System.getProperty("buildDirectory");
System.out.println("Build path: " + buildPath);
@@ -66,10 +66,10 @@ public class DhfsFuseIT {
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("1");
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2");
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
@@ -165,6 +165,57 @@ public class DhfsFuseIT {
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
}
@Test
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(1000);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Thread.sleep(1000);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
Thread.sleep(1000);
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
}
@Test
void deleteTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
Thread.sleep(500);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Thread.sleep(500);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
Thread.sleep(1000);
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
}
@Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException {

View File

@@ -36,7 +36,7 @@ public class DhfsFusex3IT {
String c3uuid;
@BeforeEach
void setup() throws IOException, InterruptedException, TimeoutException {
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
String buildPath = System.getProperty("buildDirectory");
System.out.println("Build path: " + buildPath);
@@ -80,13 +80,16 @@ public class DhfsFusex3IT {
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class)).withPrefix(c1uuid);
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c1uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class)).withPrefix(c2uuid);
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c2uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class)).withPrefix(c3uuid);
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c3uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));

View File

@@ -23,6 +23,16 @@ public class HashSetDelayedBlockingQueueTest {
Assertions.assertTrue((gotTime - curTime) >= 1000);
}
@Test
void GetTimeout() throws InterruptedException {
var queue = new HashSetDelayedBlockingQueue<>(1000);
var curTime = System.currentTimeMillis();
var thing = queue.get(500L);
Assertions.assertNull(thing);
var gotTime = System.currentTimeMillis();
Assertions.assertTrue((gotTime - curTime) <= 10000);
}
@Test
void GetAll() throws InterruptedException {