A little more flexible conflict resolution

This commit is contained in:
2024-06-19 17:17:29 +02:00
parent 874906431f
commit 59ef0555e9
4 changed files with 63 additions and 68 deletions

View File

@@ -2,17 +2,17 @@ package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetaData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
@ApplicationScoped
@@ -20,10 +20,33 @@ public class DirectoryConflictResolver implements ConflictResolver {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
ObjectIndexService objectIndexService;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JObjectManager jObjectManager;
@Override
public ConflictResolutionResult resolve(byte[] oursData, ObjectHeader oursHeader, byte[] theirsData, ObjectHeader theirsHeader, String theirsSelfname) {
public ConflictResolutionResult resolve(String conflictHost,
ObjectHeader conflictSource,
ObjectMetaData localMeta) {
var oursData = objectPersistentStore.readObject(localMeta.getName());
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());
var oursHeader = localMeta.toRpcHeader();
var theirsHeader = theirsData.getLeft();
var ours = (Directory) DeserializationHelper.deserialize(oursData);
var theirs = (Directory) DeserializationHelper.deserialize(theirsData);
var theirs = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
if (!ours.getClass().equals(Directory.class) || !theirs.getClass().equals(Directory.class)) {
Log.error("Object type mismatch!");
throw new NotImplementedException();
@@ -32,7 +55,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(((Directory) ours).getChildrenMap());
for (var entry : ((Directory) theirs).getChildrenMap().entrySet()) {
if (mergedChildren.containsKey(entry.getKey())) {
mergedChildren.put(entry.getValue() + ".conflict." + theirsSelfname, entry.getValue());
mergedChildren.put(entry.getValue() + ".conflict." + conflictHost, entry.getValue());
}
}
@@ -56,7 +79,19 @@ public class DirectoryConflictResolver implements ConflictResolver {
newDir.setMtime(System.currentTimeMillis());
newDir.setCtime(ours.getCtime());
return new ConflictResolutionResult(ConflictResolutionResult.Type.RESOLVED,
List.of(Pair.of(newHdr, SerializationUtils.serialize(newDir))));
var newBytes = SerializationUtils.serialize(newDir);
objectIndexService.getOrCreateMeta(oursHeader.getName(), oursHeader.getConflictResolver()).runWriteLocked(m -> {
m.getChangelog().clear();
m.getChangelog().putAll(newMetaData.getChangelog());
objectPersistentStore.writeObject(m.getName(), newBytes);
return null;
});
invalidationQueueService.pushInvalidationToAll(oursHeader.getName());
jObjectManager.invalidateJObject(oursHeader.getName());
return ConflictResolutionResult.RESOLVED;
}
}

View File

@@ -3,12 +3,12 @@ package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import io.quarkus.logging.Log;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetaData;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.commons.lang3.NotImplementedException;
@ApplicationScoped
@Named("JObjectConflictResolution")
@@ -16,15 +16,14 @@ public class JObjectConflictResolution implements ConflictResolver {
@Inject
Instance<ConflictResolver> conflictResolvers;
@Inject
ObjectPersistentStore objectPersistentStore;
@Override
public ConflictResolutionResult
resolve(byte[] oursData, ObjectHeader oursHeader, byte[] theirsData, ObjectHeader theirsHeader, String theirsSelfname) {
resolve(String conflictHost, ObjectHeader conflictSource, ObjectMetaData localMeta) {
var oursData = objectPersistentStore.readObject(localMeta.getName());
var ours = (JObject) DeserializationHelper.deserialize(oursData);
var theirs = (JObject) DeserializationHelper.deserialize(theirsData);
if (!ours.getClass().equals(theirs.getClass())) {
Log.error("Object type mismatch!");
throw new NotImplementedException();
}
return conflictResolvers.select(ours.getConflictResolver()).get().resolve(oursData, oursHeader, theirsData, theirsHeader, theirsSelfname);
return conflictResolvers.select(ours.getConflictResolver()).get().resolve(conflictHost, conflictSource, localMeta);
}
}

View File

@@ -1,26 +1,13 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public interface ConflictResolver {
@AllArgsConstructor
@Getter
public static class ConflictResolutionResult {
public enum Type {
EQUIVALENT,
RESOLVED,
FAILED
}
private final Type _type;
private final List<Pair<ObjectHeader, byte[]>> _results;
public enum ConflictResolutionResult {
RESOLVED,
FAILED
}
public ConflictResolutionResult
resolve(byte[] oursData, ObjectHeader oursHeader, byte[] theirsData, ObjectHeader theirsHeader, String theirsSelfname);
resolve(String conflictHost, ObjectHeader conflictSource, ObjectMetaData localMeta);
}

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -17,7 +16,6 @@ import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
@@ -100,8 +98,13 @@ public class SyncHandler {
var conflict = data.getChangelog().get(selfname) > receivedSelfVer;
if (conflict) {
handleConflict(request.getSelfname(), request.getHeader(), data);
return null;
var resolver = conflictResolvers.select(NamedLiteral.of(meta.getConflictResolver()));
var result = resolver.get().resolve(request.getSelfname(), request.getHeader(), data);
if (result.equals(ConflictResolver.ConflictResolutionResult.RESOLVED)) {
Log.info("Resolved conflict for " + request.getSelfname() + " " + request.getHeader().getName());
return null;
} else
Log.error("Failed conflict resolution for " + request.getSelfname() + " " + request.getHeader().getName());
}
if (receivedTotalVer.equals(data.getTotalVersion())) {
@@ -130,33 +133,4 @@ public class SyncHandler {
return IndexUpdateReply.newBuilder().build();
}
public void handleConflict(String conflictHost, ObjectHeader conflictSource,
ObjectMetaData localMeta) {
var resolver = conflictResolvers.select(NamedLiteral.of(localMeta.getConflictResolver()));
var theirs = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());
var oursData = objectPersistentStore.readObject(localMeta.getName());
var res = resolver.get().resolve(oursData, localMeta.toRpcHeader(), theirs.getRight(), theirs.getLeft(), conflictHost);
if (res.getType().equals(ConflictResolver.ConflictResolutionResult.Type.FAILED)) {
Log.error("Failed resolving conflict");
throw new NotImplementedException();
}
if (res.getType().equals(ConflictResolver.ConflictResolutionResult.Type.RESOLVED)) {
Log.error("Resolved conflict for " + localMeta.getName());
for (var obj : res.getResults()) {
objectIndexService.getOrCreateMeta(obj.getLeft().getName(), obj.getLeft().getConflictResolver()).runWriteLocked(m -> {
m.getChangelog().clear();
for (var entry : obj.getLeft().getChangelog().getEntriesList()) {
m.getChangelog().put(entry.getHost(), entry.getVersion());
}
m.getChangelog().putIfAbsent(selfname, 0L);
objectPersistentStore.writeObject(m.getName(), obj.getRight());
return null;
});
invalidationQueueService.pushInvalidationToAll(obj.getLeft().getName());
}
}
}
}