Server: fix syncing when moving directories

as with log trimming we might be moving to folders that don't exist
This commit is contained in:
2025-03-27 12:12:51 +01:00
parent 8f7869d87a
commit 038b873364
13 changed files with 141 additions and 27 deletions

View File

@@ -35,5 +35,9 @@
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,5 +1,7 @@
package com.usatiuk.kleppmanntree;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
@@ -53,18 +55,20 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var node = _storage.getById(effect.childId());
var curParent = _storage.getById(effect.newParentId());
{
var newCurParentChildren = curParent.children().minus(node.meta().getName());
var newCurParentChildren = curParent.children().minus(node.name());
curParent = curParent.withChildren(newCurParentChildren);
_storage.putNode(curParent);
}
if (!node.meta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
if (effect.oldInfo().oldMeta() != null
&& node.meta() != null
&& !node.meta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.key());
// Needs to be read after changing curParent, as it might be the same node
var oldParent = _storage.getById(effect.oldInfo().oldParent());
{
var newOldParentChildren = oldParent.children().plus(effect.oldInfo().oldMeta().getName(), node.key());
var newOldParentChildren = oldParent.children().plus(effect.oldName(), node.key());
oldParent = oldParent.withChildren(newOldParentChildren);
_storage.putNode(oldParent);
}
@@ -77,7 +81,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var node = _storage.getById(effect.childId());
var curParent = _storage.getById(effect.newParentId());
{
var newCurParentChildren = curParent.children().minus(node.meta().getName());
var newCurParentChildren = curParent.children().minus(node.name());
curParent = curParent.withChildren(newCurParentChildren);
_storage.putNode(curParent);
}
@@ -141,8 +145,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
}
}
if (!inTrash.isEmpty()) {
var trash = _storage.getById(_storage.getTrashId());
for (var n : inTrash) {
@@ -307,7 +311,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node = _storage.getById(effect.childId());
}
if (oldParentNode != null) {
var newOldParentChildren = oldParentNode.children().minus(effect.oldInfo().oldMeta().getName());
var newOldParentChildren = oldParentNode.children().minus(effect.oldName());
oldParentNode = oldParentNode.withChildren(newOldParentChildren);
_storage.putNode(oldParentNode);
}
@@ -316,12 +320,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
newParentNode = _storage.getById(effect.newParentId());
{
var newNewParentChildren = newParentNode.children().plus(effect.newMeta().getName(), effect.childId());
var newNewParentChildren = newParentNode.children().plus(effect.newName(), effect.childId());
newParentNode = newParentNode.withChildren(newNewParentChildren);
_storage.putNode(newParentNode);
}
if (effect.newParentId().equals(_storage.getTrashId()) &&
!Objects.equals(effect.newMeta().getName(), effect.childId().toString()))
!Objects.equals(effect.newName(), effect.childId().toString()))
throw new IllegalArgumentException("Move to trash should have id of node as name");
_storage.putNode(
node.withParent(effect.newParentId())
@@ -338,17 +342,32 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
NodeIdT newParentId = op.newParentId();
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParent = _storage.getById(newParentId);
if (newParent == null) {
LOGGER.log(Level.SEVERE, "New parent not found " + op.newMeta().getName() + " " + op.childId());
return new LogRecord<>(op, null);
LOGGER.log(Level.SEVERE, "New parent not found " + op.newName() + " " + op.childId());
// Creation
if (oldParentId == null) {
LOGGER.severe(() -> "Creating both dummy parent and child node");
return new LogRecord<>(op, List.of(
new LogEffect<>(null, op, _storage.getLostFoundId(), null, newParentId),
new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
));
} else {
LOGGER.severe(() -> "Moving child node to dummy parent");
return new LogRecord<>(op, List.of(
new LogEffect<>(null, op, _storage.getLostFoundId(), null, newParentId),
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, node.meta()), op, op.newParentId(), op.newMeta(), op.childId())
));
}
}
if (oldParentId == null) {
var conflictNodeId = newParent.children().get(op.newMeta().getName());
var conflictNodeId = newParent.children().get(op.newName());
if (conflictNodeId != null) {
if (failCreatingIfExists)
throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);
throw new AlreadyExistsException("Already exists: " + op.newName() + ": " + conflictNodeId);
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
@@ -359,8 +378,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
String newConflictNodeName = op.newName() + ".conflict." + conflictNode.key();
String newOursName = op.newName() + ".conflict." + op.childId();
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(conflictNode.lastEffectiveOp(), newParentId, conflictNodeMeta), conflictNode.lastEffectiveOp(), newParentId, (MetaT) conflictNodeMeta.withName(newConflictNodeName), conflictNodeId),
new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
@@ -378,11 +397,13 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
MetaT oldMeta = node.meta();
if (!oldMeta.getClass().equals(op.newMeta().getClass())) {
if (oldMeta != null
&& op.newMeta() != null
&& !oldMeta.getClass().equals(op.newMeta().getClass())) {
LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.key());
return new LogRecord<>(op, null);
}
var replaceNodeId = newParent.children().get(op.newMeta().getName());
var replaceNodeId = newParent.children().get(op.newName());
if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId);
var replaceNodeMeta = replaceNode.meta();
@@ -454,18 +475,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
walkTree(node -> {
var op = node.lastEffectiveOp();
if (node.lastEffectiveOp() == null) return;
LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newName() + " " + op.childId() + "->" + op.newParentId());
result.put(node.lastEffectiveOp().timestamp(), node.lastEffectiveOp());
});
for (var le : _storage.getLog().getAll()) {
var op = le.getValue().op();
LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newName() + " " + op.childId() + "->" + op.newParentId());
result.put(le.getKey(), le.getValue().op());
}
for (var op : result.values()) {
LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newName() + " " + op.childId() + "->" + op.newParentId());
_opRecorder.recordOpForPeer(host, op);
}
}

