barely working file sync

This commit is contained in:
2025-02-03 22:43:58 +01:00
parent 7c06241876
commit 9bd4d19147
5 changed files with 48 additions and 24 deletions

View File

@@ -11,4 +11,9 @@ public record JObjectKey(String name) implements Serializable, Comparable<JObjec
public int compareTo(JObjectKey o) {
return name.compareTo(o.name);
}
@Override
public String toString() {
return name;
}
}

View File

@@ -0,0 +1,5 @@
package com.usatiuk.dhfs.objects;
public interface ConflictResolver {
void resolve(PeerId fromPeer, RemoteObject<?> ours, RemoteObject<?> theirs);
}

View File

@@ -16,6 +16,9 @@ public class DeleterTxHook implements PreCommitTxHook {
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
if (cur instanceof RemoteObject<?>) {
return; // FIXME:
}
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}
@@ -28,6 +31,9 @@ public class DeleterTxHook implements PreCommitTxHook {
@Override
public void onCreate(JObjectKey key, JData cur) {
if (cur instanceof RemoteObject<?>) {
return;
}
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}
@@ -40,6 +46,9 @@ public class DeleterTxHook implements PreCommitTxHook {
@Override
public void onDelete(JObjectKey key, JData cur) {
if (cur instanceof RemoteObject<?>) {
return;
}
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}

View File

@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -26,39 +27,45 @@ public class RemoteTransaction {
return curTx.getId();
}
private <T extends JDataRemote> RemoteObject<T> tryDownloadRemote(RemoteObject<T> obj) {
private <T extends JDataRemote> Optional<RemoteObject<T>> tryDownloadRemote(RemoteObject<T> obj) {
MutableObject<RemoteObject<T>> success = new MutableObject<>(null);
remoteObjectServiceClient.getObject(obj.key(), rcv -> {
if (!obj.meta().knownType().isInstance(rcv.getRight().data()))
throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass());
try {
remoteObjectServiceClient.getObject(obj.key(), rcv -> {
if (!obj.meta().knownType().isInstance(rcv.getRight().data()))
throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass());
if (!rcv.getRight().changelog().equals(obj.meta().changelog())) {
var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog());
if (!rcv.getRight().changelog().equals(updated.meta().changelog()))
throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog());
success.setValue(updated.withData((T) rcv.getRight().data()));
} else {
success.setValue(obj.withData((T) rcv.getRight().data()));
}
return true;
});
if (!rcv.getRight().changelog().equals(obj.meta().changelog())) {
var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog());
if (!rcv.getRight().changelog().equals(updated.meta().changelog()))
throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog());
success.setValue(updated.withData((T) rcv.getRight().data()));
} else {
success.setValue(obj.withData((T) rcv.getRight().data()));
}
return true;
});
} catch (Exception e) {
Log.error("Failed to download object " + obj.key(), e);
return Optional.empty();
}
curTx.put(success.getValue());
return success.getValue();
return Optional.of(success.getValue());
}
@SuppressWarnings("unchecked")
public <T extends JDataRemote> Optional<RemoteObject<T>> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObject.class, key, strategy)
.map(obj -> {
.flatMap(obj -> {
if (obj.data() != null && !type.isInstance(obj.data()))
throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type);
if (!type.isAssignableFrom(obj.meta().knownType()))
throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type);
// FIXME:
// if (!type.isAssignableFrom(obj.meta().knownType()))
// throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type);
if (obj.data() != null)
return obj;
return Optional.of(obj);
else
return tryDownloadRemote(obj);
});
@@ -72,7 +79,6 @@ public class RemoteTransaction {
return get(type, key, strategy).map(RemoteObject::data);
}
public <T extends JDataRemote> void put(RemoteObject<T> obj) {
curTx.put(obj);
}

View File

@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.pcollections.PMap;
import java.util.stream.Collectors;
@@ -172,11 +171,11 @@ public class SyncHandler {
// resolve(from, theirsHeader, theirsData, found);
// });
// Log. info("Resolved conflict for " + from + " " + header.getName());
throw new NotImplementedException();
// throw new NotImplementedException();
} else if (hasLower) {
Log.info("Received older index update than known: " + from + " " + current.key());
// throw new OutdatedUpdateException();
throw new NotImplementedException();
// throw new NotImplementedException();
} else if (hasHigher) {
var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
@@ -205,7 +204,7 @@ public class SyncHandler {
if (current == null) {
var obj = new RemoteObject<>(key, rcv);
curTx.put(obj);
return (RemoteObject<T>) obj;
current = (RemoteObject<T>) obj; // Will update known remote version too
}
var newObj = handleOneUpdate(from, current, rcv);