This commit is contained in:
2025-03-20 17:14:25 +01:00
parent 65cfabe197
commit bf03719d5d
3 changed files with 90 additions and 15 deletions

View File

@@ -5,12 +5,17 @@ import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
public static JObjectKey of(String name) {
return new JObjectKey(name);
}
public static JObjectKey random() {
return new JObjectKey(UUID.randomUUID().toString());
}
public static JObjectKey first() {
return new JObjectKey("");
}

View File

@@ -1,17 +1,23 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
import com.usatiuk.dhfs.objects.repository.ObjSyncHandler;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.SyncHelper;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.pcollections.HashPMap;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
@@ -34,16 +40,26 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
FileHelper fileHelper;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
DhfsFileService fileService;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(new JObjectKey("fs"));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(new JObjectKey("fs"), LockingStrategy.OPTIMISTIC);
}
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
var curKnownRemoteVersion = current.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
var data = remoteTx.getDataLocal(File.class, key).orElse(null);
if (data == null)
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
var oursCurFile = remoteTx.getDataLocal(File.class, key).orElse(null);
if (oursCurFile == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
var oursFile = data;
var theirsFile = receivedData.file();
File first;
@@ -52,24 +68,26 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
List<Pair<Long, JObjectKey>> secondChunks;
PeerId otherHostname;
if (oursFile.mTime() >= theirsFile.mTime()) {
first = oursFile;
firstChunks = fileHelper.getChunks(oursFile);
if (oursCurFile.mTime() >= theirsFile.mTime()) {
first = oursCurFile;
firstChunks = fileHelper.getChunks(oursCurFile);
second = theirsFile;
secondChunks = receivedData.chunks();
otherHostname = from;
} else {
second = oursFile;
secondChunks = fileHelper.getChunks(oursFile);
second = oursCurFile;
secondChunks = fileHelper.getChunks(oursCurFile);
first = theirsFile;
firstChunks = receivedData.chunks();
otherHostname = persistentPeerDataService.getSelfUuid();
}
Map<PeerId, Long> newChangelog = new LinkedHashMap<>(current.changelog());
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(oursCurMeta.changelog());
for (var entry : receivedChangelog.entrySet()) {
newChangelog.merge(entry.getKey(), entry.getValue(), Long::max);
newChangelog = newChangelog.plus(entry.getKey(),
Long.max(newChangelog.getOrDefault(entry.getKey(), 0L), entry.getValue())
);
}
boolean chunksDiff = !Objects.equals(firstChunks, secondChunks);
@@ -79,10 +97,48 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|| first.symlink() != second.symlink()
|| chunksDiff;
if (wasChanged) {
oursCurMeta = oursCurMeta.withChangelog(
newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.get(persistentPeerDataService.getSelfUuid()) + 1)
);
curTx.put(oursCurMeta);
remoteTx.putDataRaw(oursCurFile.withCTime(first.cTime()).withMTime(first.mTime()).withMode(first.mode()).withSymlink(first.symlink()));
fileHelper.replaceChunks(oursCurFile, firstChunks);
var newFile = new File(
JObjectKey.random(), second.mode(), second.cTime(), second.mTime(), second.symlink()
);
remoteTx.putData(newFile);
fileHelper.replaceChunks(newFile, secondChunks);
var parent = fileService.inoToParent(oursCurFile.key());
int i = 0;
do {
try {
getTreeW().move(parent.getRight(),
new JKleppmannTreeNodeMetaFile(
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
newFile.key()
),
getTreeW().getNewNodeId()
);
} catch (AlreadyExistsException aex) {
i++;
continue;
}
break;
} while (true);
}
var curKnownRemoteVersion = oursCurMeta.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(current);
oursCurMeta = oursCurMeta.withKnownRemoteVersions(oursCurMeta.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(oursCurMeta);
}
}
@@ -133,6 +189,7 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
}
case CONFLICT -> {
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
resolveConflict(from, key, receivedChangelog, receivedData);
// TODO:
return;
}

View File

@@ -75,6 +75,19 @@ public class RemoteTransaction {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
}
public <T extends JDataRemote> void putDataRaw(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);
if (curMeta == null)
throw new IllegalArgumentException("No data found for " + obj.key() + " when in putDataRaw");
if (!curMeta.knownType().isAssignableFrom(obj.getClass()))
throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass());
var newData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key()))
.map(w -> w.withData(obj)).orElse(new RemoteObjectDataWrapper<>(obj));
curTx.put(newData);
}
public <T extends JDataRemote> void putData(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);