diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java index 53963c00..9172ee1c 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java @@ -75,7 +75,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { private ChunkData createChunk(ByteString bytes) { var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); - remoteTx.put(newChunk); + remoteTx.putData(newChunk); return newChunk; } @@ -104,7 +104,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { var ref = curTx.get(JData.class, uuid).orElse(null); if (ref == null) return Optional.empty(); GetattrRes ret; - if (ref instanceof RemoteObject r) { + if (ref instanceof RemoteObjectMeta r) { var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null); if (remote instanceof File f) { ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE); @@ -157,7 +157,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { var fuuid = UUID.randomUUID(); Log.debug("Creating file " + fuuid); File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0); - remoteTx.put(f); + remoteTx.putData(f); try { getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId()); @@ -230,10 +230,10 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (dent instanceof JKleppmannTreeNode) { return true; - } else if (dent instanceof RemoteObject) { + } else if (dent instanceof RemoteObjectMeta) { var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null); if (remote instanceof File f) { - remoteTx.put(f.withMode(mode).withMTime(System.currentTimeMillis())); + remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis())); return true; } else { throw new IllegalArgumentException(uuid + " is not a file"); @@ -502,7 +502,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis()); - remoteTx.put(file); + remoteTx.putData(file); cleanupChunks(file, removedChunks.values()); updateFileSize(file); @@ -526,7 +526,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { var oldChunks = file.chunks(); file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis()); - remoteTx.put(file); + remoteTx.putData(file); cleanupChunks(file, oldChunks.values()); updateFileSize(file); return true; @@ -587,7 +587,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis()); - remoteTx.put(file); + remoteTx.putData(file); cleanupChunks(file, removedChunks.values()); updateFileSize(file); return true; @@ -640,7 +640,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { "File not found for setTimes: " + fileUuid)) ); - remoteTx.put(file.withCTime(atimeMs).withMTime(mtimeMs)); + remoteTx.putData(file.withCTime(atimeMs).withMTime(mtimeMs)); return true; }); } @@ -657,7 +657,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } if (realSize != file.size()) { - remoteTx.put(file.withSize(realSize)); + remoteTx.putData(file.withSize(realSize)); } }); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ConflictResolver.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ConflictResolver.java index 1faf082c..f3b1acc7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ConflictResolver.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ConflictResolver.java @@ -1,5 +1,5 @@ package com.usatiuk.dhfs.objects; public interface ConflictResolver { - void resolve(PeerId fromPeer, RemoteObject ours, RemoteObject theirs); + void resolve(PeerId fromPeer, RemoteObjectMeta ours, RemoteObjectMeta theirs); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java index 25354d8c..8917ef6c 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/DeleterTxHook.java @@ -22,7 +22,7 @@ public class DeleterTxHook implements PreCommitTxHook { return; } if (canDelete(refCur)) { - if (refCur instanceof RemoteObject ro) { + if (refCur instanceof RemoteObjectMeta ro) { remoteObjectDeleter.putDeletionCandidate(ro); return; } @@ -38,7 +38,7 @@ public class DeleterTxHook implements PreCommitTxHook { } if (canDelete(refCur)) { - if (refCur instanceof RemoteObject ro) { + if (refCur instanceof RemoteObjectMeta ro) { remoteObjectDeleter.putDeletionCandidate(ro); return; } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java index e239b8f2..7cb22447 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java @@ -1,8 +1,6 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; -import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile; -import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; import com.usatiuk.dhfs.objects.transaction.Transaction; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -19,8 +17,8 @@ public class RefcounterTxHook implements PreCommitTxHook { return found; } - if (cur instanceof RemoteObject || cur instanceof JKleppmannTreeNode) { - return new RemoteObject<>(key); + if (cur instanceof RemoteObjectMeta || cur instanceof JKleppmannTreeNode) { + return new RemoteObjectMeta(key); } else { return found; } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java index e83bc163..47100484 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java @@ -16,7 +16,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook { @Override public void onChange(JObjectKey key, JData old, JData cur) { boolean invalidate = switch (cur) { - case RemoteObject remote -> !remote.meta().changelog().equals(((RemoteObject) old).meta().changelog()); + case RemoteObjectMeta remote -> !remote.changelog().equals(((RemoteObjectMeta) old).changelog()); case JKleppmannTreePersistentData pd -> !pd.queues().equals(((JKleppmannTreePersistentData) old).queues()); default -> false; }; @@ -28,7 +28,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook { @Override public void onCreate(JObjectKey key, JData cur) { - if (!(cur instanceof RemoteObject remote)) { + if (!(cur instanceof RemoteObjectMeta remote)) { return; } @@ -37,7 +37,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook { @Override public void onDelete(JObjectKey key, JData cur) { - if (!(cur instanceof RemoteObject remote)) { + if (!(cur instanceof RemoteObjectMeta remote)) { return; } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java deleted file mode 100644 index ce9a99fd..00000000 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.usatiuk.dhfs.objects; - -import org.pcollections.HashTreePSet; -import org.pcollections.PCollection; -import org.pcollections.PMap; -import org.pcollections.TreePMap; - -import javax.annotation.Nullable; -import java.util.Collection; -import java.util.List; - -public record RemoteObject(PCollection refsFrom, boolean frozen, - RemoteObjectMeta meta, @Nullable T data) implements JDataRefcounted { - // Self put - public RemoteObject(T data, PeerId initialPeer) { - this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), false, initialPeer), data); - } - - public RemoteObject(JObjectKey key, PMap remoteChangelog) { - this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, remoteChangelog), null); - } - - public RemoteObject(JObjectKey key) { - this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, TreePMap.empty()), null); - } - - @Override - public JObjectKey key() { - if (data != null && !data.key().equals(meta.key())) - throw new IllegalStateException("Corrupted object, key mismatch: " + meta.key() + " vs " + data.key()); - return meta.key(); - } - - @Override - public RemoteObject withRefsFrom(PCollection refs) { - return new RemoteObject<>(refs, frozen, meta, data); - } - - @Override - public RemoteObject withFrozen(boolean frozen) { - return new RemoteObject<>(refsFrom, frozen, meta, data); - } - - public RemoteObject withMeta(RemoteObjectMeta meta) { - return new RemoteObject<>(refsFrom, frozen, meta, data); - } - - public RemoteObject withData(T data) { - return new RemoteObject<>(refsFrom, frozen, meta, data); - } - - public RemoteObject withRefsFrom(PCollection refs, boolean frozen) { - return new RemoteObject<>(refs, frozen, meta, data); - } - - public ReceivedObject toReceivedObject() { - if (data == null) - throw new IllegalStateException("Cannot convert to ReceivedObject without data: " + meta.key()); - return new ReceivedObject(meta.key(), meta.changelog(), data); - } - - @Override - public Collection collectRefsTo() { - if (data != null) return data.collectRefsTo(); - return List.of(); - } - - @Override - public int estimateSize() { - return data == null ? 1000 : data.estimateSize(); - } -} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDataWrapper.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDataWrapper.java new file mode 100644 index 00000000..12877b9b --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDataWrapper.java @@ -0,0 +1,43 @@ +package com.usatiuk.dhfs.objects; + +import org.pcollections.HashTreePSet; +import org.pcollections.PCollection; + +import java.util.Collection; + +public record RemoteObjectDataWrapper(PCollection refsFrom, + boolean frozen, + T data) implements JDataRefcounted { + public RemoteObjectDataWrapper(T data) { + this(HashTreePSet.empty(), false, data); + } + + @Override + public RemoteObjectDataWrapper withRefsFrom(PCollection refs) { + return new RemoteObjectDataWrapper<>(refs, frozen, data); + } + + @Override + public RemoteObjectDataWrapper withFrozen(boolean frozen) { + return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); + } + + public RemoteObjectDataWrapper withData(T data) { + return new RemoteObjectDataWrapper<>(refsFrom, frozen, data); + } + + @Override + public JObjectKey key() { + return RemoteObjectMeta.ofDataKey(data.key()); + } + + @Override + public Collection collectRefsTo() { + return data.collectRefsTo(); + } + + @Override + public int estimateSize() { + return data.estimateSize(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java index c48559a0..9fe55a39 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java @@ -93,17 +93,14 @@ public class RemoteObjectDeleter { // _quickCandidates.add(obj); // } - public void putDeletionCandidate(RemoteObject obj) { - synchronized (_movablesInProcessing) { - if (_movablesInProcessing.contains(obj.key())) return; - if (!obj.meta().seen()) { - if (_quickCandidates.add(obj.key())) - Log.debug("Quick deletion candidate: " + obj.key()); - return; - } - if (_candidates.add(obj.key())) - Log.debug("Deletion candidate: " + obj.key()); + public void putDeletionCandidate(RemoteObjectMeta obj) { + if (!obj.seen()) { + if (_quickCandidates.add(obj.key())) + Log.debug("Quick deletion candidate: " + obj.key()); + return; } + if (_candidates.add(obj.key())) + Log.debug("Deletion candidate: " + obj.key()); } private void asyncProcessMovable(JObjectKey objName) { @@ -118,18 +115,20 @@ public class RemoteObjectDeleter { try { delay = txm.run(() -> { Log.debugv("Starting async processing of remote obj del: {0}", objName); - RemoteObject target = curTx.get(RemoteObject.class, objName).orElse(null); + RemoteObjectMeta target = curTx.get(RemoteObjectMeta.class, objName).orElse(null); if (target == null) return true; + if (!canDelete(target)) return true; - if (canDelete(target)) { + if (canDeleteImmediately(target)) { Log.debugv("Async processing of remote obj del: immediate {0}", objName); curTx.delete(objName); return true; } + var knownHosts = peerInfoService.getPeersNoSelf(); List missing = knownHosts.stream() .map(PeerInfo::id) - .filter(id -> !target.meta().confirmedDeletes().contains(id)).toList(); + .filter(id -> !target.confirmedDeletes().contains(id)).toList(); var ret = remoteObjectServiceClient.canDelete(missing, objName, target.refsFrom()); @@ -148,7 +147,7 @@ public class RemoteObjectDeleter { Log.debugv("Delaying deletion check of {0}", objName); return true; } else { - assert canDelete(target); + assert canDeleteImmediately(target); Log.debugv("Async processing of remote obj del: after query {0}", objName); curTx.delete(objName); return false; @@ -166,15 +165,20 @@ public class RemoteObjectDeleter { }); } + // FIXME: + private boolean canDelete(JDataRefcounted obj) { + return obj.refsFrom().isEmpty() && !obj.frozen(); + } + // Returns true if the object can be deleted - private boolean canDelete(RemoteObject obj) { - if (!obj.meta().seen()) + private boolean canDeleteImmediately(RemoteObjectMeta obj) { + if (!obj.seen()) return true; var knownHosts = peerInfoService.getPeers(); boolean missing = false; for (var x : knownHosts) { - if (!obj.meta().confirmedDeletes().contains(x.id())) { + if (!obj.confirmedDeletes().contains(x.id())) { missing = true; break; } @@ -204,10 +208,12 @@ public class RemoteObjectDeleter { Stream.of(next, nextQuick).filter(Objects::nonNull).forEach(realNext -> { Log.debugv("Processing remote object deletion candidate: {0}", realNext); var deleted = txm.run(() -> { - RemoteObject target = curTx.get(RemoteObject.class, realNext).orElse(null); + RemoteObjectMeta target = curTx.get(RemoteObjectMeta.class, realNext).orElse(null); if (target == null) return true; - if (canDelete(target)) { + if (!canDelete(target)) return true; + + if (canDeleteImmediately(target)) { Log.debugv("Immediate deletion of: {0}", realNext); curTx.delete(realNext); return true; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java index ea360a06..6db896a2 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java @@ -1,53 +1,103 @@ package com.usatiuk.dhfs.objects; -import org.pcollections.HashTreePMap; -import org.pcollections.HashTreePSet; -import org.pcollections.PMap; -import org.pcollections.PSet; +import org.pcollections.*; -import java.io.Serializable; +import java.util.Collection; +import java.util.List; -public record RemoteObjectMeta( - JObjectKey key, - PMap knownRemoteVersions, - Class knownType, - PSet confirmedDeletes, - boolean seen, - PMap changelog) implements Serializable { - public RemoteObjectMeta(JObjectKey key, Class type, boolean seen, PeerId initialPeer) { - this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), seen, - HashTreePMap.empty().plus(initialPeer, 1L)); +public record RemoteObjectMeta(PCollection refsFrom, boolean frozen, + JObjectKey key, + PMap knownRemoteVersions, + Class knownType, + PSet confirmedDeletes, + boolean seen, + PMap changelog, + boolean hasLocalData) implements JDataRefcounted { + // Self put + public RemoteObjectMeta(JDataRemote data, PeerId initialPeer) { + this(HashTreePSet.empty(), false, + data.key(), HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false, + HashTreePMap.empty().plus(initialPeer, 1L), + true); } public RemoteObjectMeta(JObjectKey key, PMap remoteChangelog) { - this(key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, remoteChangelog); + this(HashTreePSet.empty(), false, + key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, + remoteChangelog, + false); + } + + public RemoteObjectMeta(JObjectKey key) { + this(HashTreePSet.empty(), false, + key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, + TreePMap.empty(), + false); + } + + @Override + public JObjectKey key() { + return ofMetaKey(key); + } + + public static JObjectKey ofMetaKey(JObjectKey key) { + return key; + } + + public static JObjectKey ofDataKey(JObjectKey key) { + return JObjectKey.of(key.name() + "_data"); + } + + public JObjectKey dataKey() { + return ofDataKey(key); + } + + @Override + public RemoteObjectMeta withRefsFrom(PCollection refs) { + return new RemoteObjectMeta(refs, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); + } + + @Override + public RemoteObjectMeta withFrozen(boolean frozen) { + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withKnownRemoteVersions(PMap knownRemoteVersions) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withKnownType(Class knownType) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withConfirmedDeletes(PSet confirmedDeletes) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withSeen(boolean seen) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withChangelog(PMap changelog) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData); } public RemoteObjectMeta withHaveLocal(boolean haveLocal) { - return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); } public long versionSum() { return changelog.values().stream().mapToLong(Long::longValue).sum(); } + + @Override + public Collection collectRefsTo() { + if (hasLocalData) return List.of(dataKey()); + return List.of(); + } + + @Override + public int estimateSize() { + return 1000; + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java index 81e53222..8ff36c18 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java @@ -27,22 +27,26 @@ public class RemoteTransaction { return curTx.getId(); } - private Optional> tryDownloadRemote(RemoteObject obj) { - MutableObject> success = new MutableObject<>(null); + private Optional> tryDownloadRemote(RemoteObjectMeta obj) { + MutableObject> success = new MutableObject<>(null); try { remoteObjectServiceClient.getObject(obj.key(), rcv -> { - if (!obj.meta().knownType().isInstance(rcv.getRight().data())) - throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass()); + if (!obj.knownType().isInstance(rcv.getRight().data())) + throw new IllegalStateException("Object type mismatch: " + obj.knownType() + " vs " + rcv.getRight().data().getClass()); - if (!rcv.getRight().changelog().equals(obj.meta().changelog())) { - var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog()); - if (!rcv.getRight().changelog().equals(updated.meta().changelog())) - throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog()); - success.setValue(updated.withData((T) rcv.getRight().data())); - } else { - success.setValue(obj.withData((T) rcv.getRight().data())); - } + syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), rcv.getRight().changelog(), rcv.getRight().data()); + + var now = curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(obj.key())).orElse(null); + assert now != null; + + if (!now.hasLocalData()) + return false; + + var gotData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key())).orElse(null); + assert gotData != null; + + success.setValue(gotData); return true; }); } catch (Exception e) { @@ -50,60 +54,51 @@ public class RemoteTransaction { return Optional.empty(); } - curTx.put(success.getValue()); return Optional.of(success.getValue()); } @SuppressWarnings("unchecked") - public Optional> get(Class type, JObjectKey key, LockingStrategy strategy) { - return curTx.get(RemoteObject.class, key, strategy) + private Optional getData(Class type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) { + return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy) .flatMap(obj -> { - if (obj.data() != null && !type.isInstance(obj.data())) - throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type); -// FIXME: -// if (!type.isAssignableFrom(obj.meta().knownType())) -// throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type); - - if (obj.data() != null) - return Optional.of(obj); - else - return tryDownloadRemote(obj); + if (obj.hasLocalData()) { + var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null); + if (realData == null) + throw new IllegalStateException("Local data not found for " + key); // TODO: Race + if (!type.isInstance(realData.data())) + throw new IllegalStateException("Object type mismatch: " + realData.data().getClass() + " vs " + type); + return Optional.of((T) realData.data()); + } + if (!tryRequest) + return Optional.empty(); + return tryDownloadRemote(obj).map(wrapper -> (T) wrapper.data()); }); } public Optional getMeta(JObjectKey key, LockingStrategy strategy) { - return curTx.get(RemoteObject.class, key, strategy).map(obj -> obj.meta()); + return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy); } - public Optional getData(Class type, JObjectKey key, LockingStrategy strategy) { - return get(type, key, strategy).map(RemoteObject::data); - } + public void putData(T obj) { + var curMeta = getMeta(obj.key()).orElse(null); - public void put(RemoteObject obj) { - curTx.put(obj); - } - - public void put(T obj) { - var cur = get((Class) obj.getClass(), obj.key()).orElse(null); - - if (cur == null) { - curTx.put(new RemoteObject<>(obj, persistentPeerDataService.getSelfUuid())); + if (curMeta == null) { + curTx.put(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid())); + curTx.put(new RemoteObjectDataWrapper<>(obj)); return; } - if (cur.data() != null && cur.data().equals(obj)) - return; - if (cur.data() != null && !cur.data().getClass().equals(obj.getClass())) - throw new IllegalStateException("Object type mismatch: " + cur.data().getClass() + " vs " + obj.getClass()); - var newMeta = cur.meta(); +// if (cur.data() != null && cur.data().equals(obj)) +// return; + if (!curMeta.knownType().isAssignableFrom(obj.getClass())) + throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass()); + var newMeta = curMeta; newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(), newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1)); - var newObj = cur.withData(obj).withMeta(newMeta); - curTx.put(newObj); - } - - public Optional> get(Class type, JObjectKey key) { - return get(type, key, LockingStrategy.OPTIMISTIC); + curTx.put(newMeta); + var newData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key())) + .map(w -> w.withData(obj)).orElse(new RemoteObjectDataWrapper<>(obj)); + curTx.put(newData); } public Optional getMeta(JObjectKey key) { @@ -111,6 +106,18 @@ public class RemoteTransaction { } public Optional getData(Class type, JObjectKey key) { - return getData(type, key, LockingStrategy.OPTIMISTIC); + return getData(type, key, LockingStrategy.OPTIMISTIC, true); + } + + public Optional getDataLocal(Class type, JObjectKey key) { + return getData(type, key, LockingStrategy.OPTIMISTIC, false); + } + + public Optional getData(Class type, JObjectKey key, LockingStrategy strategy) { + return getData(type, key, strategy, true); + } + + public Optional getDataLocal(Class type, JObjectKey key, LockingStrategy strategy) { + return getData(type, key, strategy, false); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java index b4cb82c6..ace16a81 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java @@ -15,6 +15,7 @@ import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; // Note: RunOnVirtualThread hangs somehow @GrpcService @@ -52,16 +53,32 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { public Uni getObject(GetObjectRequest request) { Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); - var obj = txm.run(() -> { - var got = remoteTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null); - if (got == null) { - Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); - throw new StatusRuntimeException(Status.NOT_FOUND); - } - return got; + + Pair got = txm.run(() -> { + var meta = remoteTx.getMeta(JObjectKey.of(request.getName())).orElse(null); + var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null); + if (meta != null && !meta.seen()) + curTx.put(meta.withSeen(true)); + if (obj != null) + for (var ref : obj.collectRefsTo()) { + var refMeta = remoteTx.getMeta(ref).orElse(null); + if (refMeta != null && !refMeta.seen()) + curTx.put(refMeta.withSeen(true)); + } + return Pair.of(meta, obj); }); - var serialized = receivedObjectProtoSerializer.serialize(obj.toReceivedObject()); + if ((got.getValue() != null) && (got.getKey() == null)) { + Log.error("Inconsistent state for object meta: " + request.getName()); + throw new StatusRuntimeException(Status.INTERNAL); + } + + if (got.getValue() == null) { + Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + + var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().key(), got.getKey().changelog(), got.getValue())); return Uni.createFrom().item(serialized); // // Does @Blocking break this? // return Uni.createFrom().emitter(emitter -> { @@ -110,7 +127,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { builder.setObjName(request.getName()); txm.run(() -> { - var obj = curTx.get(RemoteObject.class, JObjectKey.of(request.getName())).orElse(null); + var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName())).orElse(null); if (obj == null) { builder.setDeletionCandidate(true); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java index d8f9516f..aab38104 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java @@ -1,17 +1,14 @@ package com.usatiuk.dhfs.objects.repository; -import com.usatiuk.dhfs.objects.JDataRemote; -import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.PeerId; -import com.usatiuk.dhfs.objects.RemoteObject; +import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.transaction.Transaction; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.pcollections.HashTreePMap; import org.pcollections.PMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import javax.annotation.Nullable; // @@ -92,149 +89,163 @@ public class SyncHandler { // } // - public RemoteObject handleOneUpdate(PeerId from, RemoteObject current, PMap rcvChangelog) { -// if (!rcv.key().equals(current.key())) { -// Log.error("Received update for different object: " + rcv.key() + " from " + from); -// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from); +// public RemoteObjectMeta handleOneUpdate(PeerId from, RemoteObjectMeta current, PMap rcvChangelog) { +//// if (!rcv.key().equals(current.key())) { +//// Log.error("Received update for different object: " + rcv.key() + " from " + from); +//// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from); +//// } +// +// var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum(); +// +// if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) { +// Log.error("Received older index update than was known for host: " + from + " " + current.key()); +// throw new IllegalStateException(); // FIXME: OutdatedUpdateException // } - - var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum(); - - if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) { - Log.error("Received older index update than was known for host: " + from + " " + current.key()); - throw new IllegalStateException(); // FIXME: OutdatedUpdateException - } - - Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog); - - boolean conflict = false; - boolean updatedRemoteVersion = false; - - var newObj = current; - var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from); - - if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer)) - updatedRemoteVersion = true; - - if (updatedRemoteVersion) - newObj = current.withMeta(current.meta().withKnownRemoteVersions( - current.meta().knownRemoteVersions().plus(from, receivedTotalVer) - )); - - - boolean hasLower = false; - boolean hasHigher = false; - for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) { - if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L)) - hasLower = true; - if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L)) - hasHigher = true; - } - - if (hasLower && hasHigher) { - Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from); -// Log. // -// info("Trying conflict resolution: " + header.getName() + " from " + from); -// var found = foundExt.get(); +// Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog); // -// JObjectData theirsData; -// ObjectHeader theirsHeader; -// if (header. hasPushedData()) { -// theirsHeader = header; -// theirsData = dataProtoSerializer. +// boolean conflict = false; +// boolean updatedRemoteVersion = false; // -// deserialize(header.getPushedData()); -// } else { -// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName()); -// theirsData = dataProtoSerializer. +// var newObj = current; +// var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from); // -// deserialize(got.getRight()); -// theirsHeader = got. +// if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer)) +// updatedRemoteVersion = true; // -// getLeft(); -// } +// if (updatedRemoteVersion) +// newObj = current.withMeta(current.meta().withKnownRemoteVersions( +// current.meta().knownRemoteVersions().plus(from, receivedTotalVer) +// )); // -// jObjectTxManager. // -// executeTx(() -> { -// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { -// if (d == null) -// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName())); -// return d.getConflictResolver(); -// }); -// var resolver = conflictResolvers.select(resolverClass); -// resolver. -// -// get(). -// -// resolve(from, theirsHeader, theirsData, found); -// }); -// Log. info("Resolved conflict for " + from + " " + header.getName()); -// throw new NotImplementedException(); - } else if (hasLower) { - Log.info("Received older index update than known: " + from + " " + current.key()); -// throw new OutdatedUpdateException(); -// throw new NotImplementedException(); - } else if (hasHigher) { - var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ? - rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L); - - newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog)); -// if (header.hasPushedData()) -// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); - } -// else if (data == null && header.hasPushedData()) { -// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); -// if (found.getData() == null) -// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); +// boolean hasLower = false; +// boolean hasHigher = false; +// for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) { +// if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L)) +// hasLower = true; +// if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L)) +// hasHigher = true; // } +// +// if (hasLower && hasHigher) { +// Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from); +//// Log. +//// +//// info("Trying conflict resolution: " + header.getName() + " from " + from); +//// var found = foundExt.get(); +//// +//// JObjectData theirsData; +//// ObjectHeader theirsHeader; +//// if (header. hasPushedData()) { +//// theirsHeader = header; +//// theirsData = dataProtoSerializer. +//// +//// deserialize(header.getPushedData()); +//// } else { +//// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName()); +//// theirsData = dataProtoSerializer. +//// +//// deserialize(got.getRight()); +//// theirsHeader = got. +//// +//// getLeft(); +//// } +//// +//// jObjectTxManager. +//// +//// executeTx(() -> { +//// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { +//// if (d == null) +//// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName())); +//// return d.getConflictResolver(); +//// }); +//// var resolver = conflictResolvers.select(resolverClass); +//// resolver. +//// +//// get(). +//// +//// resolve(from, theirsHeader, theirsData, found); +//// }); +//// Log. info("Resolved conflict for " + from + " " + header.getName()); +//// throw new NotImplementedException(); +// } else if (hasLower) { +// Log.info("Received older index update than known: " + from + " " + current.key()); +//// throw new OutdatedUpdateException(); +//// throw new NotImplementedException(); +// } else if (hasHigher) { +// var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ? +// rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L); +// +// newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog)); +//// if (header.hasPushedData()) +//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); +// } +//// else if (data == null && header.hasPushedData()) { +//// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); +//// if (found.getData() == null) +//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); +//// } +// -// assert Objects.equals(receivedTotalVer, md.getOurVersion()); + /// / assert Objects.equals(receivedTotalVer, md.getOurVersion()); +// +// if (!updatedRemoteVersion) +// Log.debug("No action on update: " + current.meta().key() + " from " + from); +// +// return newObj; +// } + public void handleRemoteUpdate(PeerId from, JObjectKey key, PMap receivedChangelog, @Nullable JDataRemote receivedData) { + var current = curTx.get(RemoteObjectMeta.class, key).orElse(null); + if (current == null) { + current = new RemoteObjectMeta(key, HashTreePMap.empty()); + curTx.put(current); + } - if (!updatedRemoteVersion) - Log.debug("No action on update: " + current.meta().key() + " from " + from); + var changelogCompare = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog); - return newObj; - } - - public RemoteObject handleRemoteUpdate(PeerId from, JObjectKey key, RemoteObject current, PMap rcv) { - // TODO: Dedup - try { - if (current == null) { - var obj = new RemoteObject<>(key, rcv); - curTx.put(obj); - current = (RemoteObject) obj; // Will update known remote version too + switch (changelogCompare) { + case EQUAL -> { + Log.debug("No action on update: " + key + " from " + from); + if (!current.hasLocalData() && receivedData != null) { + current = current.withHaveLocal(true); + curTx.put(current); + curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key())) + .map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData))); + } } + case NEWER -> { + Log.debug("Received newer index update than known: " + key + " from " + from); + var newChangelog = receivedChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ? + receivedChangelog : receivedChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L); + current = current.withChangelog(newChangelog); - var newObj = handleOneUpdate(from, current, rcv); - if (newObj != current) { - curTx.put(newObj); + if (receivedData != null) { + current = current.withHaveLocal(true); + curTx.put(current); + curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key())) + .map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData))); + } else { + current = current.withHaveLocal(false); + curTx.put(current); + } + } + case OLDER -> { + Log.debug("Received older index update than known: " + key + " from " + from); + return; + } + case CONFLICT -> { + Log.debug("Conflict on update (inconsistent version): " + key + " from " + from); + // TODO: + return; } - return newObj; -// } catch (OutdatedUpdateException ignored) { -// Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid()); -// invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName()); - } catch (Exception ex) { - Log.info("Error when handling update from " + from + " of " + current.meta().key(), ex); - throw ex; } + var curKnownRemoteVersion = current.knownRemoteVersions().get(from); + var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum(); -// return IndexUpdateReply.getDefaultInstance(); - } - - protected static class OutdatedUpdateException extends RuntimeException { - OutdatedUpdateException() { - super(); - } - - OutdatedUpdateException(String message) { - super(message); - } - - @Override - public synchronized Throwable fillInStackTrace() { - return this; + if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) { + current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer)); + curTx.put(current); } } } \ No newline at end of file diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHelper.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHelper.java new file mode 100644 index 00000000..ceb391e2 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHelper.java @@ -0,0 +1,42 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.PeerId; +import org.pcollections.PMap; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class SyncHelper { + public enum ChangelogCmpResult { + EQUAL, + NEWER, + OLDER, + CONFLICT + } + + public static ChangelogCmpResult compareChangelogs(PMap current, PMap other) { + boolean hasLower = false; + boolean hasHigher = false; + for (var e : Stream.concat(current.keySet().stream(), other.keySet().stream()).collect(Collectors.toUnmodifiableSet())) { + if (other.getOrDefault(e, 0L) < current.getOrDefault(e, 0L)) + hasLower = true; + if (other.getOrDefault(e, 0L) > current.getOrDefault(e, 0L)) + hasHigher = true; + } + + if (hasLower && hasHigher) + return ChangelogCmpResult.CONFLICT; + + if (hasLower) + return ChangelogCmpResult.OLDER; + + if (hasHigher) + return ChangelogCmpResult.NEWER; + + return ChangelogCmpResult.EQUAL; + } + +// public static PMap mergeChangelogs(PMap current, PMap other) { +// return current.plusAll(other); +// } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java index e2162139..18ed10b9 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java @@ -2,11 +2,11 @@ package com.usatiuk.dhfs.objects.repository.invalidation; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; -import com.usatiuk.dhfs.objects.RemoteObject; +import com.usatiuk.dhfs.objects.RemoteObjectMeta; import org.pcollections.PMap; public record IndexUpdateOp(JObjectKey key, PMap changelog) implements Op { - public IndexUpdateOp(RemoteObject object) { - this(object.key(), object.meta().changelog()); + public IndexUpdateOp(RemoteObjectMeta object) { + this(object.key(), object.changelog()); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java index acf06393..d8d5b336 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java @@ -27,8 +27,8 @@ public class OpPusher { Op info = txm.run(() -> { var obj = curTx.get(JData.class, key).orElse(null); switch (obj) { - case RemoteObject remote -> { - return new IndexUpdateOp(key, remote.meta().changelog()); + case RemoteObjectMeta remote -> { + return new IndexUpdateOp(key, remote.changelog()); } case JKleppmannTreePersistentData pd -> { var maybeQueue = pd.queues().get(op); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java index 8ee79b77..b7c4c48d 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java @@ -1,6 +1,5 @@ package com.usatiuk.dhfs.objects.repository.invalidation; -import com.usatiuk.dhfs.objects.JDataRemote; import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.RemoteTransaction; import com.usatiuk.dhfs.objects.repository.SyncHandler; @@ -18,8 +17,6 @@ public class PushOpHandler { RemoteTransaction remoteTransaction; public void handlePush(PeerId peer, IndexUpdateOp obj) { - syncHandler.handleRemoteUpdate(peer, obj.key(), - remoteTransaction.get(JDataRemote.class, obj.key()).orElse(null), - obj.changelog()); + syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java index be8d8a2a..5398302f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java @@ -72,7 +72,7 @@ public class PeerInfoService { jObjectTxManager.run(() -> { var parent = getTree().traverse(List.of()); var newPeerInfo = new PeerInfo(id, cert); - remoteTx.put(newPeerInfo); + remoteTx.putData(newPeerInfo); getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId()); }); } diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java index 5648e03e..a45d0b9a 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java @@ -38,6 +38,7 @@ public class DataLocker { private static class LockTag { final Thread owner = Thread.currentThread(); + // final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace(); boolean released = false; }