push update when getting outdated one

This commit is contained in:
2024-06-30 19:49:03 +02:00
parent 2f31753446
commit 41533804c2
6 changed files with 13 additions and 3 deletions

View File

@@ -44,7 +44,7 @@ public class JObjectResolver {
public void backupRefs(JObject<?> self) {
self.assertRWLock();
if (self.getData() != null) {
if (self.getMeta().getSavedRefs() != null) {
if ((self.getMeta().getSavedRefs() != null) && (!self.getMeta().getSavedRefs().isEmpty())) {
Log.error("Saved refs not empty for " + self.getName() + " will clean");
self.getMeta().setSavedRefs(null);
}

View File

@@ -27,6 +27,8 @@ public class RemoteObjectServiceClient {
@Inject
SyncHandler syncHandler;
@Inject
InvalidationQueueService invalidationQueueService;
public Pair<ObjectHeader, ByteString> getSpecificObject(UUID host, String name) {
return rpcClientFactory.withObjSyncClient(host, client -> {
@@ -64,6 +66,9 @@ public class RemoteObjectServiceClient {
if (unexpected) {
try {
syncHandler.handleOneUpdate(UUID.fromString(reply.getSelfUuid()), reply.getObject().getHeader());
} catch (SyncHandler.OutdatedUpdateException ignored) {
Log.info("Outdated update of " + md.getName() + " from " + reply.getSelfUuid());
invalidationQueueService.pushInvalidationToOne(UUID.fromString(reply.getSelfUuid()), md.getName(), true); // True?
} catch (Exception e) {
Log.error("Received unexpected object version from " + reply.getSelfUuid()
+ " for " + reply.getObject().getHeader().getName() + " and conflict resolution failed", e);

View File

@@ -15,7 +15,7 @@ import java.util.*;
@ApplicationScoped
public class SyncHandler {
private static class OutdatedUpdateException extends RuntimeException {
protected static class OutdatedUpdateException extends RuntimeException {
}
@Inject
@@ -141,10 +141,12 @@ public class SyncHandler {
var builder = IndexUpdateReply.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var u : request.getHeaderList()) {
// TODO: Dedup
try {
handleOneUpdate(UUID.fromString(request.getSelfUuid()), u);
} catch (OutdatedUpdateException ignored) {
Log.info("Outdated update of " + u.getName() + " from " + request.getSelfUuid());
invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), u.getName(), true); // True?
} catch (Exception ex) {
Log.info("Error when handling update from " + request.getSelfUuid() + " of " + u.getName(), ex);
builder.addErrors(IndexUpdateError.newBuilder().setObjectName(u.getName()).setError(ex + Arrays.toString(ex.getStackTrace())).build());

View File

@@ -44,6 +44,7 @@ public class DhfsFuseIT {
"-Ddhfs.objects.distributed.peerdiscovery.interval=100",
"-Ddhfs.objects.distributed.invalidation.delay=100",
"-Ddhfs.objects.ref_verification=true",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"));

View File

@@ -49,6 +49,7 @@ public class DhfsFusex3IT {
"-Ddhfs.objects.distributed.invalidation.delay=100",
"-Djava.util.concurrent.ForkJoinPool.common.parallelism=4",
"-Ddhfs.objects.ref_verification=true",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"));

View File

@@ -1,4 +1,5 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test
dhfs.objects.ref_verification=true
dhfs.objects.ref_verification=true
quarkus.log.category."com.usatiuk.dhfs".level=TRACE