mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
3 Commits
8559c9b984
...
e0b4f97349
| Author | SHA1 | Date | |
|---|---|---|---|
| e0b4f97349 | |||
| 035f64df5a | |||
| 4c5fd91050 |
@@ -1 +0,0 @@
|
||||
quarkus.package.jar.decompiler.enabled=true
|
||||
@@ -187,16 +187,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
|
||||
// TODO: Ideally no point in this separate locking?
|
||||
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
|
||||
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
|
||||
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
|
||||
if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
|
||||
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
|
||||
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
|
||||
updateTimestampImpl(from, timestamp);
|
||||
if (!(gotExt != null && gotExt.compareTo(timestamp) >= 0))
|
||||
updateTimestampImpl(from, timestamp);
|
||||
if (!(gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0))
|
||||
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
|
||||
tryTrimLog();
|
||||
return true;
|
||||
}
|
||||
|
||||
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
|
||||
|
||||
@@ -4,7 +4,6 @@ dhfs.objects.lru.limit=134217728
|
||||
dhfs.objects.lru.print-stats=true
|
||||
dhfs.objects.lock_timeout_secs=15
|
||||
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
|
||||
quarkus.package.jar.decompiler.enabled=true
|
||||
dhfs.objects.persistence.snapshot-extra-checks=false
|
||||
dhfs.objects.transaction.never-lock=true
|
||||
quarkus.log.category."com.usatiuk.dhfs.objects.iterators".level=INFO
|
||||
|
||||
@@ -166,6 +166,11 @@
|
||||
<forkCount>1C</forkCount>
|
||||
<reuseForks>false</reuseForks>
|
||||
<parallel>classes</parallel>
|
||||
<systemPropertyVariables>
|
||||
<junit.jupiter.execution.parallel.enabled>
|
||||
false
|
||||
</junit.jupiter.execution.parallel.enabled>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
|
||||
@@ -14,7 +14,7 @@ public interface JDataRefcounted extends JData {
|
||||
|
||||
JDataRefcounted withFrozen(boolean frozen);
|
||||
|
||||
default Collection<JDataRef> collectRefsTo() {
|
||||
default Collection<JObjectKey> collectRefsTo() {
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,16 +33,16 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
|
||||
for (var curRef : curRefs) {
|
||||
if (!oldRefs.contains(curRef)) {
|
||||
var referenced = getRef(curRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(curRef.obj()))));
|
||||
var referenced = getRef(curRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(key))));
|
||||
Log.tracev("Added ref from {0} to {1}", key, curRef);
|
||||
}
|
||||
}
|
||||
|
||||
for (var oldRef : oldRefs) {
|
||||
if (!curRefs.contains(oldRef)) {
|
||||
var referenced = getRef(oldRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(oldRef.obj()))));
|
||||
var referenced = getRef(oldRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(key))));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, oldRef);
|
||||
}
|
||||
}
|
||||
@@ -55,8 +55,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
}
|
||||
|
||||
for (var newRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(newRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(newRef.obj()))));
|
||||
var referenced = getRef(newRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(key))));
|
||||
Log.tracev("Added ref from {0} to {1}", key, newRef);
|
||||
}
|
||||
}
|
||||
@@ -68,8 +68,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
}
|
||||
|
||||
for (var removedRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(removedRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(removedRef.obj()))));
|
||||
var referenced = getRef(removedRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(key))));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, removedRef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,8 +32,8 @@ public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRe
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return data.collectRefsTo().stream().<JDataRef>map(JDataNormalRef::new).toList();
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return data.collectRefsTo();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -91,8 +91,8 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
if (hasLocalData) return List.of(new JDataNormalRef(dataKey()));
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
if (hasLocalData) return List.of(dataKey());
|
||||
return List.of();
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
|
||||
@@ -34,6 +35,8 @@ public class JKleppmannTreeManager {
|
||||
JKleppmannTreePeerInterface peerInterface;
|
||||
@Inject
|
||||
PeerInfoService peerInfoService;
|
||||
@Inject
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
|
||||
return txManager.executeTx(() -> {
|
||||
@@ -113,6 +116,9 @@ public class JKleppmannTreeManager {
|
||||
|
||||
// @Override
|
||||
public void commitOpForHost(PeerId host, Op op) {
|
||||
if (op instanceof JKleppmannTreePeriodicPushOp)
|
||||
return;
|
||||
|
||||
if (!(op instanceof JKleppmannTreeOpWrapper jop))
|
||||
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());
|
||||
|
||||
@@ -133,9 +139,10 @@ public class JKleppmannTreeManager {
|
||||
}
|
||||
|
||||
// @Override
|
||||
public boolean acceptExternalOp(PeerId from, Op op) {
|
||||
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
|
||||
return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
|
||||
public void acceptExternalOp(PeerId from, Op op) {
|
||||
if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) {
|
||||
_tree.updateExternalTimestamp(from1, timestamp);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(op instanceof JKleppmannTreeOpWrapper jop))
|
||||
@@ -192,13 +199,11 @@ public class JKleppmannTreeManager {
|
||||
// }
|
||||
// }
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public Op getPeriodicPushOp() {
|
||||
// return new JKleppmannTreePeriodicPushOp(persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
|
||||
// }
|
||||
public Op getPeriodicPushOp() {
|
||||
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void addToTx() {
|
||||
|
||||
@@ -1,26 +1,17 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||
|
||||
public class JKleppmannTreePeriodicPushOp {
|
||||
private final PeerId _from;
|
||||
private final long _timestamp;
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public JKleppmannTreePeriodicPushOp(PeerId from, long timestamp) {
|
||||
_from = from;
|
||||
_timestamp = timestamp;
|
||||
public record JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from,
|
||||
long timestamp) implements Op, Serializable {
|
||||
@Override
|
||||
public Collection<JObjectKey> getEscapedRefs() {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
public PeerId getFrom() {
|
||||
return _from;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return _timestamp;
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public Collection<String> getEscapedRefs() {
|
||||
// return List.of();
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.JDataRef;
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
import com.usatiuk.kleppmanntree.TreeNode;
|
||||
@@ -55,12 +58,12 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return Stream.<JDataRef>concat(children().values().stream().map(JDataNormalRef::new),
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return Stream.<JObjectKey>concat(children().values().stream(),
|
||||
switch (meta()) {
|
||||
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JDataNormalRef>of();
|
||||
case JKleppmannTreeNodeMetaFile file -> Stream.of(new JDataNormalRef(file.getFileIno()));
|
||||
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(new JDataNormalRef(peer.getPeerId()));
|
||||
case JKleppmannTreeNodeMetaDirectory dir -> Stream.of();
|
||||
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
|
||||
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
|
||||
default -> throw new IllegalStateException("Unexpected value: " + meta());
|
||||
}
|
||||
).collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.JDataRef;
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.kleppmanntree.CombinedTimestamp;
|
||||
import com.usatiuk.kleppmanntree.LogRecord;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
@@ -45,8 +48,7 @@ public record JKleppmannTreePersistentData(
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"))
|
||||
.stream().<JDataRef>map(JDataNormalRef::new).toList();
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePeriodicPushOp;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -15,6 +16,8 @@ public class OpHandler {
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
public void handleOp(PeerId from, Op op) {
|
||||
if (op instanceof IndexUpdateOp iu) {
|
||||
@@ -22,6 +25,10 @@ public class OpHandler {
|
||||
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
|
||||
var tree = jKleppmannTreeManager.getTree(jk.treeName());
|
||||
tree.acceptExternalOp(from, jk);
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
|
||||
} else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
|
||||
var tree = jKleppmannTreeManager.getTree(pop.treeName());
|
||||
tree.acceptExternalOp(from, pop);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ public class OpPusher {
|
||||
var tree = jKleppmannTreeManager.getTree(pd.key());
|
||||
|
||||
if (!tree.hasPendingOpsForHost(entry.peer()))
|
||||
return null;
|
||||
return List.of(tree.getPeriodicPushOp());
|
||||
|
||||
var ops = tree.getPendingOpsForHost(entry.peer(), 1);
|
||||
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
package com.usatiuk.dhfs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JDataRef;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.testobjs.TestRefcount;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
import io.quarkus.test.junit.TestProfile;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.pcollections.HashTreePSet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class Profiles {
|
||||
public static class RefcounterTestProfile extends TempDataProfile {
|
||||
@Override
|
||||
protected void getConfigOverrides(Map<String, String> ret) {
|
||||
ret.put("quarkus.log.category.\"com.usatiuk.dhfs\".level", "INFO");
|
||||
ret.put("dhfs.fuse.enabled", "false");
|
||||
ret.put("dhfs.objects.ref_verification", "false");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@QuarkusTest
|
||||
@TestProfile(Profiles.RefcounterTestProfile.class)
|
||||
public class RefcounterTest {
|
||||
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
|
||||
@Test
|
||||
void refcountParentChange() {
|
||||
final JObjectKey PARENT_1_KEY = JObjectKey.of("refcountParentChange_parent1");
|
||||
final JObjectKey PARENT_2_KEY = JObjectKey.of("refcountParentChange_parent2");
|
||||
final JObjectKey CHILD_KEY = JObjectKey.of("refcountParentChange_child");
|
||||
|
||||
txm.run(() -> {
|
||||
curTx.put((new TestRefcount(PARENT_1_KEY)).withFrozen(true));
|
||||
curTx.put((new TestRefcount(PARENT_2_KEY)).withFrozen(true));
|
||||
});
|
||||
|
||||
txm.run(() -> {
|
||||
curTx.put((new TestRefcount(CHILD_KEY)).withFrozen(false));
|
||||
curTx.put(curTx.get(TestRefcount.class, PARENT_1_KEY).get().withKids(HashTreePSet.<JObjectKey>empty().plus(CHILD_KEY)));
|
||||
});
|
||||
|
||||
txm.run(() -> {
|
||||
var kid = curTx.get(TestRefcount.class, CHILD_KEY).get();
|
||||
Assertions.assertIterableEquals(List.of(PARENT_1_KEY), kid.refsFrom().stream().map(JDataRef::obj).toList());
|
||||
});
|
||||
|
||||
txm.run(() -> {
|
||||
curTx.put(curTx.get(TestRefcount.class, PARENT_1_KEY).get().withKids(HashTreePSet.<JObjectKey>empty().minus(CHILD_KEY)));
|
||||
curTx.put(curTx.get(TestRefcount.class, PARENT_2_KEY).get().withKids(HashTreePSet.<JObjectKey>empty().plus(CHILD_KEY)));
|
||||
});
|
||||
|
||||
txm.run(() -> {
|
||||
Assertions.assertIterableEquals(List.of(PARENT_2_KEY), curTx.get(TestRefcount.class, CHILD_KEY).get().refsFrom().stream().map(JDataRef::obj).toList());
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
@@ -39,7 +39,7 @@ class Profiles {
|
||||
}
|
||||
}
|
||||
|
||||
public class DhfsFileServiceSimpleTestImpl {
|
||||
public abstract class DhfsFileServiceSimpleTestImpl {
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
@Inject
|
||||
|
||||
@@ -192,12 +192,6 @@ public class DhfsFuseIT {
|
||||
0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Log.info("Deleted");
|
||||
|
||||
// TODO: Fix this
|
||||
Log.info("Dummy write");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd2").getExitCode());
|
||||
Log.info("Dummy written");
|
||||
|
||||
// FIXME?
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
@@ -355,4 +349,82 @@ public class DhfsFuseIT {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeAndMove() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
Log.info("Creating");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout()));
|
||||
|
||||
client.pauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
Log.info("Removing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
client.unpauseContainerCmd(container1.getContainerId()).exec();
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
Log.info("Moving");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
Log.info("Listing");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
Log.info("Reading");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout()));
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
// Either removed, or moved
|
||||
// TODO: it always seems to be removed?
|
||||
Log.info("Reading both");
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info("cat1: " + cat1);
|
||||
Log.info("cat2: " + cat2);
|
||||
Log.info("ls1: " + ls1);
|
||||
Log.info("ls2: " + ls2);
|
||||
|
||||
if (!ls1.getStdout().equals(ls2.getStdout())) {
|
||||
Log.info("Different ls?");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (ls1.getStdout().trim().isEmpty() && ls2.getStdout().trim().isEmpty()) {
|
||||
Log.info("Both empty");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!cat1.getStdout().equals(cat2.getStdout())) {
|
||||
Log.info("Different cat?");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!(cat1.getExitCode() == 0 && cat2.getExitCode() == 0 && ls1.getExitCode() == 0 && ls2.getExitCode() == 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean hasMoved = cat1.getStdout().contains("tesempty") && cat2.getStdout().contains("tesempty")
|
||||
&& ls1.getStdout().contains("testf2") && !ls1.getStdout().contains("testf1")
|
||||
&& ls2.getStdout().contains("testf2") && !ls2.getStdout().contains("testf1");
|
||||
|
||||
boolean removed = !cat1.getStdout().contains("tesempty") && !cat2.getStdout().contains("tesempty")
|
||||
&& !ls1.getStdout().contains("testf2") && !ls1.getStdout().contains("testf1")
|
||||
&& !ls2.getStdout().contains("testf2") && !ls2.getStdout().contains("testf1");
|
||||
|
||||
if (hasMoved && removed) {
|
||||
Log.info("Both removed and moved");
|
||||
return false;
|
||||
}
|
||||
|
||||
return hasMoved || removed;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.usatiuk.dhfs.objects.testobjs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JDataRef;
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import org.pcollections.HashTreePSet;
|
||||
import org.pcollections.PCollection;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public record TestRefcount(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
PCollection<JObjectKey> kids) implements JDataRefcounted {
|
||||
|
||||
public TestRefcount(JObjectKey key) {
|
||||
this(key, HashTreePSet.empty(), false, HashTreePSet.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestRefcount withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new TestRefcount(key, refs, frozen, kids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestRefcount withFrozen(boolean frozen) {
|
||||
return new TestRefcount(key, refsFrom, frozen, kids);
|
||||
}
|
||||
|
||||
public TestRefcount withKids(PCollection<JObjectKey> kids) {
|
||||
return new TestRefcount(key, refsFrom, frozen, kids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return kids;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user