7 Commits

SHA1 Message Date
a7f385139c some fixes 2024-08-04 19:57:55 +02:00
0bebc90585 directory-level conflict resolution 2024-08-04 19:30:04 +02:00
a507ae7ac4 a little tree cleanup 2024-08-04 18:23:37 +02:00
e8b0f65181 log trimming 2024-08-04 14:46:54 +02:00
3ea6e62a32 p2p op propagation basics 2024-08-04 13:16:39 +02:00
3145ba2c27 fix timestamp on replace-move 2024-08-04 10:58:25 +02:00
8134444786 symlink fix order 2024-08-03 23:27:24 +02:00
46 changed files with 948 additions and 455 deletions

View File

@@ -14,11 +14,11 @@ public class AtomicClock implements Clock<Long>, Serializable {
@Override
public void updateTimestamp(Long receivedTimestamp) {
long exp = _max.get();
long set = Math.max(exp, receivedTimestamp);
long set = Math.max(exp, receivedTimestamp) + 1;
// TODO: What is correct memory ordering?
while (!_max.weakCompareAndSetVolatile(exp, set)) {
exp = _max.get();
set = Math.max(exp, set);
set = Math.max(exp, set) + 1;
}
}
}
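
The updated updateTimestamp turns AtomicClock into a proper Lamport-clock receive step: after seeing a remote timestamp, the local counter is advanced strictly past it instead of merely matching it, so the next locally issued timestamp is guaranteed to be greater. A minimal single-threaded sketch of that intended behaviour (illustrative only; the real class additionally needs the CAS loop above to stay correct under concurrent updates):

// Illustrative sketch, not part of the changeset.
class LamportClockSketch {
    private long _max = 0;

    // Every locally issued timestamp is unique and strictly increasing.
    long getTimestamp() {
        return ++_max;
    }

    // Receiving a remote timestamp pushes the local counter strictly past it,
    // mirroring the "+ 1" added in this commit.
    void updateTimestamp(long received) {
        _max = Math.max(_max, received) + 1;
    }
}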

View File

@@ -9,6 +9,10 @@ public record CombinedTimestamp<TimestampT extends Comparable<TimestampT>, PeerI
@Override
public int compareTo(CombinedTimestamp<TimestampT, PeerIdT> o) {
if (nodeId == null || o.nodeId == null) {
return Comparator.comparing((CombinedTimestamp<TimestampT, PeerIdT> t) -> t.timestamp)
.compare(this, o);
}
return Comparator.comparing((CombinedTimestamp<TimestampT, PeerIdT> t) -> t.timestamp)
.thenComparing((CombinedTimestamp<TimestampT, PeerIdT> t) -> t.nodeId)
.compare(this, o);
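
The new null check lets a CombinedTimestamp with no peer id act as a pure timestamp bound, which the log trimming below relies on when it calls log.headMap(new CombinedTimestamp<>(min, null), true). A small illustration of the two comparison modes, assuming Long timestamps and Long peer ids as in the tests:

// Illustrative only: equal timestamps fall back to the peer id, unless a null
// peer id requests timestamp-only comparison (as used for log trimming bounds).
var a = new CombinedTimestamp<Long, Long>(5L, 1L);
var b = new CombinedTimestamp<Long, Long>(5L, 2L);
var bound = new CombinedTimestamp<Long, Long>(5L, null);

assert a.compareTo(b) < 0;       // tie on timestamp broken by peer id
assert a.compareTo(bound) == 0;  // timestamp-only comparison
assert b.compareTo(bound) == 0;  // so headMap(bound, true) covers both entries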

View File

@@ -1,20 +1,25 @@
package com.usatiuk.kleppmanntree;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT, WrapperT extends TreeNodeWrapper<MetaT, NodeIdT>> {
private final StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> _storage;
private final PeerInterface<PeerIdT> _peers;
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
private static final Logger LOGGER = Logger.getLogger(KleppmannTree.class.getName());
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> storage,
PeerInterface<PeerIdT> peers,
Clock<TimestampT> clock) {
Clock<TimestampT> clock,
OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> opRecorder) {
_storage = storage;
_peers = peers;
_clock = clock;
_opRecorder = opRecorder;
}
@@ -37,27 +42,24 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
public NodeIdT traverse(List<String> names) {
_storage.globalRLock();
try {
return traverse(_storage.getRootId(), names);
} finally {
_storage.globalRUnlock();
}
return traverse(_storage.getRootId(), names);
}
private void undoOp(LogOpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
if (op.oldInfo() != null) {
var node = _storage.getById(op.op().childId());
var oldParent = _storage.getById(op.oldInfo().oldParent());
var curParent = _storage.getById(op.op().newParentId());
private HashMap<NodeIdT, WrapperT> _undoCtx = null;
private void undoEffect(LogEffect<? extends MetaT, NodeIdT> effect) {
if (effect.oldInfo() != null) {
var node = _storage.getById(effect.childId());
var oldParent = _storage.getById(effect.oldInfo().oldParent());
var curParent = _storage.getById(effect.newParentId());
curParent.rwLock();
oldParent.rwLock();
node.rwLock();
try {
curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
if (!node.getNode().getMeta().getClass().equals(op.oldInfo().oldMeta().getClass()))
if (!node.getNode().getMeta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.getNode().getId());
node.getNode().setMeta(op.oldInfo().oldMeta());
node.getNode().setMeta(effect.oldInfo().oldMeta());
node.getNode().setParent(oldParent.getNode().getId());
oldParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
node.notifyRmRef(curParent.getNode().getId());
@@ -68,14 +70,16 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
curParent.rwUnlock();
}
} else {
var node = _storage.getById(op.op().childId());
var curParent = _storage.getById(op.op().newParentId());
var node = _storage.getById(effect.childId());
var curParent = _storage.getById(effect.newParentId());
curParent.rwLock();
node.rwLock();
try {
curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
node.lock();
node.getNode().setParent(null);
node.notifyRmRef(curParent.getNode().getId());
_storage.removeNode(node.getNode().getId());
_undoCtx.put(node.getNode().getId(), node);
} finally {
node.rwUnlock();
curParent.rwUnlock();
@@ -83,59 +87,189 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogOpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT>> entry) {
entry.setValue(doOp(entry.getValue().op()));
private void undoOp(LogRecord<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
for (var e : op.effects().reversed())
undoEffect(e);
}
public <LocalMetaT extends MetaT> void applyOp(OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> op) {
_clock.updateTimestamp(op.timestamp().timestamp());
private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, ? extends MetaT, NodeIdT>> entry) {
entry.setValue(doOp(null, entry.getValue().op(), false));
}
private <LocalMetaT extends MetaT> void doAndPut(PeerIdT from, OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> op) {
var res = doOp(from, op, true);
var log = _storage.getLog();
log.put(res.op().timestamp(), res);
}
int cmp;
private void tryTrimLog() {
var log = _storage.getLog();
var timeLog = _storage.getPeerTimestampLog();
TimestampT min = null;
for (var e : _peers.getAllPeers()) {
var got = timeLog.get(e);
if (got == null || got.get() == null) return;
var gotNum = got.get();
if (min == null) {
min = gotNum;
continue;
}
if (gotNum.compareTo(min) < 0)
min = gotNum;
}
if (min == null) return;
_storage.globalRLock();
try {
if (log.isEmpty()) {
// doOp can't be a move here, otherwise we deadlock
log.put(op.timestamp(), doOp(op));
var canTrim = log.headMap(new CombinedTimestamp<>(min, null), true);
if (!canTrim.isEmpty()) {
canTrim = log.headMap(new CombinedTimestamp<>(min, null), true);
Set<NodeIdT> inTrash = new HashSet<>();
for (var le : canTrim.values()) {
for (var e : le.effects()) {
if (Objects.equals(e.newParentId(), _storage.getTrashId())) {
inTrash.add(e.childId());
} else {
inTrash.remove(e.childId());
}
}
}
canTrim.clear();
if (!inTrash.isEmpty()) {
var trash = _storage.getById(_storage.getTrashId());
trash.rwLock();
try {
for (var n : inTrash) {
var node = _storage.getById(n);
node.rwLock();
try {
trash.getNode().getChildren().remove(node.getNode().getMeta().getName());
node.notifyRmRef(trash.getNode().getId());
} finally {
node.rwUnlock();
}
_storage.removeNode(n);
}
} finally {
trash.rwUnlock();
}
}
}
}
private void maybeRecord(PeerIdT from, OpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
if (Objects.equals(from, _peers.getSelfId())) {
if (!_storage.getLog().containsKey(op.timestamp()))
_opRecorder.recordOp(op);
}
}
public <LocalMetaT extends MetaT> void move(NodeIdT newParent, LocalMetaT newMeta, NodeIdT child) {
synchronized (this) {
applyOp(_peers.getSelfId(), createMove(newParent, newMeta, child));
}
}
public void applyExternalOp(PeerIdT from, OpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
applyOp(from, op);
}
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
synchronized (this) {
_clock.updateTimestamp(op.timestamp().timestamp());
var ref = _storage.getPeerTimestampLog().computeIfAbsent(from, f -> new AtomicReference<>());
// TODO: I guess it's not actually needed since one peer can't handle concurrent updates?
TimestampT oldRef;
TimestampT newRef;
do {
oldRef = ref.get();
if (oldRef != null && oldRef.compareTo(op.timestamp().timestamp()) >= 0)
throw new IllegalArgumentException("Wrong op order: received older than known from " + from.toString());
newRef = op.timestamp().timestamp();
} while (!ref.compareAndSet(oldRef, newRef));
var log = _storage.getLog();
// FIXME: hack?
int cmp = log.isEmpty() ? 1 : op.timestamp().compareTo(log.lastEntry().getKey());
if (log.containsKey(op.timestamp())) {
tryTrimLog();
return;
}
cmp = op.timestamp().compareTo(log.lastEntry().getKey());
} finally {
_storage.globalRUnlock();
}
assert cmp != 0;
if (cmp < 0) {
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.tailMap(op.timestamp(), false);
_undoCtx = new HashMap<>();
for (var entry : toUndo.reversed().entrySet()) {
undoOp(entry.getValue());
}
doAndPut(from, op);
for (var entry : toUndo.entrySet()) {
redoOp(entry);
}
assert cmp != 0;
if (cmp < 0) {
_storage.globalRwLock();
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.tailMap(op.timestamp(), false);
for (var entry : toUndo.reversed().entrySet()) {
undoOp(entry.getValue());
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
e.getValue().unlock();
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
} finally {
tryTrimLog();
}
log.put(op.timestamp(), doOp(op));
for (var entry : toUndo.entrySet()) {
redoOp(entry);
}
} finally {
_storage.globalRwUnlock();
} else {
doAndPut(from, op);
tryTrimLog();
}
} else {
var res = doOp(op);
log.put(op.timestamp(), res);
}
}
public CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
private CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
return new CombinedTimestamp<>(_clock.getTimestamp(), _peers.getSelfId());
}
public OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> createMove(NodeIdT newParent, MetaT newMeta, NodeIdT node) {
private <LocalMetaT extends MetaT> OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> createMove(NodeIdT newParent, LocalMetaT newMeta, NodeIdT node) {
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
}
private <LocalMetaT extends MetaT> LogOpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> op) {
private WrapperT getNode(TreeNode<MetaT, NodeIdT> desired) {
if (_undoCtx != null) {
var node = _undoCtx.get(desired.getId());
if (node != null) {
node.rwLock();
try {
if (!node.getNode().getChildren().isEmpty()) {
LOGGER.log(Level.WARNING, "Not empty children for undone node " + desired.getId());
assert node.getNode().getChildren().isEmpty();
node.getNode().getChildren().clear();
}
node.getNode().setParent(desired.getParent());
node.notifyRef(desired.getParent());
node.getNode().setMeta(desired.getMeta());
node.unlock();
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while fixing up node " + desired.getId(), e);
node.rwUnlock();
node = null;
}
}
if (node != null) {
_undoCtx.remove(desired.getId());
return node;
}
}
return _storage.createNewNode(desired);
}
private <LocalMetaT extends MetaT> LogRecord<TimestampT, PeerIdT, LocalMetaT, NodeIdT> doOp(PeerIdT from, OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> op, boolean record) {
var node = _storage.getById(op.childId());
var oldParent = (node != null && node.getNode().getParent() != null) ? _storage.getById(node.getNode().getParent()) : null;
@@ -148,11 +282,35 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (oldParent == null) {
newParent.rwLock();
try {
node = _storage.createNewNode(new TreeNode<>(op.childId(), op.newParentId(), op.newMeta()));
node = getNode(new TreeNode<>(op.childId(), op.newParentId(), op.newMeta()));
try {
var old = newParent.getNode().getChildren().get(node.getNode().getMeta().getName());
var effects = new ArrayList<LogEffect<LocalMetaT, NodeIdT>>(2);
if (old != null) {
var oldN = _storage.getById(old);
oldN.rwLock();
try {
var oldMeta = oldN.getNode().getMeta();
oldN.getNode().setMeta((MetaT) oldN.getNode().getMeta().withName(oldN.getNode().getMeta().getName() + ".conflict." + oldN.getNode().getId()));
node.getNode().setMeta((MetaT) node.getNode().getMeta().withName(node.getNode().getMeta().getName() + ".conflict." + node.getNode().getId()));
newParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
newParent.getNode().getChildren().put(oldN.getNode().getMeta().getName(), oldN.getNode().getId());
effects.add(new LogEffect<>(new LogEffectOld<>(newParent.getNode().getId(), (LocalMetaT) oldMeta), op.newParentId(), (LocalMetaT) oldN.getNode().getMeta(), oldN.getNode().getId()));
effects.add(new LogEffect<>(null, op.newParentId(), (LocalMetaT) node.getNode().getMeta(), op.childId()));
} finally {
oldN.rwUnlock();
}
} else {
effects.add(new LogEffect<>(null, op.newParentId(), (LocalMetaT) node.getNode().getMeta(), op.childId()));
}
newParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
node.notifyRef(newParent.getNode().getId());
return new LogOpMove<>(null, op);
if (record)
maybeRecord(from, op);
return new LogRecord<>(op, Collections.unmodifiableList(effects));
} finally {
node.rwUnlock();
}
@@ -161,42 +319,55 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
// FIXME:
_storage.globalRwLock();
if (Objects.equals(op.childId(), op.newParentId()) || isAncestor(op.childId(), op.newParentId())) {
if (record)
maybeRecord(from, op);
return new LogRecord<>(op, null);
}
var trash = _storage.getById(_storage.getTrashId());
trash.rwLock();
newParent.rwLock();
oldParent.rwLock();
node.rwLock();
try {
if (op.childId() == op.newParentId() || isAncestor(op.childId(), op.newParentId()))
return new LogOpMove<>(null, op);
var trash = _storage.getById(_storage.getTrashId());
trash.rwLock();
newParent.rwLock();
oldParent.rwLock();
node.rwLock();
try {
oldParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
var oldMeta = node.getNode().getMeta();
if (!node.getNode().getMeta().getClass().equals(op.newMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.getNode().getId());
node.getNode().setMeta(op.newMeta());
node.getNode().setParent(newParent.getNode().getId());
var old = newParent.getNode().getChildren().get(op.newMeta().getName());
if (old != null) {
var oldNode = _storage.getById(old);
applyOp(createMove(_storage.getTrashId(), (MetaT) oldNode.getNode().getMeta().withName(oldNode.getNode().getId().toString()), oldNode.getNode().getId()));
oldParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
var oldMeta = node.getNode().getMeta();
if (!node.getNode().getMeta().getClass().equals(op.newMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.getNode().getId());
node.getNode().setMeta(op.newMeta());
node.getNode().setParent(newParent.getNode().getId());
var old = newParent.getNode().getChildren().get(op.newMeta().getName());
// TODO: somehow detect when this might be a conflict? (2 devices move two different files into one)
var effects = new ArrayList<LogEffect<LocalMetaT, NodeIdT>>(2);
if (old != null) {
var oldNode = _storage.getById(old);
try {
oldNode.rwLock();
trash.getNode().getChildren().put(oldNode.getNode().getId().toString(), oldNode.getNode().getId());
var oldOldMeta = oldNode.getNode().getMeta();
oldNode.notifyRmRef(newParent.getNode().getId());
oldNode.notifyRef(trash.getNode().getId());
oldNode.getNode().setMeta((MetaT) oldNode.getNode().getMeta().withName(oldNode.getNode().getId().toString()));
oldNode.getNode().setParent(_storage.getTrashId());
effects.add(new LogEffect<>(new LogEffectOld<>(newParent.getNode().getId(), (LocalMetaT) oldOldMeta), trash.getNode().getId(), (LocalMetaT) oldNode.getNode().getMeta(), oldNode.getNode().getId()));
} finally {
oldNode.rwUnlock();
}
newParent.getNode().getChildren().put(op.newMeta().getName(), node.getNode().getId());
node.notifyRmRef(oldParent.getNode().getId());
node.notifyRef(newParent.getNode().getId());
return new LogOpMove<>(new LogOpMoveOld<>(oldParent.getNode().getId(), (LocalMetaT) oldMeta), op);
} finally {
node.rwUnlock();
oldParent.rwUnlock();
newParent.rwUnlock();
trash.rwUnlock();
}
newParent.getNode().getChildren().put(op.newMeta().getName(), node.getNode().getId());
node.notifyRmRef(oldParent.getNode().getId());
node.notifyRef(newParent.getNode().getId());
if (record)
maybeRecord(from, op);
effects.add(new LogEffect<>(new LogEffectOld<>(oldParent.getNode().getId(), (LocalMetaT) oldMeta), op.newParentId(), (LocalMetaT) node.getNode().getMeta(), node.getNode().getId()));
return new LogRecord<>(op, Collections.unmodifiableList(effects));
} finally {
_storage.globalRwUnlock();
node.rwUnlock();
oldParent.rwUnlock();
newParent.rwUnlock();
trash.rwUnlock();
}
}
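
Taken together, the reworked applyOp implements the undo-do-redo scheme from the Kleppmann et al. move-operation algorithm the package is named after: an op arriving with a timestamp older than the newest log entry causes all younger log records to be undone in reverse order, the op is applied and logged in its proper place, and the younger records are then replayed against the updated tree. A condensed, hypothetical sketch of that control flow (locking, log trimming and the undo context used to resurrect removed nodes are omitted):

// Condensed sketch only; not the actual method. Assumes doAndPut/undoOp/redoOp as above.
private void applyOpSketch(PeerIdT from, OpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op) {
    var log = _storage.getLog();
    if (log.containsKey(op.timestamp())) return;                      // duplicate delivery
    if (log.isEmpty() || op.timestamp().compareTo(log.lastKey()) > 0) {
        doAndPut(from, op);                                           // newest op: apply directly
        return;
    }
    var newer = log.tailMap(op.timestamp(), false);                   // everything younger
    for (var e : newer.reversed().entrySet()) undoOp(e.getValue());   // roll them back
    doAndPut(from, op);                                               // apply the late op in order
    for (var e : newer.entrySet()) redoOp(e);                         // replay the younger ops
}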

View File

@@ -0,0 +1,10 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record LogEffect<MetaT extends NodeMeta, NodeIdT>(
LogEffectOld<MetaT, NodeIdT> oldInfo,
NodeIdT newParentId,
MetaT newMeta,
NodeIdT childId) implements Serializable {
}

View File

@@ -2,5 +2,5 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record LogOpMoveOld<MetaT extends NodeMeta, NodeIdT>
public record LogEffectOld<MetaT extends NodeMeta, NodeIdT>
(NodeIdT oldParent, MetaT oldMeta) implements Serializable {}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record LogOpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(LogOpMoveOld<MetaT, NodeIdT> oldInfo,
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op) implements Serializable {}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
import java.util.List;
public record LogRecord<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op,
List<LogEffect<MetaT, NodeIdT>> effects) implements Serializable {}

View File

@@ -0,0 +1,5 @@
package com.usatiuk.kleppmanntree;
public interface OpRecorder<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
void recordOp(OpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT> op);
}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.kleppmanntree;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicReference;
public interface StorageInterface<
TimestampT extends Comparable<TimestampT>,
@@ -25,15 +27,7 @@ public interface StorageInterface<
void lockSet(Collection<WrapperT> nodes);
// It is expected that the map allows concurrent additions at the end
NavigableMap<CombinedTimestamp<TimestampT, PeerIdT>, LogOpMove<TimestampT, PeerIdT, ? extends MetaT, NodeIdT>> getLog();
NavigableMap<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, ? extends MetaT, NodeIdT>> getLog();
// Locks all the objects from being changed
void globalRwLock();
void globalRwUnlock();
// Locks all the objects from being changed
void globalRLock();
void globalRUnlock();
Map<PeerIdT, AtomicReference<TimestampT>> getPeerTimestampLog();
}

View File

@@ -9,6 +9,10 @@ public interface TreeNodeWrapper<MetaT extends NodeMeta, NodeIdT> {
void rwUnlock();
void lock();
void unlock();
void notifyRef(NodeIdT id);
void notifyRmRef(NodeIdT id);

View File

@@ -8,26 +8,24 @@ import java.util.List;
public class KleppmanTreeSimpleTest {
private final TestNode testNode1 = new TestNode(1);
private final TestNode testNode2 = new TestNode(2);
private final TestNode testNode3 = new TestNode(3);
private final TestNode testNode4 = new TestNode(4);
@Test
void circularTest() {
var d1id = testNode1._storageInterface.getNewNodeId();
var d2id = testNode2._storageInterface.getNewNodeId();
var op1 = new OpMove<>(new CombinedTimestamp<>(testNode1._clock.getTimestamp(), 1L),
testNode1._storageInterface.getRootId(),
new TestNodeMetaDir("Test1"),
d1id);
var op2 = new OpMove<>(new CombinedTimestamp<>(testNode2._clock.getTimestamp(), 2L),
testNode2._storageInterface.getRootId(),
new TestNodeMetaDir("Test2"),
d2id);
testNode1._tree.applyOp(op1);
testNode2._tree.applyOp(op2);
testNode1._tree.applyOp(op2);
testNode2._tree.applyOp(op1);
testNode1._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d1id);
testNode2._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test2"), d2id);
{
var r1 = testNode1.getRecorded();
Assertions.assertEquals(1, r1.size());
testNode2._tree.applyExternalOp(1L, r1.getFirst());
var r2 = testNode2.getRecorded();
Assertions.assertEquals(1, r2.size());
testNode1._tree.applyExternalOp(2L, r2.getFirst());
}
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test2")));
@@ -38,12 +36,13 @@ public class KleppmanTreeSimpleTest {
Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
var f1id = testNode1._storageInterface.getNewNodeId();
var op3 = new OpMove<>(new CombinedTimestamp<>(testNode1._clock.getTimestamp(), 1L),
d2id,
new TestNodeMetaFile("TestFile", 1234),
f1id);
testNode1._tree.applyOp(op3);
testNode2._tree.applyOp(op3);
testNode1._tree.move(d2id, new TestNodeMetaFile("TestFile", 1234), f1id);
{
var r1 = testNode1.getRecorded();
Assertions.assertEquals(1, r1.size());
testNode2._tree.applyExternalOp(1L, r1.getFirst());
}
Assertions.assertEquals(f1id, testNode1._tree.traverse(List.of("Test2", "TestFile")));
Assertions.assertEquals(f1id, testNode2._tree.traverse(List.of("Test2", "TestFile")));
@@ -52,22 +51,26 @@ public class KleppmanTreeSimpleTest {
d1id,
new TestNodeMetaDir("Test2"),
d2id);
testNode1._tree.applyOp(cop1);
testNode1._tree.move(d1id, new TestNodeMetaDir("Test2"), d2id);
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test1", "Test2")));
Assertions.assertIterableEquals(List.of("Test1"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
var cop2 = new OpMove<>(new CombinedTimestamp<>(testNode2._clock.getTimestamp(), 2L),
d2id,
new TestNodeMetaDir("Test1"),
d1id);
testNode2._tree.applyOp(cop2);
testNode2._tree.move(d2id, new TestNodeMetaDir("Test1"), d1id);
Assertions.assertIterableEquals(List.of("Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertEquals(d2id, testNode2._tree.traverse(List.of("Test2")));
Assertions.assertEquals(d1id, testNode2._tree.traverse(List.of("Test2", "Test1")));
testNode1._tree.applyOp(cop2);
testNode2._tree.applyOp(cop1);
{
var r1 = testNode1.getRecorded();
Assertions.assertEquals(1, r1.size());
testNode2._tree.applyExternalOp(1L, r1.getFirst());
var r2 = testNode2.getRecorded();
Assertions.assertEquals(1, r2.size());
testNode1._tree.applyExternalOp(2L, r2.getFirst());
}
// Second node wins as it has smaller timestamp
Assertions.assertIterableEquals(List.of("Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test1", "TestFile"), testNode1._storageInterface.getById(d2id).getNode().getChildren().keySet());
@@ -80,6 +83,10 @@ public class KleppmanTreeSimpleTest {
Assertions.assertEquals(f11.getNode().getMeta(), f12.getNode().getMeta());
Assertions.assertInstanceOf(TestNodeMetaFile.class, f11.getNode().getMeta());
// Trim test
Assertions.assertTrue(testNode1._storageInterface.getLog().size() <= 1);
Assertions.assertTrue(testNode2._storageInterface.getLog().size() <= 1);
}
}

View File

@@ -1,5 +1,8 @@
package com.usatiuk.kleppmanntree;
import java.util.ArrayList;
import java.util.List;
public class TestNode {
protected final long _id;
@@ -8,11 +11,29 @@ public class TestNode {
protected final TestStorageInterface _storageInterface;
protected final KleppmannTree<Long, Long, TestNodeMeta, Long, TestNodeWrapper> _tree;
private class TestOpRecorder implements OpRecorder<Long, Long, TestNodeMeta, Long> {
ArrayList<OpMove<Long, Long, ? extends TestNodeMeta, Long>> ops = new ArrayList<>();
@Override
public void recordOp(OpMove<Long, Long, ? extends TestNodeMeta, Long> op) {
ops.add(op);
}
}
private final TestOpRecorder _recorder;
public TestNode(long id) {
_id = id;
_clock = new TestClock();
_peerInterface = new TestPeerInterface(_id);
_storageInterface = new TestStorageInterface(_id);
_tree = new KleppmannTree<>(_storageInterface, _peerInterface, _clock);
_recorder = new TestOpRecorder();
_tree = new KleppmannTree<>(_storageInterface, _peerInterface, _clock, _recorder);
}
List<OpMove<Long, Long, ? extends TestNodeMeta, Long>> getRecorded() {
var ret = _recorder.ops;
_recorder.ops = new ArrayList<>();
return ret;
}
}

View File

@@ -25,6 +25,16 @@ public class TestNodeWrapper implements TreeNodeWrapper<TestNodeMeta, Long> {
}
@Override
public void lock() {
}
@Override
public void unlock() {
}
@Override
public void notifyRef(Long id) {

View File

@@ -15,6 +15,6 @@ public class TestPeerInterface implements PeerInterface<Long> {
@Override
public Collection<Long> getAllPeers() {
return List.of(1L, 2L, 3L, 4L);
return List.of(1L, 2L);
}
}
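
Shrinking the test peer list to the two nodes that actually exchange ops matters for the new log trimming: tryTrimLog takes the minimum timestamp acknowledged across getAllPeers(), and a peer that never shows up in the peer timestamp log blocks trimming entirely. A small, hypothetical restatement of that rule (not part of the changeset), using the Map<PeerIdT, AtomicReference<TimestampT>> shape returned by getPeerTimestampLog:

// Illustrative helper: only the log prefix that every known peer has acknowledged
// can be trimmed; an unheard-from peer makes the result null (trim nothing).
static Long minAcknowledged(Collection<Long> peers, Map<Long, AtomicReference<Long>> acked) {
    Long min = null;
    for (var p : peers) {
        var ref = acked.get(p);
        if (ref == null || ref.get() == null) return null; // nothing can be trimmed yet
        min = (min == null) ? ref.get() : Math.min(min, ref.get());
    }
    return min;
}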

View File

@@ -1,13 +1,15 @@
package com.usatiuk.kleppmanntree;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper> {
private long _curId = 1;
private final long _peerId;
private final Map<Long, TreeNode<TestNodeMeta, Long>> _nodes = new HashMap<>();
private final NavigableMap<CombinedTimestamp<Long, Long>, LogOpMove<Long, Long, ? extends TestNodeMeta, Long>> _log = new TreeMap<>();
private final NavigableMap<CombinedTimestamp<Long, Long>, LogRecord<Long, Long, ? extends TestNodeMeta, Long>> _log = new TreeMap<>();
private final Map<Long, AtomicReference<Long>> _peerTimestampLog = new HashMap<>();
public TestStorageInterface(long peerId) {
_peerId = peerId;
@@ -58,27 +60,12 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
}
@Override
public NavigableMap<CombinedTimestamp<Long, Long>, LogOpMove<Long, Long, ? extends TestNodeMeta, Long>> getLog() {
public NavigableMap<CombinedTimestamp<Long, Long>, LogRecord<Long, Long, ? extends TestNodeMeta, Long>> getLog() {
return _log;
}
@Override
public void globalRwLock() {
}
@Override
public void globalRwUnlock() {
}
@Override
public void globalRLock() {
}
@Override
public void globalRUnlock() {
public Map<Long, AtomicReference<Long>> getPeerTimestampLog() {
return _peerTimestampLog;
}
}

View File

@@ -686,8 +686,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
File f = new File(fuuid, 0, true);
var newNodeId = _tree.getNewNodeId();
_tree.move(parent.getName(), new JTreeNodeMetaFile(fname, f.getName()), newNodeId);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().size());
@@ -697,6 +695,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jObjectManager.put(newChunkInfo, Optional.of(f.getName()));
jObjectManager.put(f, Optional.of(newNodeId));
_tree.move(parent.getName(), new JTreeNodeMetaFile(fname, f.getName()), newNodeId);
return f.getName();
}

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.StorageInterfaceService;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.kleppmanntree.KleppmannTree;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.OpRecorder;
import java.util.List;
import java.util.UUID;
@@ -12,34 +13,42 @@ public class JKleppmannTree {
private final JKleppmannTreePersistentData _persistentData;
private final JStorageInterface _storageInterface;
private final KleppmannTree<Long, UUID, JTreeNodeMeta, String, JTreeNodeWrapper> _tree;
private final JPeerInterface _peerInterface;
private class JOpRecorder implements OpRecorder<Long, UUID, JTreeNodeMeta, String> {
@Override
public void recordOp(OpMove<Long, UUID, ? extends JTreeNodeMeta, String> op) {
_persistentData.recordOp(op);
}
}
JKleppmannTree(JKleppmannTreePersistentData persistentData, StorageInterfaceService storageInterfaceService, JPeerInterface peerInterface) {
_persistentData = persistentData;
var si = new JStorageInterface(persistentData, storageInterfaceService);
_storageInterface = si;
si.ensureRootCreated();
_tree = new KleppmannTree<>(si, peerInterface, _persistentData.getClock());
_peerInterface = peerInterface;
_tree = new KleppmannTree<>(si, peerInterface, _persistentData.getClock(), new JOpRecorder());
}
public String traverse(List<String> names) {
return _tree.traverse(names);
}
public void move(String newParent, JTreeNodeMeta newMeta, String node) {
applyOp(_tree.createMove(newParent, newMeta, node));
}
public String getNewNodeId() {
return _storageInterface.getNewNodeId();
}
public void trash(JTreeNodeMeta newMeta, String node) {
applyOp(_tree.createMove(_storageInterface.getTrashId(), newMeta.withName(node), node));
public void move(String newParent, JTreeNodeMeta newMeta, String node) {
_tree.move(newParent, newMeta, node);
}
public void applyOp(OpMove<Long, UUID, JTreeNodeMeta, String> opMove) {
_persistentData.recordOp(opMove);
_tree.applyOp(opMove);
public void trash(JTreeNodeMeta newMeta, String node) {
_tree.move(_storageInterface.getTrashId(), newMeta, node);
}
void applyExternalOp(UUID from, OpMove<Long, UUID, ? extends JTreeNodeMeta, String> op) {
_tree.applyExternalOp(from, op);
}
}

View File

@@ -1,8 +1,15 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.JOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.OpQueueHelper;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.StorageInterfaceService;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.invalidation.IncomingOpListener;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.invalidation.OpListenerDispatcher;
import com.usatiuk.kleppmanntree.AtomicClock;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -17,6 +24,8 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@@ -29,6 +38,10 @@ public class JKleppmannTreeManager {
StorageInterfaceService storageInterfaceService;
@Inject
OpQueueHelper opQueueHelper;
@Inject
OpListenerDispatcher opListenerDispatcher;
@Inject
JObjectManager jObjectManager;
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
@@ -76,6 +89,24 @@ public class JKleppmannTreeManager {
private JKleppmannTree createTree(String name) {
var pdata = _persistentData.computeIfAbsent(name, n -> new JKleppmannTreePersistentData(opQueueHelper, n, new AtomicClock()));
pdata.restoreHelper(opQueueHelper);
return new JKleppmannTree(pdata, storageInterfaceService, jPeerInterface);
var tree = new JKleppmannTree(pdata, storageInterfaceService, jPeerInterface);
opListenerDispatcher.registerListener(pdata.getId(), new IncomingOpListener() {
@Override
public void accept(UUID incomingPeer, Op op) {
if (!(op instanceof JOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + name);
if (jop.getOp().newMeta() instanceof JTreeNodeMetaFile f) {
var fino = f.getFileIno();
jObjectManager.getOrPut(fino, File.class, Optional.of(jop.getOp().childId()));
}
if (Log.isTraceEnabled())
Log.trace("Received op from " + incomingPeer + ": " + jop.getOp().timestamp().timestamp() + " " + jop.getOp().childId() + "->" + jop.getOp().newParentId());
tree.applyExternalOp(incomingPeer, jop.getOp());
}
});
return tree;
}
}

View File

@@ -1,34 +1,35 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.JOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.OpQueueHelper;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.invalidation.OpQueue;
import com.usatiuk.kleppmanntree.AtomicClock;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogOpMove;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;
public class JKleppmannTreePersistentData implements Serializable, OpQueue {
@Getter
private final String _name;
@Getter
private final AtomicClock _clock;
@Getter
private final UUID _selfUuid;
@Getter
private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<OpMove<Long, UUID, JTreeNodeMeta, String>>> _queues = new ConcurrentHashMap<>();
private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<OpMove<Long, UUID, ? extends JTreeNodeMeta, String>>> _queues = new ConcurrentHashMap<>();
@Getter
private final ConcurrentSkipListMap<CombinedTimestamp<Long, UUID>, LogOpMove<Long, UUID, ? extends JTreeNodeMeta, String>> _log = new ConcurrentSkipListMap<>();
private final TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, ? extends JTreeNodeMeta, String>> _log = new TreeMap<>();
@Getter
private final ReentrantReadWriteLock _logLock = new ReentrantReadWriteLock();
private final ConcurrentHashMap<UUID, AtomicReference<Long>> _peerTimestampLog = new ConcurrentHashMap<>();
private transient OpQueueHelper _helper;
@@ -41,18 +42,34 @@ public class JKleppmannTreePersistentData implements Serializable, OpQueue {
public void restoreHelper(OpQueueHelper opQueueHelper) {
_helper = opQueueHelper;
_helper.registerOnConnection(this);
_helper.onRestore(this);
}
@Override
public Object getForHost(UUID host) {
public Op getForHost(UUID host) {
if (_queues.containsKey(host)) {
return _queues.get(host).poll();
var peeked = _queues.get(host).peek();
return peeked != null ? new JOpWrapper(_queues.get(host).peek()) : null;
}
return null;
}
void recordOp(OpMove<Long, UUID, JTreeNodeMeta, String> opMove) {
@Override
public String getId() {
return _name;
}
@Override
public void commitOneForHost(UUID host, Op op) {
var got = _queues.get(host).poll();
if (!(op instanceof JOpWrapper jw))
throw new IllegalArgumentException("Unexpected type for commitOneForHost: " + op.getClass().getName());
if (jw.getOp() != got) {
throw new IllegalArgumentException("Committed op push was not the oldest");
}
}
void recordOp(OpMove<Long, UUID, ? extends JTreeNodeMeta, String> opMove) {
for (var u : _helper.getHostList()) {
_queues.computeIfAbsent(u, h -> new ConcurrentLinkedQueue<>());
_queues.get(u).add(opMove);
@@ -61,5 +78,6 @@ public class JKleppmannTreePersistentData implements Serializable, OpQueue {
}
protected void notifyInvQueue() {
_helper.notifyOpSender(this);
}
}
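
getForHost now only peeks at the queue; the op is removed by commitOneForHost once the push has been acknowledged, so a failed send leaves it queued for retry. A hypothetical sender loop over this protocol (OpSender itself is not shown in this changeset; the shape below just combines the OpQueue methods defined here with RemoteObjectServiceClient.pushOp):

// Illustrative only: peek, push, then commit, so a crash or a failed push
// never drops an op that the remote peer has not acknowledged.
void drainQueue(OpQueue queue, UUID host, RemoteObjectServiceClient client) {
    Op op;
    while ((op = queue.getForHost(host)) != null) {  // peek: op stays in the queue
        client.pushOp(op, queue.getId(), host);      // throws on failure -> retried later
        queue.commitOneForHost(host, op);            // acknowledged: now remove it
    }
}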

View File

@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.kleppmanntree.PeerInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Collection;
import java.util.UUID;
@@ -21,6 +20,6 @@ public class JPeerInterface implements PeerInterface<UUID> {
@Override
public Collection<UUID> getAllPeers() {
throw new NotImplementedException();
return persistentRemoteHostsService.getHostUuidsAndSelf();
}
}

View File

@@ -6,16 +6,18 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogOpMove;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.StorageInterface;
import com.usatiuk.kleppmanntree.TreeNode;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
public class JStorageInterface implements StorageInterface<Long, UUID, JTreeNodeMeta, String, JTreeNodeWrapper> {
public class JStorageInterface implements StorageInterface<Long, UUID, JTreeNodeMeta, String, JTreeNodeWrapper> {
private final JKleppmannTreePersistentData _persistentData;
private final StorageInterfaceService _storageInterfaceService;
@@ -34,12 +36,12 @@ public class JStorageInterface implements StorageInterface<Long, UUID, JTreeNod
@Override
public String getRootId() {
return _persistentData.getName() + "_jt_root";
return _persistentData.getId() + "_jt_root";
}
@Override
public String getTrashId() {
return _persistentData.getName() + "_jt_trash";
return _persistentData.getId() + "_jt_trash";
}
@Override
@@ -55,7 +57,7 @@ public class JStorageInterface implements StorageInterface<Long, UUID, JTreeNod
}
@Override
public JTreeNodeWrapper createNewNode(TreeNode< JTreeNodeMeta, String> node) {
public JTreeNodeWrapper createNewNode(TreeNode<JTreeNodeMeta, String> node) {
return new JTreeNodeWrapper(_storageInterfaceService.putObjectLocked(new TreeNodeJObjectData(node)));
}
@@ -70,27 +72,12 @@ public class JStorageInterface implements StorageInterface<Long, UUID, JTreeNod
}
@Override
public NavigableMap<CombinedTimestamp<Long, UUID>, LogOpMove<Long, UUID, ? extends JTreeNodeMeta, String>> getLog() {
public NavigableMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, ? extends JTreeNodeMeta, String>> getLog() {
return _persistentData.getLog();
}
@Override
public void globalRwLock() {
_persistentData.getLogLock().writeLock().lock();
}
@Override
public void globalRwUnlock() {
_persistentData.getLogLock().writeLock().unlock();
}
@Override
public void globalRLock() {
_persistentData.getLogLock().readLock().lock();
}
@Override
public void globalRUnlock() {
_persistentData.getLogLock().readLock().unlock();
public Map<UUID, AtomicReference<Long>> getPeerTimestampLog() {
return _persistentData.getPeerTimestampLog();
}
}

View File

@@ -32,6 +32,22 @@ public class JTreeNodeWrapper implements TreeNodeWrapper<JTreeNodeMeta, String>
_backing.rwUnlock();
}
@Override
public void lock() {
_backing.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
m.lock();
return null;
});
}
@Override
public void unlock() {
_backing.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
m.unlock();
return null;
});
}
@Override
public void notifyRef(String id) {
_backing.getMeta().addRef(id);
@@ -44,7 +60,7 @@ public class JTreeNodeWrapper implements TreeNodeWrapper<JTreeNodeMeta, String>
@Override
public TreeNode<JTreeNodeMeta, String> getNode() {
_backing.tryResolve(JObject.ResolutionStrategy.REMOTE);
_backing.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
if (_backing.getData() == null) throw new IllegalStateException("Node " + _backing.getName() + " data lost!");
return _backing.getData().getNode();
}

View File

@@ -0,0 +1,19 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.helpers;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.util.UUID;
// Wrapper to avoid having to specify generic types
public class JOpWrapper implements Op {
@Getter
private final OpMove<Long, UUID, ? extends JTreeNodeMeta, String> _op;
public JOpWrapper(OpMove<Long, UUID, ? extends JTreeNodeMeta, String> op) {
if (op == null) throw new IllegalArgumentException("op shouldn't be null");
_op = op;
}
}

View File

@@ -0,0 +1,40 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.helpers;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.OpPushJKleppmannTree;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.OpMove;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.UUID;
@ApplicationScoped
public class OpProtoSerializer implements ProtoDeserializer<OpPushJKleppmannTree, JOpWrapper>, ProtoSerializer<OpPushJKleppmannTree, JOpWrapper> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public JOpWrapper deserialize(OpPushJKleppmannTree message) {
return new JOpWrapper(new OpMove<>(
new CombinedTimestamp<>(message.getTimestamp(), UUID.fromString(message.getPeer())), message.getNewParentId(),
message.hasMeta() ? protoSerializerService.deserialize(message.getMeta()) : null,
message.getChild()
));
}
@Override
public OpPushJKleppmannTree serialize(JOpWrapper object) {
var builder = OpPushJKleppmannTree.newBuilder();
builder.setTimestamp(object.getOp().timestamp().timestamp())
.setPeer(object.getOp().timestamp().nodeId().toString())
.setNewParentId(object.getOp().newParentId())
.setChild(object.getOp().childId());
if (object.getOp().newMeta() != null)
builder.setMeta(protoSerializerService.serializeToTreeNodeMetaP(object.getOp().newMeta()));
return builder.build();
}
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.helpers;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.OpSender;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -15,16 +15,16 @@ public class OpQueueHelper {
@Inject
RemoteHostManager remoteHostManager;
@Inject
InvalidationQueueService invalidationQueueService;
OpSender opSender;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
public void registerOnConnection(JKleppmannTreePersistentData self) {
remoteHostManager.registerConnectEventListener(h -> notifyInvQueue(self));
public void onRestore(JKleppmannTreePersistentData self) {
remoteHostManager.registerConnectEventListener(h -> notifyOpSender(self));
}
public void notifyInvQueue(JKleppmannTreePersistentData self) {
invalidationQueueService.pushInvalidationToAll(self);
public void notifyOpSender(JKleppmannTreePersistentData self) {
opSender.push(self);
}
public UUID getSelfUUid() {

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.serializers;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class TreeNodeMetaProtoDeseraializer implements ProtoDeserializer<TreeNodeMetaP, JTreeNodeMeta> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public JTreeNodeMeta deserialize(TreeNodeMetaP message) {
return switch (message.getMetaCase()) {
case FILE -> (JTreeNodeMetaFile) protoSerializerService.deserialize(message.getFile());
case DIR -> (JTreeNodeMetaDirectory) protoSerializerService.deserialize(message.getDir());
case META_NOT_SET -> throw new IllegalArgumentException("TreeNodeMetaP is null");
};
}
}

View File

@@ -1,11 +1,7 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.serializers;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaDirectoryP;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaFileP;
import com.usatiuk.dhfs.objects.persistence.TreeNodeP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
@@ -21,15 +17,10 @@ public class TreeNodeProtoSerializer implements ProtoDeserializer<TreeNodeP, Tre
@Override
public TreeNodeJObjectData deserialize(TreeNodeP message) {
JTreeNodeMeta meta = switch (message.getMetaCase()) {
case FILE -> (JTreeNodeMetaFile) protoSerializerService.deserialize(message.getFile());
case DIR -> (JTreeNodeMetaDirectory) protoSerializerService.deserialize(message.getDir());
case META_NOT_SET -> null;
};
var node = new TreeNode<>(
message.getId(),
message.hasParent() ? message.getParent() : null,
meta
message.hasMeta() ? (JTreeNodeMeta) protoSerializerService.deserialize(message.getMeta()) : null
);
node.getChildren().putAll(message.getChildrenMap());
return new TreeNodeJObjectData(node);
@@ -40,13 +31,8 @@ public class TreeNodeProtoSerializer implements ProtoDeserializer<TreeNodeP, Tre
var builder = TreeNodeP.newBuilder().setId(object.getNode().getId()).putAllChildren(object.getNode().getChildren());
if (object.getNode().getParent() != null)
builder.setParent(object.getNode().getParent());
switch (object.getNode().getMeta()) {
case JTreeNodeMetaFile jTreeNodeMetaFile ->
builder.setFile((TreeNodeMetaFileP) protoSerializerService.serialize(jTreeNodeMetaFile));
case JTreeNodeMetaDirectory jTreeNodeMetaDirectory ->
builder.setDir((TreeNodeMetaDirectoryP) protoSerializerService.serialize(jTreeNodeMetaDirectory));
case null, default -> {
}
if (object.getNode().getMeta() != null) {
builder.setMeta(protoSerializerService.serializeToTreeNodeMetaP(object.getNode().getMeta()));
}
return builder.build();
}

View File

@@ -36,4 +36,9 @@ public class TreeNodeJObjectData extends JObjectData {
return List.of(((JTreeNodeMetaFile) _node.getMeta()).getFileIno());
return Collections.unmodifiableCollection(_node.getChildren().values());
}
@Override
public Class<? extends JObjectData> getRefType() {
return JObjectData.class;
}
}

View File

@@ -113,9 +113,9 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public Collection<String> findAll() {
var out = _map.values().stream().map(WeakReference::get)
.filter(Objects::nonNull)
.map(JObject::getName)
.collect(Collectors.toCollection((Supplier<LinkedHashSet<String>>) LinkedHashSet::new));
.filter(Objects::nonNull)
.map(JObject::getName)
.collect(Collectors.toCollection((Supplier<LinkedHashSet<String>>) LinkedHashSet::new));
out.addAll(objectPersistentStore.findAllObjects());
return out;
}
@@ -193,7 +193,6 @@ public class JObjectManagerImpl implements JObjectManager {
m.addRef(s);
return true;
}));
jObjectLRU.notifyAccess(got.get());
return got.get();
}

View File

@@ -87,8 +87,8 @@ public class JObjectWriteback {
Thread.sleep(1000);
if (_currentSize.get() > 0)
Log.info("Writeback status: size="
+ _currentSize.get() / 1024 / 1024 + "MB"
+ " watermark=" + (_watermarkReached.get() ? "reached" : "not reached"));
+ _currentSize.get() / 1024 / 1024 + "MB"
+ " watermark=" + (_watermarkReached.get() ? "reached" : "not reached"));
}
} catch (InterruptedException ignored) {
}
@@ -166,6 +166,7 @@ public class JObjectWriteback {
// FIXME: assert Rw lock here?
Log.debug("Deleting from persistent storage " + m.getName());
objectPersistentStore.deleteObject(m.getName());
m.markUnWritten();
return;
}
objectPersistentStore.writeObjectMeta(m.getName(), protoSerializerService.serialize(m));

View File

@@ -94,6 +94,11 @@ public class ObjectMetadata implements Serializable {
_written = true;
}
// FIXME:? a better way?
public void markUnWritten() {
_written = false;
}
public boolean isReferred() {
return !_referrers.isEmpty();
}
@@ -123,10 +128,6 @@ public class ObjectMetadata implements Serializable {
}
public void removeRef(String from) {
if (isLocked()) {
unlock();
Log.error("Object " + getName() + " is locked, but we removed a reference to it, unlocking!");
}
if (Log.isTraceEnabled())
Log.trace("Removing ref " + from + " from " + getName());
_referrers.remove(from);

View File

@@ -1,7 +1,11 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.google.protobuf.Message;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.persistence.*;
import com.usatiuk.dhfs.objects.repository.OpPushJKleppmannTree;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import io.quarkus.arc.ClientProxy;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
@@ -108,6 +112,30 @@ public class ProtoSerializerService {
return serializeToJObjectDataPInternal(object).orElseThrow(() -> new IllegalStateException("Unknown JObjectDataP type: " + object.getClass()));
}
// FIXME: This is annoying
public <O extends JTreeNodeMeta> TreeNodeMetaP serializeToTreeNodeMetaP(O object) {
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
var ser = serialize(object);
if (ser instanceof TreeNodeMetaDirectoryP) {
return TreeNodeMetaP.newBuilder().setDir((TreeNodeMetaDirectoryP) ser).build();
} else if (ser instanceof TreeNodeMetaFileP) {
return TreeNodeMetaP.newBuilder().setFile((TreeNodeMetaFileP) ser).build();
} else {
throw new IllegalArgumentException("Unexpected object type on input to serializeToTreeNodeMetaP: " + object.getClass());
}
}
// FIXME: This is annoying
public <O extends Op> OpPushPayload serializeToOpPushPayload(O object) {
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
var ser = serialize(object);
if (ser instanceof OpPushJKleppmannTree) {
return OpPushPayload.newBuilder().setJKleppmannTreeOp((OpPushJKleppmannTree) ser).build();
} else {
throw new IllegalArgumentException("Unexpected object type on input to serializeToTreeNodeMetaP: " + object.getClass());
}
}
public <M extends Message, O> O deserialize(M message) {
if (!_deserializers.containsKey(message.getClass()))
throw new IllegalStateException("Deserializer not registered: " + message.getClass());

View File

@@ -161,14 +161,14 @@ public class PersistentRemoteHostsService {
private List<PersistentPeerInfo> getPeersSnapshot() {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY,
(m, d) -> d.getPeers().stream().map(u -> {
try {
return getPeer(u).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m2, d2) -> d2);
} catch (Exception e) {
Log.warn("Error making snapshot of peer " + u, e);
return null;
}
}).filter(Objects::nonNull).toList());
(m, d) -> d.getPeers().stream().map(u -> {
try {
return getPeer(u).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m2, d2) -> d2);
} catch (Exception e) {
Log.warn("Error making snapshot of peer " + u, e);
return null;
}
}).filter(Objects::nonNull).toList());
}
public UUID getSelfUuid() {
@@ -197,14 +197,18 @@ public class PersistentRemoteHostsService {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().filter(i -> !i.equals(_selfUuid)).toList());
}
public List<UUID> getHostUuidsAndSelf() {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList());
}
public List<PersistentPeerInfo> getHostsNoNulls() {
for (int i = 0; i < 5; i++) {
try {
return getPeerDirectory()
.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY,
(m, d) -> d.getPeers().stream()
.map(u -> getPeer(u).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m2, d2) -> d2))
.filter(e -> !e.getUuid().equals(_selfUuid)).toList());
(m, d) -> d.getPeers().stream()
.map(u -> getPeer(u).runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m2, d2) -> d2))
.filter(e -> !e.getUuid().equals(_selfUuid)).toList());
} catch (Exception e) {
Log.warn("Error when making snapshot of hosts ", e);
try {

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -52,8 +53,8 @@ public class RemoteObjectServiceClient {
var ourVersion = md.getOurVersion();
if (ourVersion >= 1)
return md.getRemoteCopies().entrySet().stream()
.filter(entry -> entry.getValue().equals(ourVersion))
.map(Map.Entry::getKey).toList();
.filter(entry -> entry.getValue().equals(ourVersion))
.map(Map.Entry::getKey).toList();
else
return persistentRemoteHostsService.getHostUuids();
});
@@ -85,7 +86,7 @@ public class RemoteObjectServiceClient {
throw new StatusRuntimeException(Status.ABORTED.withDescription("Received outdated object version"));
} catch (Exception e) {
Log.error("Received unexpected object version from " + reply.getSelfUuid()
+ " for " + reply.getObject().getHeader().getName() + " and conflict resolution failed", e);
+ " for " + reply.getObject().getHeader().getName() + " and conflict resolution failed", e);
throw new StatusRuntimeException(Status.ABORTED.withDescription("Received unexpected object version"));
}
}
@@ -108,8 +109,8 @@ public class RemoteObjectServiceClient {
var header = obj
.runReadLocked(
obj.getKnownClass().isAnnotationPresent(PushResolution.class)
? JObject.ResolutionStrategy.LOCAL_ONLY
: JObject.ResolutionStrategy.NO_RESOLUTION,
? JObject.ResolutionStrategy.LOCAL_ONLY
: JObject.ResolutionStrategy.NO_RESOLUTION,
(m, d) -> {
if (m.getKnownClass().isAnnotationPresent(PushResolution.class) && d == null)
Log.warn("Object " + m.getName() + " is marked as PushResolution but no resolution found");
@@ -126,6 +127,15 @@ public class RemoteObjectServiceClient {
return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send));
}
public OpPushReply pushOp(Op op, String queueName, UUID host) {
var msg = OpPushMsg.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setQueueId(queueName)
.setMsg(protoSerializerService.serializeToOpPushPayload(op))
.build();
return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(msg));
}
public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
ConcurrentLinkedDeque<CanDeleteReply> results = new ConcurrentLinkedDeque<>();
Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
@@ -134,8 +144,8 @@ public class RemoteObjectServiceClient {
executor.invokeAll(targets.stream().<Callable<Void>>map(h -> () -> {
try {
var req = CanDeleteRequest.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setName(object);
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setName(object);
req.addAllOurReferrers(ourReferrers);
var res = rpcClientFactory.withObjSyncClient(h, client -> client.canDelete(req.build()));
if (res != null)

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.OpListenerDispatcher;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
@@ -44,6 +45,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
ProtoSerializerService protoSerializerService;
@Inject
OpListenerDispatcher opListenerDispatcher;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
@@ -66,8 +70,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
obj.markSeen();
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();
return Uni.createFrom().item(GetObjectReply.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setObject(replyObj).build());
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setObject(replyObj).build());
}
@Override
@@ -142,6 +146,21 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
}
@Override
public Uni<OpPushReply> opPush(OpPushMsg request) {
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
if (!persistentRemoteHostsService.existsHost(UUID.fromString(request.getSelfUuid())))
throw new StatusRuntimeException(Status.UNAUTHENTICATED);
try {
opListenerDispatcher.accept(request.getQueueId(), UUID.fromString(request.getSelfUuid()), protoSerializerService.deserialize(request.getMsg()));
} catch (Exception e) {
Log.error(e, e);
throw e;
}
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
}
@Override
@Blocking
public Uni<PingReply> ping(PingRequest request) {

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import java.util.UUID;
public interface IncomingOpListener {
void accept(UUID incomingPeer, Op op);
}

View File

@@ -181,10 +181,6 @@ public class InvalidationQueueService {
pushInvalidationToOne(host, jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
}
public void pushInvalidationToAll(OpQueue queue) {
}
protected void pushDeferredInvalidations(UUID host, String name) {
_queue.add(Pair.of(host, name));
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
public interface Op {
}

View File

@@ -0,0 +1,21 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.UUID;
@ApplicationScoped
public class OpListenerDispatcher {
private final HashMap<String, IncomingOpListener> _listeners = new HashMap<>();
public void registerListener(String queueId, IncomingOpListener listener) {
_listeners.put(queueId, listener);
}
public void accept(String queueId, UUID hostFrom, Op op) {
var got = _listeners.get(queueId);
if (got == null) throw new IllegalArgumentException("Queue with id " + queueId + " not registered");
got.accept(hostFrom, op);
}
}
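For orientation, the dispatcher only routes ops by queue id; whoever owns a queue is expected to register itself as the listener. A minimal sketch of such a consumer, with a hypothetical class name and queue id (only Op, IncomingOpListener and OpListenerDispatcher come from this change):

package com.usatiuk.dhfs.objects.repository.invalidation;

import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.UUID;

// Hypothetical consumer: registers itself under a well-known queue id so that
// ops pushed by peers through the new OpPush RPC end up here.
@ApplicationScoped
public class ExampleTreeOpListener implements IncomingOpListener {
    public static final String QUEUE_ID = "example-tree"; // assumed queue name

    @Inject
    OpListenerDispatcher opListenerDispatcher;

    @Startup
    void init() {
        opListenerDispatcher.registerListener(QUEUE_ID, this);
    }

    @Override
    public void accept(UUID incomingPeer, Op op) {
        // A real listener would apply the op to the local replica
        // (e.g. feed it into a tree); this sketch just logs it.
        Log.info("Got op from " + incomingPeer + ": " + op);
    }
}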

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.jkleppmanntree.helpers.JOpWrapper;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class OpPushPayloadDeserializer implements ProtoDeserializer<OpPushPayload, Op> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public Op deserialize(OpPushPayload message) {
return switch (message.getPayloadCase()) {
case JKLEPPMANNTREEOP -> (JOpWrapper) protoSerializerService.deserialize(message.getJKleppmannTreeOp());
case PAYLOAD_NOT_SET -> throw new IllegalArgumentException("OpPushPayload is not set");
};
}
}

View File

@@ -1,8 +1,11 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import java.util.Collection;
import java.util.UUID;
public interface OpQueue {
Object getForHost(UUID host);
Op getForHost(UUID host);
String getId();
void commitOneForHost(UUID host, Op op);
}
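OpSender (below) drains this interface per host: it keeps calling getForHost until it returns null and acknowledges every successfully sent op with commitOneForHost. A minimal in-memory sketch of an implementation, with hypothetical class and method names and no persistence (a real queue would have to survive restarts):

package com.usatiuk.dhfs.objects.repository.invalidation;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical in-memory OpQueue: one FIFO of pending ops per destination host.
public class ExampleOpQueue implements OpQueue {
    private final String _id;
    private final ConcurrentHashMap<UUID, ConcurrentLinkedDeque<Op>> _pending = new ConcurrentHashMap<>();

    public ExampleOpQueue(String id) {
        _id = id;
    }

    // Producer side: enqueue for a host, then call OpSender.push(this) to wake the sender.
    public void add(UUID host, Op op) {
        _pending.computeIfAbsent(host, k -> new ConcurrentLinkedDeque<>()).addLast(op);
    }

    @Override
    public Op getForHost(UUID host) {
        var q = _pending.get(host);
        return q == null ? null : q.peekFirst(); // null means nothing left for this host
    }

    @Override
    public String getId() {
        return _id;
    }

    @Override
    public void commitOneForHost(UUID host, Op op) {
        var q = _pending.get(host);
        if (q != null) q.remove(op); // drop the op only once it was actually delivered
    }
}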

View File

@@ -0,0 +1,75 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import jakarta.annotation.Priority;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class OpSender {
@Inject
RemoteHostManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
private static final int _threads = 1;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
private final HashSetDelayedBlockingQueue<OpQueue> _queue = new HashSetDelayedBlockingQueue<>(0); // FIXME:
@Startup
void init() {
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("opsender-%d")
.build();
_executor = Executors.newFixedThreadPool(_threads, factory);
for (int i = 0; i < _threads; i++) {
_executor.submit(this::sender);
}
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
_shutdown = true;
_executor.shutdownNow();
if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) {
Log.error("Failed to shut down op sender thread");
}
}
// For every reachable host, drain its queue one op at a time; a failed send
// breaks out so the op stays queued and is retried on a later push.
private void sender() {
while (!_shutdown) {
try {
var got = _queue.get();
for (var h : remoteHostManager.getAvailableHosts()) {
Op op;
while ((op = got.getForHost(h)) != null) {
try {
remoteObjectServiceClient.pushOp(op, got.getId(), h);
got.commitOneForHost(h, op);
} catch (Exception e) {
Log.info("Error sending op to " + h, e);
break;
}
}
}
} catch (InterruptedException ignored) {
}
}
}
public void push(OpQueue queue) {
_queue.readd(queue);
}
}

View File

@@ -66,14 +66,18 @@ message TreeNodeMetaDirectoryP {
string name = 1;
}
message TreeNodeMetaP {
oneof meta {
TreeNodeMetaFileP file = 1;
TreeNodeMetaDirectoryP dir = 2;
}
}
message TreeNodeP {
optional string parent = 1;
string id = 2;
map<string, string> children = 3;
oneof meta {
TreeNodeMetaFileP file = 4;
TreeNodeMetaDirectoryP dir = 5;
}
optional TreeNodeMetaP meta = 4;
}
message JObjectDataP {

View File

@@ -13,6 +13,7 @@ service DhfsObjectSyncGrpc {
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {}
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
rpc OpPush (OpPushMsg) returns (OpPushReply) {}
rpc Ping (PingRequest) returns (PingReply) {}
}
@@ -83,4 +84,28 @@ message IndexUpdatePush {
ObjectHeader header = 1;
}
message IndexUpdateReply {}
message IndexUpdateReply {}
message OpPushJKleppmannTree {
int64 timestamp = 1;
string peer = 2;
string newParentId = 3;
dhfs.objects.persistence.TreeNodeMetaP meta = 4;
string child = 5;
}
message OpPushPayload {
oneof payload {
OpPushJKleppmannTree jKleppmannTreeOp = 1;
}
}
message OpPushMsg {
string selfUuid = 10;
string queueId = 1;
OpPushPayload msg = 2;
}
message OpPushReply {
}
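The OpPushJKleppmannTree fields mirror a tree move operation: the move's timestamp and originating peer, the new parent, optional node metadata and the moved child. A sketch of how a serializer (the counterpart of OpPushPayloadDeserializer above) might fill the generated messages; the helper and its parameters are assumptions, while the builder methods follow directly from the proto definitions:

// Sketch only: the parameters stand in for whatever accessors the wrapped tree op exposes;
// OpPushJKleppmannTree and OpPushPayload are the classes protoc generates from the messages above.
static OpPushPayload toPayload(long timestamp, String peer, String newParentId,
                               TreeNodeMetaP meta, String child) {
    var treeOp = OpPushJKleppmannTree.newBuilder()
            .setTimestamp(timestamp)
            .setPeer(peer)
            .setNewParentId(newParentId)
            .setChild(child);
    if (meta != null) treeOp.setMeta(meta); // some ops carry no metadata
    return OpPushPayload.newBuilder()
            .setJKleppmannTreeOp(treeOp.build())
            .build();
}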

View File

@@ -40,23 +40,23 @@ public class DhfsFuseIT {
Network network = Network.newNetwork();
var image = new ImageFromDockerfile()
.withDockerfileFromBuilder(builder ->
builder
.from("azul/zulu-openjdk-debian:21-jre-latest")
.run("apt update && apt install -y libfuse2 curl")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=20",
"-Ddhfs.objects.sync.ping.timeout=20",
"-Ddhfs.objects.reconnect_interval=1s",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
builder
.from("azul/zulu-openjdk-debian:21-jre-latest")
.run("apt update && apt install -y libfuse2 curl")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=20",
"-Ddhfs.objects.sync.ping.timeout=20",
"-Ddhfs.objects.reconnect_interval=1s",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"));
container1 = new GenericContainer<>(image)
.withPrivilegedMode(true)
@@ -86,16 +86,16 @@ public class DhfsFuseIT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -172,6 +172,7 @@ public class DhfsFuseIT {
}
@Test
@Disabled // TODO: How does this fit with the tree?
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
@@ -214,6 +215,10 @@ public class DhfsFuseIT {
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(500);
// Make a couple of extra writes so the op log moves forward
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /root/dhfs_default/fuse/testf2").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo tesempty3 > /root/dhfs_default/fuse/testf3").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
@@ -263,6 +268,7 @@ public class DhfsFuseIT {
}
// TODO: This probably shouldn't be working right now
@Test
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
@@ -271,10 +277,10 @@ public class DhfsFuseIT {
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
@@ -283,10 +289,10 @@ public class DhfsFuseIT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -301,7 +307,7 @@ public class DhfsFuseIT {
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(1000);
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -362,60 +368,4 @@ public class DhfsFuseIT {
Assertions.assertTrue(c2ls2.getStdout().contains("ahinou"));
}
@Test
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
}
@Test
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
}
@Test
void dirConflictTest4() throws IOException, InterruptedException, TimeoutException {
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode() == 0)
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode() == 0);
Assumptions.assumeTrue(createdOk, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
}
}

View File

@@ -5,6 +5,7 @@ import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -44,23 +45,23 @@ public class DhfsFusex3IT {
Network network = Network.newNetwork();
var image = new ImageFromDockerfile()
.withDockerfileFromBuilder(builder ->
builder
.from("azul/zulu-openjdk-debian:21-jre-latest")
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=20",
"-Ddhfs.objects.sync.ping.timeout=20",
"-Ddhfs.objects.reconnect_interval=1s",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
builder
.from("azul/zulu-openjdk-debian:21-jre-latest")
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.cmd("java", "-ea", "-Xmx128M", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"-Ddhfs.objects.peerdiscovery.interval=100",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=10",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-jar", "/app/quarkus-run.jar")
.build())
.withFileFromPath("/app", Paths.get(buildPath, "quarkus-app"));
container1 = new GenericContainer<>(image)
.withPrivilegedMode(true)
@@ -104,28 +105,28 @@ public class DhfsFusex3IT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Ignoring new address"), 60, TimeUnit.SECONDS, 2);
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c3uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c3uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
@@ -174,10 +175,10 @@ public class DhfsFusex3IT {
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Thread.sleep(2000);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
@@ -189,16 +190,26 @@ public class DhfsFusex3IT {
@Test
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container1.getContainerId()).exec();
client.pauseContainerCmd(container2.getContainerId()).exec();
// Pauses needed as otherwise docker buffers some incoming packets
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "echo test3 >> /root/dhfs_default/fuse/testf").getExitCode());
client.pauseContainerCmd(container3.getContainerId()).exec();
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode());
client.pauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container1.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 2);
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
client.unpauseContainerCmd(container3.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
Thread.sleep(2000);
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
@@ -210,53 +221,16 @@ public class DhfsFusex3IT {
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
@Test
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
boolean createFail = Stream.of(
Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
Log.info("Creating");
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(2000);
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
@Test
@Disabled
void fileConflictTest2() throws IOException, InterruptedException, TimeoutException {
@@ -266,8 +240,8 @@ public class DhfsFusex3IT {
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
boolean writeFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf1"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf1"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf1")).parallel().map(p -> {
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf1"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf1")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -292,13 +266,13 @@ public class DhfsFusex3IT {
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
}