View File

@@ -8,4 +8,17 @@ public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT exten
NodeIdT newParentId,
MetaT newMeta,
NodeIdT childId) implements Serializable {
public String oldName() {
if (oldInfo.oldMeta() != null) {
return oldInfo.oldMeta().getName();
}
return childId.toString();
}
public String newName() {
if (newMeta != null) {
return newMeta.getName();
}
return childId.toString();
}
}

View File

@@ -5,4 +5,9 @@ import java.io.Serializable;
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
NodeIdT childId) implements Serializable {
public String newName() {
if (newMeta != null)
return newMeta.getName();
return childId.toString();
}
}

View File

@@ -9,6 +9,8 @@ public interface StorageInterface<
NodeIdT getTrashId();
NodeIdT getLostFoundId();
NodeIdT getNewNodeId();
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getById(NodeIdT id);

View File

@@ -1,5 +1,6 @@
package com.usatiuk.kleppmanntree;
import jakarta.annotation.Nullable;
import org.pcollections.PMap;
import java.io.Serializable;
@@ -11,8 +12,15 @@ public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT ext
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp();
@Nullable
MetaT meta();
default String name() {
var meta = meta();
if (meta != null) return meta.getName();
return key().toString();
}
PMap<String, NodeIdT> children();
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withParent(NodeIdT parent);

View File

@@ -147,4 +147,19 @@ public class KleppmanTreeSimpleTest {
var r1 = testNode1.getRecorded();
Assertions.assertEquals(1, r1.size());
}
@Test
void externalOpWithDummy() {
Long d1id = testNode1._storageInterface.getNewNodeId();
Long f1id = testNode1._storageInterface.getNewNodeId();
testNode1._tree.applyExternalOp(2L, new OpMove<>(
new CombinedTimestamp<>(2L, 2L), d1id, new TestNodeMetaFile("Hi", 123), f1id
));
testNode1._tree.applyExternalOp(2L, new OpMove<>(
new CombinedTimestamp<>(3L, 2L), testNode1._storageInterface.getRootId(), new TestNodeMetaDir("HiDir"), d1id
));
Assertions.assertEquals(f1id, testNode1._tree.traverse(List.of("HiDir", "Hi")));
}
}

View File

@@ -14,6 +14,7 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
_peerId = peerId;
_nodes.put(getRootId(), new TestTreeNode(getRootId(), null, null));
_nodes.put(getTrashId(), new TestTreeNode(getTrashId(), null, null));
_nodes.put(getLostFoundId(), new TestTreeNode(getLostFoundId(), null, null));
}
@Override
@@ -26,6 +27,11 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
return -1L;
}
@Override
public Long getLostFoundId() {
return -2L;
}
@Override
public Long getNewNodeId() {
return _curId++ | _peerId << 32;

View File

@@ -2,10 +2,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.*;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
@@ -56,6 +53,8 @@ public class JKleppmannTreeManager {
curTx.put(rootNode);
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(trashNode);
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(lf_node);
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);
@@ -267,6 +266,11 @@ public class JKleppmannTreeManager {
return new JObjectKey(_treeName.name() + "_jt_trash");
}
@Override
public JObjectKey getLostFoundId() {
return new JObjectKey(_treeName.name() + "_jt_lf");
}
@Override
public JObjectKey getNewNodeId() {
return new JObjectKey(UUID.randomUUID().toString());

View File

@@ -61,9 +61,10 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
public Collection<JObjectKey> collectRefsTo() {
return Stream.<JObjectKey>concat(children().values().stream(),
switch (meta()) {
case JKleppmannTreeNodeMetaDirectory dir -> Stream.of();
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>empty();
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
case null -> Stream.<JObjectKey>empty();
default -> throw new IllegalStateException("Unexpected value: " + meta());
}
).collect(Collectors.toUnmodifiableSet());

View File

@@ -49,6 +49,6 @@ public record JKleppmannTreePersistentData(
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"));
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"), new JObjectKey(key().name() + "_jt_lf"));
}
}

View File

@@ -34,7 +34,7 @@ dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=5000
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.log.category."com.usatiuk".min-level=TRACE
quarkus.log.category."com.usatiuk".level=TRACE
quarkus.http.insecure-requests=enabled
quarkus.http.ssl.client-auth=required

View File

@@ -133,4 +133,39 @@ public class ResyncIT {
});
}
@Test
void folderAfterMove() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testd1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty1 > /root/dhfs_default/fuse/testd1/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testd1 /root/dhfs_default/fuse/testd2").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /root/dhfs_default/fuse/testd2/testf2").getExitCode());
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("New address"), 60, TimeUnit.SECONDS);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty1\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty2\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testd2/testf2").getStdout()));
}
}