mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: seemingly almost working file conflict resolution
though the removeAddHostTest doesn't seem to quite like it
This commit is contained in:
@@ -5,12 +5,17 @@ import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import java.io.Serializable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.UUID;
|
||||
|
||||
public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
|
||||
public static JObjectKey of(String name) {
|
||||
return new JObjectKey(name);
|
||||
}
|
||||
|
||||
public static JObjectKey random() {
|
||||
return new JObjectKey(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
public static JObjectKey first() {
|
||||
return new JObjectKey("");
|
||||
}
|
||||
|
||||
@@ -1,31 +1,20 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
|
||||
import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapper;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileDtoMapper implements DtoMapper<File, FileDto> {
|
||||
@Inject
|
||||
JMapHelper jMapHelper;
|
||||
@Inject
|
||||
FileHelper fileHelper;
|
||||
|
||||
@Override
|
||||
public FileDto toDto(File obj) {
|
||||
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
|
||||
try (var it = jMapHelper.getIterator(obj)) {
|
||||
while (it.hasNext()) {
|
||||
var cur = it.next();
|
||||
chunks.add(Pair.of(cur.getKey().key(), cur.getValue().ref()));
|
||||
}
|
||||
}
|
||||
|
||||
return new FileDto(obj, Collections.unmodifiableList(chunks));
|
||||
return new FileDto(obj, fileHelper.getChunks(obj));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileHelper {
|
||||
@Inject
|
||||
JMapHelper jMapHelper;
|
||||
|
||||
public List<Pair<Long, JObjectKey>> getChunks(File file) {
|
||||
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
|
||||
try (var it = jMapHelper.getIterator(file)) {
|
||||
while (it.hasNext()) {
|
||||
var cur = it.next();
|
||||
chunks.add(Pair.of(cur.getKey().key(), cur.getValue().ref()));
|
||||
}
|
||||
}
|
||||
return List.copyOf(chunks);
|
||||
}
|
||||
|
||||
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
|
||||
jMapHelper.deleteAll(file);
|
||||
|
||||
for (var f : chunks) {
|
||||
jMapHelper.put(file, JMapLongKey.of(f.getLeft()), f.getRight());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,22 +1,29 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
|
||||
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
|
||||
import com.usatiuk.dhfs.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
|
||||
import com.usatiuk.dhfs.objects.repository.ObjSyncHandler;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.objects.repository.SyncHelper;
|
||||
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.pcollections.HashPMap;
|
||||
import org.pcollections.HashTreePMap;
|
||||
import org.pcollections.PMap;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
@@ -26,6 +33,121 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
@Inject
|
||||
JMapHelper jMapHelper;
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
@Inject
|
||||
FileHelper fileHelper;
|
||||
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
||||
return jKleppmannTreeManager.getTree(new JObjectKey("fs"));
|
||||
}
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
||||
return jKleppmannTreeManager.getTree(new JObjectKey("fs"), LockingStrategy.OPTIMISTIC);
|
||||
}
|
||||
|
||||
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
||||
@Nullable FileDto receivedData) {
|
||||
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
||||
|
||||
if (!oursCurMeta.knownType().isAssignableFrom(File.class))
|
||||
throw new IllegalStateException("Object type mismatch: " + oursCurMeta.knownType() + " vs " + File.class);
|
||||
|
||||
if (!oursCurMeta.knownType().equals(File.class))
|
||||
oursCurMeta = oursCurMeta.withKnownType(File.class);
|
||||
|
||||
curTx.put(oursCurMeta);
|
||||
|
||||
var oursCurFile = remoteTx.getDataLocal(File.class, key).orElse(null);
|
||||
if (oursCurFile == null)
|
||||
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
|
||||
|
||||
var theirsFile = receivedData.file();
|
||||
|
||||
File first;
|
||||
File second;
|
||||
List<Pair<Long, JObjectKey>> firstChunks;
|
||||
List<Pair<Long, JObjectKey>> secondChunks;
|
||||
PeerId otherHostname;
|
||||
|
||||
if (oursCurFile.mTime() >= theirsFile.mTime()) {
|
||||
first = oursCurFile;
|
||||
firstChunks = fileHelper.getChunks(oursCurFile);
|
||||
second = theirsFile;
|
||||
secondChunks = receivedData.chunks();
|
||||
otherHostname = from;
|
||||
} else {
|
||||
second = oursCurFile;
|
||||
secondChunks = fileHelper.getChunks(oursCurFile);
|
||||
first = theirsFile;
|
||||
firstChunks = receivedData.chunks();
|
||||
otherHostname = persistentPeerDataService.getSelfUuid();
|
||||
}
|
||||
|
||||
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(oursCurMeta.changelog());
|
||||
|
||||
for (var entry : receivedChangelog.entrySet()) {
|
||||
newChangelog = newChangelog.plus(entry.getKey(),
|
||||
Long.max(newChangelog.getOrDefault(entry.getKey(), 0L), entry.getValue())
|
||||
);
|
||||
}
|
||||
|
||||
boolean chunksDiff = !Objects.equals(firstChunks, secondChunks);
|
||||
|
||||
boolean wasChanged = first.mTime() != second.mTime()
|
||||
|| first.cTime() != second.cTime()
|
||||
|| first.symlink() != second.symlink()
|
||||
|| chunksDiff;
|
||||
|
||||
if (wasChanged) {
|
||||
oursCurMeta = oursCurMeta.withChangelog(
|
||||
newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.get(persistentPeerDataService.getSelfUuid()) + 1)
|
||||
);
|
||||
|
||||
curTx.put(oursCurMeta);
|
||||
remoteTx.putDataRaw(oursCurFile.withCTime(first.cTime()).withMTime(first.mTime()).withMode(first.mode()).withSymlink(first.symlink()));
|
||||
fileHelper.replaceChunks(oursCurFile, firstChunks);
|
||||
|
||||
var newFile = new File(
|
||||
JObjectKey.random(), second.mode(), second.cTime(), second.mTime(), second.symlink()
|
||||
);
|
||||
remoteTx.putData(newFile);
|
||||
fileHelper.replaceChunks(newFile, secondChunks);
|
||||
|
||||
var parent = fileService.inoToParent(oursCurFile.key());
|
||||
|
||||
int i = 0;
|
||||
|
||||
do {
|
||||
try {
|
||||
getTreeW().move(parent.getRight(),
|
||||
new JKleppmannTreeNodeMetaFile(
|
||||
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
|
||||
newFile.key()
|
||||
),
|
||||
getTreeW().getNewNodeId()
|
||||
);
|
||||
} catch (AlreadyExistsException aex) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
} while (true);
|
||||
}
|
||||
|
||||
var curKnownRemoteVersion = oursCurMeta.knownRemoteVersions().get(from);
|
||||
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
|
||||
|
||||
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
|
||||
oursCurMeta = oursCurMeta.withKnownRemoteVersions(oursCurMeta.knownRemoteVersions().plus(from, receivedTotalVer));
|
||||
curTx.put(oursCurMeta);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRemoteUpdate(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
||||
@@ -47,11 +169,15 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
|
||||
.map(w -> w.withData(receivedData.file())).orElse(new RemoteObjectDataWrapper<>(receivedData.file())));
|
||||
|
||||
jMapHelper.deleteAll(receivedData.file());
|
||||
if (!current.knownType().isAssignableFrom(File.class))
|
||||
throw new IllegalStateException("Object type mismatch: " + current.knownType() + " vs " + File.class);
|
||||
|
||||
for (var f : receivedData.chunks()) {
|
||||
jMapHelper.put(receivedData.file(), JMapLongKey.of(f.getLeft()), f.getRight());
|
||||
}
|
||||
if (!current.knownType().equals(File.class))
|
||||
current = current.withKnownType(File.class);
|
||||
|
||||
curTx.put(current);
|
||||
|
||||
fileHelper.replaceChunks(receivedData.file(), receivedData.chunks());
|
||||
}
|
||||
}
|
||||
case NEWER -> {
|
||||
@@ -66,11 +192,15 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
|
||||
.map(w -> w.withData(receivedData.file())).orElse(new RemoteObjectDataWrapper<>(receivedData.file())));
|
||||
|
||||
jMapHelper.deleteAll(receivedData.file());
|
||||
if (!current.knownType().isAssignableFrom(File.class))
|
||||
throw new IllegalStateException("Object type mismatch: " + current.knownType() + " vs " + File.class);
|
||||
|
||||
for (var f : receivedData.chunks()) {
|
||||
jMapHelper.put(receivedData.file(), JMapLongKey.of(f.getLeft()), f.getRight());
|
||||
}
|
||||
if (!current.knownType().equals(File.class))
|
||||
current = current.withKnownType(File.class);
|
||||
|
||||
curTx.put(current);
|
||||
|
||||
fileHelper.replaceChunks(receivedData.file(), receivedData.chunks());
|
||||
} else {
|
||||
current = current.withHaveLocal(false);
|
||||
curTx.put(current);
|
||||
@@ -82,6 +212,8 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
}
|
||||
case CONFLICT -> {
|
||||
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
|
||||
assert receivedData != null;
|
||||
resolveConflict(from, key, receivedChangelog, receivedData);
|
||||
// TODO:
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -75,6 +75,19 @@ public class RemoteTransaction {
|
||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putDataRaw(T obj) {
|
||||
var curMeta = getMeta(obj.key()).orElse(null);
|
||||
if (curMeta == null)
|
||||
throw new IllegalArgumentException("No data found for " + obj.key() + " when in putDataRaw");
|
||||
|
||||
if (!curMeta.knownType().isAssignableFrom(obj.getClass()))
|
||||
throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass());
|
||||
|
||||
var newData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key()))
|
||||
.map(w -> w.withData(obj)).orElse(new RemoteObjectDataWrapper<>(obj));
|
||||
curTx.put(newData);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putData(T obj) {
|
||||
var curMeta = getMeta(obj.key()).orElse(null);
|
||||
|
||||
@@ -88,7 +101,12 @@ public class RemoteTransaction {
|
||||
// return;
|
||||
if (!curMeta.knownType().isAssignableFrom(obj.getClass()))
|
||||
throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass());
|
||||
|
||||
var newMeta = curMeta;
|
||||
|
||||
if (!curMeta.knownType().equals(obj.getClass()))
|
||||
newMeta = newMeta.withKnownType(obj.getClass());
|
||||
|
||||
newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(),
|
||||
newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1));
|
||||
curTx.put(newMeta);
|
||||
|
||||
@@ -36,6 +36,14 @@ public class DefaultObjSyncHandler {
|
||||
curTx.put(current);
|
||||
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
|
||||
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
|
||||
|
||||
if (!current.knownType().isAssignableFrom(receivedData.getClass()))
|
||||
throw new IllegalStateException("Object type mismatch: " + current.knownType() + " vs " + receivedData.getClass());
|
||||
|
||||
if (!current.knownType().equals(receivedData.getClass()))
|
||||
current = current.withKnownType(receivedData.getClass());
|
||||
|
||||
curTx.put(current);
|
||||
}
|
||||
}
|
||||
case NEWER -> {
|
||||
@@ -49,6 +57,14 @@ public class DefaultObjSyncHandler {
|
||||
curTx.put(current);
|
||||
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
|
||||
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
|
||||
|
||||
if (!current.knownType().isAssignableFrom(receivedData.getClass()))
|
||||
throw new IllegalStateException("Object type mismatch: " + current.knownType() + " vs " + receivedData.getClass());
|
||||
|
||||
if (!current.knownType().equals(receivedData.getClass()))
|
||||
current = current.withKnownType(receivedData.getClass());
|
||||
|
||||
curTx.put(current);
|
||||
} else {
|
||||
current = current.withHaveLocal(false);
|
||||
curTx.put(current);
|
||||
|
||||
@@ -50,12 +50,13 @@ public class RemoteObjectServiceClient {
|
||||
|
||||
private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
// public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
|
||||
// return rpcClientFactory.withObjSyncClient(host, client -> {
|
||||
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build());
|
||||
// return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
|
||||
// });
|
||||
// }
|
||||
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
|
||||
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());
|
||||
var deserialized = receivedObjectProtoSerializer.deserialize(reply);
|
||||
return Pair.of(peer, deserialized);
|
||||
});
|
||||
}
|
||||
|
||||
public void getObject(JObjectKey key, Function<Pair<PeerId, ReceivedObject>, Boolean> onReceive) {
|
||||
var objMeta = remoteTx.getMeta(key).orElse(null);
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository;
|
||||
import com.usatiuk.dhfs.objects.JDataRemote;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
@@ -33,9 +34,13 @@ public class SyncHandler {
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
DefaultObjSyncHandler defaultObjSyncHandler;
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
|
||||
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
|
||||
private final Map<Class<? extends JDataRemoteDto>, ObjSyncHandler> _dtoToSyncHandler;
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers) {
|
||||
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
|
||||
@@ -65,6 +70,18 @@ public class SyncHandler {
|
||||
public <D extends JDataRemoteDto> void handleRemoteUpdate(PeerId from, JObjectKey key,
|
||||
PMap<PeerId, Long> receivedChangelog,
|
||||
@Nullable D receivedData) {
|
||||
if (receivedData == null) {
|
||||
var current = remoteTx.getMeta(key);
|
||||
if (current.isPresent()) {
|
||||
var cmp = SyncHelper.compareChangelogs(current.get().changelog(), receivedChangelog);
|
||||
if (cmp.equals(SyncHelper.ChangelogCmpResult.CONFLICT)) {
|
||||
var got = remoteObjectServiceClient.getSpecificObject(key, from);
|
||||
handleRemoteUpdate(from, key, got.getRight().changelog(), got.getRight().data());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var got = Optional.ofNullable(receivedData).flatMap(d -> Optional.ofNullable(_dtoToSyncHandler.get(d.getClass()))).orElse(null);
|
||||
if (got == null) {
|
||||
assert receivedData == null || receivedData.objClass().equals(receivedData.getClass());
|
||||
|
||||
Reference in New Issue
Block a user