From 9bd4d1914766307d741d4b85b2c123711962c9d6 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Mon, 3 Feb 2025 22:43:58 +0100 Subject: [PATCH] barely working file sync --- .../com/usatiuk/dhfs/objects/JObjectKey.java | 5 ++ .../dhfs/objects/ConflictResolver.java | 5 ++ .../usatiuk/dhfs/objects/DeleterTxHook.java | 9 ++++ .../dhfs/objects/RemoteTransaction.java | 46 +++++++++++-------- .../dhfs/objects/repository/SyncHandler.java | 7 ++- 5 files changed, 48 insertions(+), 24 deletions(-) create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ConflictResolver.java diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java index 2da79825..67e368ac 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java @@ -11,4 +11,9 @@ public record JObjectKey(String name) implements Serializable, Comparable ours, RemoteObject theirs); +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java index d91911be..5fae3165 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java @@ -16,6 +16,9 @@ public class DeleterTxHook implements PreCommitTxHook { @Override public void onChange(JObjectKey key, JData old, JData cur) { + if (cur instanceof RemoteObject) { + return; // FIXME: + } if (!(cur instanceof JDataRefcounted refCur)) { return; } @@ -28,6 +31,9 @@ public class DeleterTxHook implements PreCommitTxHook { @Override public void onCreate(JObjectKey key, JData cur) { + if (cur instanceof RemoteObject) { + return; + } if (!(cur instanceof JDataRefcounted refCur)) { return; } @@ -40,6 +46,9 @@ public class DeleterTxHook implements PreCommitTxHook { @Override public void onDelete(JObjectKey key, JData cur) { + if (cur instanceof RemoteObject) { + return; + } if (!(cur instanceof JDataRefcounted refCur)) { return; } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java index e7187193..81e53222 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java @@ -5,6 +5,7 @@ import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; import com.usatiuk.dhfs.objects.repository.SyncHandler; import com.usatiuk.dhfs.objects.transaction.LockingStrategy; import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.mutable.MutableObject; @@ -26,39 +27,45 @@ public class RemoteTransaction { return curTx.getId(); } - private RemoteObject tryDownloadRemote(RemoteObject obj) { + private Optional> tryDownloadRemote(RemoteObject obj) { MutableObject> success = new MutableObject<>(null); - remoteObjectServiceClient.getObject(obj.key(), rcv -> { - if (!obj.meta().knownType().isInstance(rcv.getRight().data())) - throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass()); + try { + remoteObjectServiceClient.getObject(obj.key(), rcv -> { + if (!obj.meta().knownType().isInstance(rcv.getRight().data())) + throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass()); - if (!rcv.getRight().changelog().equals(obj.meta().changelog())) { - var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog()); - if (!rcv.getRight().changelog().equals(updated.meta().changelog())) - throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog()); - success.setValue(updated.withData((T) rcv.getRight().data())); - } else { - success.setValue(obj.withData((T) rcv.getRight().data())); - } - return true; - }); + if (!rcv.getRight().changelog().equals(obj.meta().changelog())) { + var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog()); + if (!rcv.getRight().changelog().equals(updated.meta().changelog())) + throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog()); + success.setValue(updated.withData((T) rcv.getRight().data())); + } else { + success.setValue(obj.withData((T) rcv.getRight().data())); + } + return true; + }); + } catch (Exception e) { + Log.error("Failed to download object " + obj.key(), e); + return Optional.empty(); + } curTx.put(success.getValue()); - return success.getValue(); + return Optional.of(success.getValue()); } @SuppressWarnings("unchecked") public Optional> get(Class type, JObjectKey key, LockingStrategy strategy) { return curTx.get(RemoteObject.class, key, strategy) - .map(obj -> { + .flatMap(obj -> { if (obj.data() != null && !type.isInstance(obj.data())) throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type); - if (!type.isAssignableFrom(obj.meta().knownType())) - throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type); +// FIXME: +// if (!type.isAssignableFrom(obj.meta().knownType())) +// throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type); if (obj.data() != null) - return obj; + return Optional.of(obj); else return tryDownloadRemote(obj); }); @@ -72,7 +79,6 @@ public class RemoteTransaction { return get(type, key, strategy).map(RemoteObject::data); } - public void put(RemoteObject obj) { curTx.put(obj); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java index 4e7f883f..d8f9516f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java @@ -8,7 +8,6 @@ import com.usatiuk.dhfs.objects.transaction.Transaction; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.apache.commons.lang3.NotImplementedException; import org.pcollections.PMap; import java.util.stream.Collectors; @@ -172,11 +171,11 @@ public class SyncHandler { // resolve(from, theirsHeader, theirsData, found); // }); // Log. info("Resolved conflict for " + from + " " + header.getName()); - throw new NotImplementedException(); +// throw new NotImplementedException(); } else if (hasLower) { Log.info("Received older index update than known: " + from + " " + current.key()); // throw new OutdatedUpdateException(); - throw new NotImplementedException(); +// throw new NotImplementedException(); } else if (hasHigher) { var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ? rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L); @@ -205,7 +204,7 @@ public class SyncHandler { if (current == null) { var obj = new RemoteObject<>(key, rcv); curTx.put(obj); - return (RemoteObject) obj; + current = (RemoteObject) obj; // Will update known remote version too } var newObj = handleOneUpdate(from, current, rcv);