mirror of https://github.com/usatiuk/dhfs.git
seemingly working fs with record classes
@@ -23,6 +23,10 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -18,11 +18,6 @@ public class AtomicClock implements Clock<Long>, Serializable {
_max = timestamp;
}

// FIXME:
public void ungetTimestamp() {
--_max;
}

@Override
public Long peekTimestamp() {
return _max;
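For orientation, a minimal sketch of what the clock in this hunk plausibly looks like once ungetTimestamp is removed; the exact Clock<Long> method set and signatures (getTimestamp, updateTimestamp) are assumptions inferred from call sites elsewhere in this diff, not the file's literal contents.

public class AtomicClock implements Clock<Long>, Serializable {
    private long _max;

    // Hand out the next timestamp, strictly increasing (assumed behaviour).
    @Override
    public Long getTimestamp() {
        return ++_max;
    }

    // Read the current maximum without advancing it (shown in the hunk above).
    @Override
    public Long peekTimestamp() {
        return _max;
    }

    // Merge in a timestamp received from a peer (Lamport-style merge);
    // the "_max = timestamp;" context line above likely belongs here.
    public void updateTimestamp(Long timestamp) {
        if (timestamp > _max) _max = timestamp;
    }
}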
@@ -8,15 +8,16 @@ import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT, WrapperT extends TreeNodeWrapper<TimestampT, PeerIdT, MetaT, NodeIdT>> {
public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
private static final Logger LOGGER = Logger.getLogger(KleppmannTree.class.getName());
private final StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> _storage;

private final StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> _storage;
private final PeerInterface<PeerIdT> _peers;
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
private HashMap<NodeIdT, WrapperT> _undoCtx = null;
private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;

public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> storage,
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
PeerInterface<PeerIdT> peers,
Clock<TimestampT> clock,
OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> opRecorder) {
@@ -30,13 +31,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (names.isEmpty()) return fromId;

var from = _storage.getById(fromId);
from.rLock();
NodeIdT childId;
try {
childId = from.getNode().getChildren().get(names.getFirst());
} finally {
from.rUnlock();
}
childId = from.children().get(names.getFirst());

if (childId == null)
return null;
@@ -45,63 +41,52 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

public NodeIdT traverse(NodeIdT fromId, List<String> names) {
_storage.rLock();
try {
return traverseImpl(fromId, names.subList(1, names.size()));
} finally {
_storage.rUnlock();
}
return traverseImpl(fromId, names.subList(1, names.size()));
}

public NodeIdT traverse(List<String> names) {
_storage.rLock();
try {
return traverseImpl(_storage.getRootId(), names);
} finally {
_storage.rUnlock();
}
return traverseImpl(_storage.getRootId(), names);
}

private void undoEffect(LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT> effect) {
_storage.assertRwLock();
if (effect.oldInfo() != null) {
var node = _storage.getById(effect.childId());
var oldParent = _storage.getById(effect.oldInfo().oldParent());
var curParent = _storage.getById(effect.newParentId());
curParent.rwLock();
oldParent.rwLock();
node.rwLock();
try {
curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
if (!node.getNode().getMeta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.getNode().getId());
node.getNode().setMeta(effect.oldInfo().oldMeta());
node.getNode().setParent(oldParent.getNode().getId());
oldParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
node.notifyRmRef(curParent.getNode().getId());
node.notifyRef(oldParent.getNode().getId());
node.getNode().setLastEffectiveOp(effect.oldInfo().oldEffectiveMove());
} finally {
node.rwUnlock();
oldParent.rwUnlock();
curParent.rwUnlock();
{
var newCurParentChildren = new HashMap<>(curParent.children());
newCurParentChildren.remove(node.meta().getName());
curParent = curParent.withChildren(newCurParentChildren);
_storage.putNode(curParent);
}

if (!node.meta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
throw new IllegalArgumentException("Class mismatch for meta for node " + node.key());
{
var newOldParentChildren = new HashMap<>(oldParent.children());
newOldParentChildren.put(node.meta().getName(), node.key());
oldParent = oldParent.withChildren(newOldParentChildren);
_storage.putNode(oldParent);
}
_storage.putNode(
node.withMeta(effect.oldInfo().oldMeta())
.withParent(effect.oldInfo().oldParent())
.withLastEffectiveOp(effect.oldInfo().oldEffectiveMove())
);
} else {
var node = _storage.getById(effect.childId());
var curParent = _storage.getById(effect.newParentId());
curParent.rwLock();
node.rwLock();
try {
curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
node.freeze();
node.getNode().setParent(null);
node.getNode().setLastEffectiveOp(null);
node.notifyRmRef(curParent.getNode().getId());
_undoCtx.put(node.getNode().getId(), node);
} finally {
node.rwUnlock();
curParent.rwUnlock();
{
var newCurParentChildren = new HashMap<>(curParent.children());
newCurParentChildren.remove(node.meta().getName());
curParent = curParent.withChildren(newCurParentChildren);
_storage.putNode(curParent);
}
_storage.putNode(
node.withParent(null)
.withLastEffectiveOp(null)
);
_undoCtx.put(node.key(), node);
}
}
@@ -116,7 +101,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

private void doAndPut(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
_storage.assertRwLock();
var res = doOp(op, failCreatingIfExists);
_storage.getLog().put(res.op().timestamp(), res);
}
@@ -160,22 +144,16 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
if (!inTrash.isEmpty()) {
var trash = _storage.getById(_storage.getTrashId());
trash.rwLock();
try {
for (var n : inTrash) {
var node = _storage.getById(n);
node.rwLock();
try {
if (trash.getNode().getChildren().remove(n.toString()) == null)
LOGGER.severe("Node " + node.getNode().getId() + " not found in trash but should be there");
node.notifyRmRef(trash.getNode().getId());
} finally {
node.rwUnlock();
}
_storage.removeNode(n);
for (var n : inTrash) {
var node = _storage.getById(n);
{
var newTrashChildren = new HashMap<>(trash.children());
if (newTrashChildren.remove(n.toString()) == null)
LOGGER.severe("Node " + node.key() + " not found in trash but should be there");
trash = trash.withChildren(newTrashChildren);
_storage.putNode(trash);
}
} finally {
trash.rwUnlock();
_storage.removeNode(n);
}
}
} else {
@@ -188,29 +166,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

public void move(NodeIdT newParent, MetaT newMeta, NodeIdT child, boolean failCreatingIfExists) {
_storage.rwLock();
try {
var createdMove = createMove(newParent, newMeta, child);
_opRecorder.recordOp(createdMove);
applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
} finally {
_storage.rwUnlock();
}
var createdMove = createMove(newParent, newMeta, child);
_opRecorder.recordOp(createdMove);
applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
}

public void applyExternalOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
_storage.rwLock();
try {
_clock.updateTimestamp(op.timestamp().timestamp());
applyOp(from, op, false);
} finally {
_storage.rwUnlock();
}
_clock.updateTimestamp(op.timestamp().timestamp());
applyOp(from, op, false);
}

// Returns true if the timestamp is newer than what's seen, false otherwise
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
_storage.assertRwLock();
TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
if (oldRef != null && oldRef.compareTo(newTimestamp) > 0) { // FIXME?
LOGGER.warning("Wrong op order: received older than known from " + from.toString());
@@ -221,31 +188,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
_storage.rLock();
try {
// TODO: Ideally no point in this separate locking?
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
} finally {
_storage.rUnlock();
}
_storage.rwLock();
try {
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
updateTimestampImpl(from, timestamp);
tryTrimLog();
} finally {
_storage.rwUnlock();
}

// TODO: Ideally no point in this separate locking?
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
updateTimestampImpl(from, timestamp);
tryTrimLog();
return true;
}

private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
_storage.assertRwLock();

if (!updateTimestampImpl(from, op.timestamp().timestamp())) return;

var log = _storage.getLog();
@@ -276,7 +230,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
e.getValue().unfreeze();
_storage.removeNode(e.getKey());
}
}
@@ -292,12 +245,10 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

private CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
_storage.assertRwLock();
return new CombinedTimestamp<>(_clock.getTimestamp(), _peers.getSelfId());
}

private <LocalMetaT extends MetaT> OpMove<TimestampT, PeerIdT, LocalMetaT, NodeIdT> createMove(NodeIdT newParent, LocalMetaT newMeta, NodeIdT node) {
_storage.assertRwLock();
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
}

@@ -317,91 +268,73 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return computed;
}

private WrapperT getNewNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> desired) {
_storage.assertRwLock();
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
if (_undoCtx != null) {
var node = _undoCtx.get(desired.getId());
var node = _undoCtx.get(key);
if (node != null) {
node.rwLock();
try {
if (!node.getNode().getChildren().isEmpty()) {
LOGGER.log(Level.WARNING, "Not empty children for undone node " + desired.getId());
if (!node.children().isEmpty()) {
LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
}
node.getNode().setParent(desired.getParent());
node.notifyRef(desired.getParent());
node.getNode().setMeta(desired.getMeta());
node.unfreeze();
node = node.withParent(parent).withMeta(meta);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while fixing up node " + desired.getId(), e);
node.rwUnlock();
LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
node = null;
}
}
if (node != null) {
_undoCtx.remove(desired.getId());
_undoCtx.remove(key);
return node;
}
}
return _storage.createNewNode(desired);
return _storage.createNewNode(key, parent, meta);
}

private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
_storage.assertRwLock();
for (var effect : effects) {
WrapperT oldParentNode = null;
WrapperT newParentNode;
WrapperT node;
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> oldParentNode = null;
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParentNode;
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node;

newParentNode = _storage.getById(effect.newParentId());
newParentNode.rwLock();
try {
if (effect.oldInfo() != null) {
oldParentNode = _storage.getById(effect.oldInfo().oldParent());
oldParentNode.rwLock();
}
try {
if (oldParentNode == null) {
node = getNewNode(new TreeNode<>(effect.childId(), effect.newParentId(), effect.newMeta()));
} else {
node = _storage.getById(effect.childId());
node.rwLock();
}
try {

if (oldParentNode != null) {
oldParentNode.getNode().getChildren().remove(effect.oldInfo().oldMeta().getName());
node.notifyRmRef(effect.oldInfo().oldParent());
}

newParentNode.getNode().getChildren().put(effect.newMeta().getName(), effect.childId());
if (effect.newParentId().equals(_storage.getTrashId()) &&
!Objects.equals(effect.newMeta().getName(), effect.childId().toString()))
throw new IllegalArgumentException("Move to trash should have id of node as name");
node.getNode().setParent(effect.newParentId());
node.getNode().setMeta(effect.newMeta());
node.getNode().setLastEffectiveOp(effect.effectiveOp());
node.notifyRef(effect.newParentId());

} finally {
node.rwUnlock();
}
} finally {
if (oldParentNode != null)
oldParentNode.rwUnlock();
}
} finally {
newParentNode.rwUnlock();
if (effect.oldInfo() != null) {
oldParentNode = _storage.getById(effect.oldInfo().oldParent());
}
if (oldParentNode == null) {
node = getNewNode(effect.childId(), effect.newParentId(), effect.newMeta());
} else {
node = _storage.getById(effect.childId());
}
if (oldParentNode != null) {
var newOldParentChildren = new HashMap<>(oldParentNode.children());
newOldParentChildren.remove(effect.oldInfo().oldMeta().getName());
oldParentNode = oldParentNode.withChildren(newOldParentChildren);
_storage.putNode(oldParentNode);
}

{
var newNewParentChildren = new HashMap<>(newParentNode.children());
newNewParentChildren.put(effect.newMeta().getName(), effect.childId());
newParentNode = newParentNode.withChildren(newNewParentChildren);
_storage.putNode(newParentNode);
}
if (effect.newParentId().equals(_storage.getTrashId()) &&
!Objects.equals(effect.newMeta().getName(), effect.childId().toString()))
throw new IllegalArgumentException("Move to trash should have id of node as name");
_storage.putNode(
node.withParent(effect.newParentId())
.withMeta(effect.newMeta())
.withLastEffectiveOp(sourceOp)
);
}
}

private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computeEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
_storage.assertRwLock();
var node = _storage.getById(op.childId());

NodeIdT oldParentId = (node != null && node.getNode().getParent() != null) ? node.getNode().getParent() : null;
NodeIdT oldParentId = (node != null && node.parent() != null) ? node.parent() : null;
NodeIdT newParentId = op.newParentId();
WrapperT newParent = _storage.getById(newParentId);
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParent = _storage.getById(newParentId);

if (newParent == null) {
LOGGER.log(Level.SEVERE, "New parent not found " + op.newMeta().getName() + " " + op.childId());
@@ -409,34 +342,24 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}

if (oldParentId == null) {
newParent.rLock();
try {
var conflictNodeId = newParent.getNode().getChildren().get(op.newMeta().getName());
var conflictNodeId = newParent.children().get(op.newMeta().getName());

if (conflictNodeId != null) {
if (failCreatingIfExists)
throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);
if (conflictNodeId != null) {
if (failCreatingIfExists)
throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);

var conflictNode = _storage.getById(conflictNodeId);
conflictNode.rLock();
try {
MetaT conflictNodeMeta = conflictNode.getNode().getMeta();
String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.getNode().getId();
String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(conflictNode.getNode().getLastEffectiveOp(), newParentId, conflictNodeMeta), conflictNode.getNode().getLastEffectiveOp(), newParentId, (MetaT) conflictNodeMeta.withName(newConflictNodeName), conflictNodeId),
new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
));
} finally {
conflictNode.rUnlock();
}
} else {
return new LogRecord<>(op, List.of(
new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
));
}
} finally {
newParent.rUnlock();
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(conflictNode.lastEffectiveOp(), newParentId, conflictNodeMeta), conflictNode.lastEffectiveOp(), newParentId, (MetaT) conflictNodeMeta.withName(newConflictNodeName), conflictNodeId),
new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
));
} else {
return new LogRecord<>(op, List.of(
new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
));
}
}
@@ -444,96 +367,64 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return new LogRecord<>(op, null);
}

node.rLock();
newParent.rLock();
try {
MetaT oldMeta = node.getNode().getMeta();
if (!oldMeta.getClass().equals(op.newMeta().getClass())) {
LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.getNode().getId());
return new LogRecord<>(op, null);
}
var replaceNodeId = newParent.getNode().getChildren().get(op.newMeta().getName());
if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId);
try {
replaceNode.rLock();
var replaceNodeMeta = replaceNode.getNode().getMeta();
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(replaceNode.getNode().getLastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.getNode().getLastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
new LogEffect<>(new LogEffectOld<>(node.getNode().getLastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
} finally {
replaceNode.rUnlock();
}
}
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(node.getNode().getLastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
} finally {
newParent.rUnlock();
node.rUnlock();
MetaT oldMeta = node.meta();
if (!oldMeta.getClass().equals(op.newMeta().getClass())) {
LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.key());
return new LogRecord<>(op, null);
}
var replaceNodeId = newParent.children().get(op.newMeta().getName());
if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId);
var replaceNodeMeta = replaceNode.meta();
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(replaceNode.lastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.lastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
}
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
}

private boolean isAncestor(NodeIdT child, NodeIdT parent) {
var node = _storage.getById(parent);
NodeIdT curParent;
while ((curParent = node.getNode().getParent()) != null) {
while ((curParent = node.parent()) != null) {
if (Objects.equals(child, curParent)) return true;
node = _storage.getById(curParent);
}
return false;
}

public void walkTree(Consumer<WrapperT> consumer) {
_storage.rLock();
try {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());
public void walkTree(Consumer<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> consumer) {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());

while (!queue.isEmpty()) {
var id = queue.pop();
var node = _storage.getById(id);
if (node == null) continue;
node.rLock();
try {
queue.addAll(node.getNode().getChildren().values());
consumer.accept(node);
} finally {
node.rUnlock();
}
}
} finally {
_storage.rUnlock();
while (!queue.isEmpty()) {
var id = queue.pop();
var node = _storage.getById(id);
if (node == null) continue;
queue.addAll(node.children().values());
consumer.accept(node);
}
}

public Pair<String, NodeIdT> findParent(Function<WrapperT, Boolean> kidPredicate) {
_storage.rLock();
try {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());
public Pair<String, NodeIdT> findParent(Function<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>, Boolean> kidPredicate) {
ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
queue.push(_storage.getRootId());

while (!queue.isEmpty()) {
var id = queue.pop();
var node = _storage.getById(id);
if (node == null) continue;
node.rLock();
try {
var children = node.getNode().getChildren();
for (var childEntry : children.entrySet()) {
var child = _storage.getById(childEntry.getValue());
if (kidPredicate.apply(child)) {
return Pair.of(childEntry.getKey(), node.getNode().getId());
}
}
queue.addAll(children.values());
} finally {
node.rUnlock();
while (!queue.isEmpty()) {
var id = queue.pop();
var node = _storage.getById(id);
if (node == null) continue;
var children = node.children();
for (var childEntry : children.entrySet()) {
var child = _storage.getById(childEntry.getValue());
if (kidPredicate.apply(child)) {
return Pair.of(childEntry.getKey(), node.key());
}
}
} finally {
_storage.rUnlock();
queue.addAll(children.values());
}
return null;
}
@@ -541,27 +432,22 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
public void recordBoostrapFor(PeerIdT host) {
TreeMap<CombinedTimestamp<TimestampT, PeerIdT>, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT>> result = new TreeMap<>();

_storage.rwLock();
try {
walkTree(node -> {
var op = node.getNode().getLastEffectiveOp();
if (node.getNode().getLastEffectiveOp() == null) return;
LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
result.put(node.getNode().getLastEffectiveOp().timestamp(), node.getNode().getLastEffectiveOp());
});
walkTree(node -> {
var op = node.lastEffectiveOp();
if (node.lastEffectiveOp() == null) return;
LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
result.put(node.lastEffectiveOp().timestamp(), node.lastEffectiveOp());
});

for (var le : _storage.getLog().getAll()) {
var op = le.getValue().op();
LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
result.put(le.getKey(), le.getValue().op());
}
for (var le : _storage.getLog().getAll()) {
var op = le.getValue().op();
LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
result.put(le.getKey(), le.getValue().op());
}

for (var op : result.values()) {
LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
_opRecorder.recordOpForPeer(host, op);
}
} finally {
_storage.rwUnlock();
for (var op : result.values()) {
LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
_opRecorder.recordOpForPeer(host, op);
}
}
}
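The pattern repeated throughout the new version of this class is copy-on-write instead of lock-and-mutate: a node is never changed in place; an updated children map is built, a new immutable node is derived with the with*() methods, and the result is handed back through putNode. A condensed, illustrative comparison (variable names are made up; the API calls are the ones used in the hunks above):

// Old style (removed): lock the wrapper, mutate the shared node in place.
// parentWrapper.rwLock();
// try {
//     parentWrapper.getNode().getChildren().put(name, childId);
// } finally {
//     parentWrapper.rwUnlock();
// }

// New style: derive an updated immutable node and persist it.
var parent = _storage.getById(parentId);
var newChildren = new HashMap<>(parent.children());
newChildren.put(name, childId);
_storage.putNode(parent.withChildren(newChildren));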
@@ -4,32 +4,23 @@ public interface StorageInterface<
TimestampT extends Comparable<TimestampT>,
PeerIdT extends Comparable<PeerIdT>,
MetaT extends NodeMeta,
NodeIdT,
WrapperT extends TreeNodeWrapper<TimestampT, PeerIdT, MetaT, NodeIdT>> {
NodeIdT> {
NodeIdT getRootId();

NodeIdT getTrashId();

NodeIdT getNewNodeId();

WrapperT getById(NodeIdT id);
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getById(NodeIdT id);

// Creates a node, returned wrapper is RW-locked
WrapperT createNewNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> createNewNode(NodeIdT key, NodeIdT parent, MetaT meta);

void putNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);

void removeNode(NodeIdT id);

LogInterface<TimestampT, PeerIdT, MetaT, NodeIdT> getLog();

PeerTimestampLogInterface<TimestampT, PeerIdT> getPeerTimestampLog();

void rLock();

void rUnlock();

void rwLock();

void rwUnlock();

void assertRwLock();
}

@@ -1,32 +1,24 @@
package com.usatiuk.kleppmanntree;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
public class TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> implements Serializable {
private final NodeIdT _id;
private NodeIdT _parent = null;
private OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> _lastEffectiveOp = null;
private MetaT _meta = null;
private Map<String, NodeIdT> _children = new HashMap<>();
public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> extends Serializable {
NodeIdT key();

public TreeNode(NodeIdT id, NodeIdT parent, MetaT meta) {
_id = id;
_meta = meta;
_parent = parent;
}
NodeIdT parent();

public TreeNode(NodeIdT id, NodeIdT parent, MetaT meta, Map<String, NodeIdT> children) {
_id = id;
_meta = meta;
_parent = parent;
_children = children;
}
OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp();

MetaT meta();

Map<String, NodeIdT> children();

TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withParent(NodeIdT parent);

TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withLastEffectiveOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp);

TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withMeta(MetaT meta);

TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withChildren(Map<String, NodeIdT> children);
}
@@ -1,21 +0,0 @@
package com.usatiuk.kleppmanntree;

public interface TreeNodeWrapper<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
void rLock();

void rUnlock();

void rwLock();

void rwUnlock();

void freeze();

void unfreeze();

void notifyRef(NodeIdT id);

void notifyRmRef(NodeIdT id);

TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNode();
}
@@ -32,8 +32,8 @@ public class KleppmanTreeSimpleTest {
Assertions.assertEquals(d1id, testNode2._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode2._tree.traverse(List.of("Test2")));

Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());

var f1id = testNode1._storageInterface.getNewNodeId();

@@ -54,10 +54,10 @@ public class KleppmanTreeSimpleTest {
testNode1._tree.move(d1id, new TestNodeMetaDir("Test2"), d2id);
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test1", "Test2")));
Assertions.assertIterableEquals(List.of("Test1"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test1"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());

testNode2._tree.move(d2id, new TestNodeMetaDir("Test1"), d1id);
Assertions.assertIterableEquals(List.of("Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
Assertions.assertEquals(d2id, testNode2._tree.traverse(List.of("Test2")));
Assertions.assertEquals(d1id, testNode2._tree.traverse(List.of("Test2", "Test1")));

@@ -72,8 +72,8 @@ public class KleppmanTreeSimpleTest {
}

// Second node wins as it has smaller timestamp
Assertions.assertIterableEquals(List.of("Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test1", "TestFile"), testNode1._storageInterface.getById(d2id).getNode().getChildren().keySet());
Assertions.assertIterableEquals(List.of("Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
Assertions.assertIterableEquals(List.of("Test1", "TestFile"), testNode1._storageInterface.getById(d2id).children().keySet());
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test2")));
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test2", "Test1")));
Assertions.assertEquals(f1id, testNode1._tree.traverse(List.of("Test2", "TestFile")));
@@ -81,8 +81,8 @@ public class KleppmanTreeSimpleTest {
var f11 = testNode1._storageInterface.getById(f1id);
var f12 = testNode2._storageInterface.getById(f1id);

Assertions.assertEquals(f11.getNode().getMeta(), f12.getNode().getMeta());
Assertions.assertInstanceOf(TestNodeMetaFile.class, f11.getNode().getMeta());
Assertions.assertEquals(f11.meta(), f12.meta());
Assertions.assertInstanceOf(TestNodeMetaFile.class, f11.meta());

// Trim test
Assertions.assertTrue(testNode1._storageInterface.getLog().size() <= 1);

@@ -9,7 +9,7 @@ public class TestNode {
protected final TestClock _clock;
protected final TestPeerInterface _peerInterface;
protected final TestStorageInterface _storageInterface;
protected final KleppmannTree<Long, Long, TestNodeMeta, Long, TestNodeWrapper> _tree;
protected final KleppmannTree<Long, Long, TestNodeMeta, Long> _tree;
private final TestOpRecorder _recorder;

public TestNode(long id) {

@@ -1,52 +0,0 @@
package com.usatiuk.kleppmanntree;

public class TestNodeWrapper implements TreeNodeWrapper<Long, Long, TestNodeMeta, Long> {
private final TreeNode<Long, Long, TestNodeMeta, Long> _backingNode;

public TestNodeWrapper(TreeNode<Long, Long, TestNodeMeta, Long> backingNode) {_backingNode = backingNode;}

@Override
public void rLock() {
}

@Override
public void rUnlock() {
}

@Override
public void rwLock() {
}

@Override
public void rwUnlock() {
}

@Override
public void freeze() {
}

@Override
public void unfreeze() {
}

@Override
public void notifyRef(Long id) {
}

@Override
public void notifyRmRef(Long id) {
}

@Override
public TreeNode<Long, Long, TestNodeMeta, Long> getNode() {
return _backingNode;
}
}
@@ -3,17 +3,17 @@ package com.usatiuk.kleppmanntree;
import java.util.HashMap;
import java.util.Map;

public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper> {
public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long> {
private final long _peerId;
private final Map<Long, TreeNode<Long, Long, TestNodeMeta, Long>> _nodes = new HashMap<>();
private final Map<Long, TestTreeNode> _nodes = new HashMap<>();
private final TestLog _log = new TestLog();
private final TestPeerLog _peerLog = new TestPeerLog();
private long _curId = 1;

public TestStorageInterface(long peerId) {
_peerId = peerId;
_nodes.put(getRootId(), new TreeNode<>(getRootId(), null, null));
_nodes.put(getTrashId(), new TreeNode<>(getTrashId(), null, null));
_nodes.put(getRootId(), new TestTreeNode(getRootId(), null, null));
_nodes.put(getTrashId(), new TestTreeNode(getTrashId(), null, null));
}

@Override
@@ -32,18 +32,18 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
}

@Override
public TestNodeWrapper getById(Long id) {
var node = _nodes.get(id);
return node == null ? null : new TestNodeWrapper(node);
public TestTreeNode getById(Long id) {
return _nodes.get(id);
}

@Override
public TestNodeWrapper createNewNode(TreeNode<Long, Long, TestNodeMeta, Long> node) {
if (!_nodes.containsKey(node.getId())) {
_nodes.put(node.getId(), node);
return new TestNodeWrapper(node);
}
throw new IllegalStateException("Node with id " + node.getId() + " already exists");
public TestTreeNode createNewNode(Long key, Long parent, TestNodeMeta meta) {
return new TestTreeNode(key, parent, meta);
}

@Override
public void putNode(TreeNode<Long, Long, TestNodeMeta, Long> node) {
_nodes.put(node.key(), (TestTreeNode) node);
}

@Override
@@ -53,7 +53,6 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
_nodes.remove(id);
}

@Override
public LogInterface<Long, Long, TestNodeMeta, Long> getLog() {
return _log;
@@ -64,29 +63,4 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNo
public PeerTimestampLogInterface<Long, Long> getPeerTimestampLog() {
return _peerLog;
}

@Override
public void rLock() {
}

@Override
public void rUnlock() {
}

@Override
public void rwLock() {
}

@Override
public void rwUnlock() {
}

@Override
public void assertRwLock() {
}
}

@@ -0,0 +1,37 @@
package com.usatiuk.kleppmanntree;

import lombok.Builder;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;

@Builder(toBuilder = true)
public record TestTreeNode(Long key, Long parent, OpMove<Long, Long, TestNodeMeta, Long> lastEffectiveOp,
TestNodeMeta meta,
Map<String, Long> children) implements TreeNode<Long, Long, TestNodeMeta, Long> {

public TestTreeNode(Long id, Long parent, TestNodeMeta meta) {
this(id, parent, null, meta, Collections.emptyMap());
}

@Override
public TreeNode<Long, Long, TestNodeMeta, Long> withParent(Long parent) {
return this.toBuilder().parent(parent).build();
}

@Override
public TreeNode<Long, Long, TestNodeMeta, Long> withLastEffectiveOp(OpMove<Long, Long, TestNodeMeta, Long> lastEffectiveOp) {
return this.toBuilder().lastEffectiveOp(lastEffectiveOp).build();
}

@Override
public TreeNode<Long, Long, TestNodeMeta, Long> withMeta(TestNodeMeta meta) {
return this.toBuilder().meta(meta).build();
}

@Override
public TreeNode<Long, Long, TestNodeMeta, Long> withChildren(Map<String, Long> children) {
return this.toBuilder().children(children).build();
}
}
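A brief usage sketch of the record above (not part of the commit): each with* call goes through Lombok's toBuilder and returns a modified copy, so the original node is never mutated.

// Hypothetical usage of TestTreeNode's copy-on-write API.
TestTreeNode root = new TestTreeNode(0L, null, null);
TreeNode<Long, Long, TestNodeMeta, Long> withChild =
        root.withChildren(Map.of("Test1", 1L)); // copy with one child added
TreeNode<Long, Long, TestNodeMeta, Long> reparented =
        withChild.withParent(42L);              // another copy; 'root' is untouched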
@@ -232,19 +232,17 @@ public class JObjectManager {
for (var hook : _preCommitTxHooks) {
for (var entry : drained) {
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordWrite<?> write -> {
var oldObj = getCurrent.apply(write.key());
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
}
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), getCurrent.apply(deleted.key()));
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
var oldObj = getCurrent.apply(entry.key());
var curObj = tx.get(JData.class, entry.key()).orElse(null);

assert (curObj == null) == (entry instanceof TxRecord.TxObjectRecordDeleted);

if (curObj == null) {
hook.onDelete(entry.key(), oldObj);
} else if (oldObj == null) {
hook.onCreate(entry.key(), curObj);
} else {
hook.onChange(entry.key(), oldObj, curObj);
}
current.put(entry.key(), entry);
}
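Condensed view of the new hook dispatch above, for reference: the callback kind is now derived from the pair of "object before the transaction" and "object as the transaction currently sees it" rather than from the record type. Illustrative only; key, oldObj and curObj stand for the values computed in the loop.

// curObj == null  -> the object is gone in this transaction: delete
// oldObj == null  -> it did not exist before: create
// both non-null   -> it existed and was modified: change
if (curObj == null) {
    hook.onDelete(key, oldObj);
} else if (oldObj == null) {
    hook.onCreate(key, curObj);
} else {
    hook.onChange(key, oldObj, curObj);
}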
@@ -31,7 +31,7 @@ public class ReadTrackingObjectSource implements TransactionObjectSource {
}

got.data().ifPresent(data -> {
if (!type.isInstance(data))
if (!type.isInstance(data.data()))
throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
});

@@ -49,7 +49,7 @@ public class ReadTrackingObjectSource implements TransactionObjectSource {
}

got.data().ifPresent(data -> {
if (!type.isInstance(data))
if (!type.isInstance(data.data()))
throw new IllegalStateException("Type mismatch for " + got + ": expected " + type + ", got " + data.getClass());
});

@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.AccessLevel;
import lombok.Getter;

@@ -30,6 +29,21 @@ public class TransactionFactoryImpl implements TransactionFactory {

@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (type.isInstance(write.data())) {
return Optional.of((T) write.data());
} else {
throw new IllegalStateException("Type mismatch for " + key + ": expected " + type + ", got " + write.data().getClass());
}
}
case TxRecord.TxObjectRecordDeleted deleted -> {
return Optional.empty();
}
case null, default -> {
}
}

return switch (strategy) {
case OPTIMISTIC -> _source.get(type, key).data().map(JDataVersionedWrapper::data);
case WRITE -> _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data);
@@ -41,6 +41,25 @@ public class ObjectsTest {
}
}

@Test
void createGetObject() {
{
txm.begin();
var newParent = new Parent(JObjectKey.of("ParentCreateGet"), "John");
curTx.put(newParent);
var parent = curTx.get(Parent.class, JObjectKey.of("ParentCreateGet")).orElse(null);
Assertions.assertEquals("John", parent.name());
txm.commit();
}

{
txm.begin();
var parent = curTx.get(Parent.class, JObjectKey.of("ParentCreateGet")).orElse(null);
Assertions.assertEquals("John", parent.name());
txm.commit();
}
}

@Test
void createDeleteObject() {
{
@@ -207,6 +226,7 @@ public class ObjectsTest {
curTx.put(parent.toBuilder().name("John").build());
Log.warn("Thread 1 commit");
txm.commit();
Log.warn("Thread 1 commit done");
thread1Failed.set(false);
return null;
} finally {
@@ -222,6 +242,7 @@ public class ObjectsTest {
curTx.put(parent.toBuilder().name("John2").build());
Log.warn("Thread 2 commit");
txm.commit();
Log.warn("Thread 2 commit done");
thread2Failed.set(false);
return null;
} finally {

@@ -1,3 +1,5 @@
dhfs.objects.persistence=memory
quarkus.log.category."com.usatiuk".level=TRACE
quarkus.log.category."com.usatiuk".min-level=TRACE
quarkus.http.test-port=0
quarkus.http.test-ssl-port=0
@@ -2,12 +2,27 @@ package com.usatiuk.dhfs.files.objects;

import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import lombok.Builder;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;

public interface ChunkData extends JDataRefcounted, Serializable {
ByteString getData();
@Builder(toBuilder = true)
public record ChunkData(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen,
ByteString data) implements JDataRefcounted {
public ChunkData(JObjectKey key, ByteString data) {
this(key, new LinkedHashSet<>(), false, data);
}

void setData(ByteString data);
}
@Override
public ChunkData withRefsFrom(Collection<JObjectKey> refs) {
return this.toBuilder().refsFrom(refs).build();
}

@Override
public ChunkData withFrozen(boolean frozen) {
return this.toBuilder().frozen(frozen).build();
}
}
@@ -1,25 +1,27 @@
package com.usatiuk.dhfs.files.objects;

import com.usatiuk.objects.common.runtime.JObjectKey;
import lombok.Builder;

import java.util.Collection;
import java.util.NavigableMap;

public interface File extends FsNode {
NavigableMap<Long, JObjectKey> getChunks();

void setChunks(NavigableMap<Long, JObjectKey> chunks);

boolean getSymlink();

void setSymlink(boolean symlink);

long getSize();

void setSize(long size);
@Builder(toBuilder = true)
public record File(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen, long mode, long cTime, long mTime,
NavigableMap<Long, JObjectKey> chunks, boolean symlink, long size
) implements FsNode {
@Override
public File withRefsFrom(Collection<JObjectKey> refs) {
return this.toBuilder().refsFrom(refs).build();
}

@Override
default Collection<JObjectKey> collectRefsTo() {
return getChunks().values().stream().toList();
public File withFrozen(boolean frozen) {
return this.toBuilder().frozen(frozen).build();
}

@Override
public Collection<JObjectKey> collectRefsTo() {
return chunks().values().stream().toList();
}
}
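Since File is now an immutable record, the setter-based updates used before are replaced by toBuilder copies that are re-put into the transaction. Roughly like this (illustrative values; the real call sites appear in the DhfsFileServiceImpl hunks further down):

// Sketch of the new update flow for an immutable File record.
File updated = f.toBuilder()
        .mode(0644)
        .mTime(System.currentTimeMillis())
        .build();
curTx.put(updated); // the transaction now tracks the new version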
@@ -1,20 +1,11 @@
package com.usatiuk.dhfs.files.objects;

import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.objects.common.runtime.JData;

import java.io.Serializable;
public interface FsNode extends JDataRefcounted {
long mode();

public interface FsNode extends JDataRefcounted, Serializable {
long getMode();
long cTime();

void setMode(long mode);

long getCtime();

void setCtime(long ctime);

long getMtime();

void setMtime(long mtime);
long mTime();
}
@@ -12,7 +12,6 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDir
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
import io.grpc.Status;
|
||||
@@ -37,8 +36,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
TransactionManager jObjectTxManager;
|
||||
@Inject
|
||||
ObjectAllocator objectAllocator;
|
||||
|
||||
@ConfigProperty(name = "dhfs.files.target_chunk_size")
|
||||
int targetChunkSize;
|
||||
@@ -75,9 +72,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
private ChunkData createChunk(ByteString bytes) {
|
||||
var newChunk = objectAllocator.create(ChunkData.class, new JObjectKey(UUID.randomUUID().toString()));
|
||||
newChunk.setData(bytes);
|
||||
newChunk.setRefsFrom(List.of());
|
||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||
curTx.put(newChunk);
|
||||
return newChunk;
|
||||
}
|
||||
@@ -108,11 +103,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (ref == null) return Optional.empty();
|
||||
GetattrRes ret;
|
||||
if (ref instanceof File f) {
|
||||
ret = new GetattrRes(f.getMtime(), f.getCtime(), f.getMode(), f.getSymlink() ? GetattrType.SYMLINK : GetattrType.FILE);
|
||||
ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE);
|
||||
} else if (ref instanceof JKleppmannTreeNode) {
|
||||
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
|
||||
} else {
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.getKey()));
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
|
||||
}
|
||||
return Optional.of(ret);
|
||||
});
|
||||
@@ -123,9 +118,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
return jObjectTxManager.executeTx(() -> {
|
||||
try {
|
||||
var ret = getDirEntry(name);
|
||||
return switch (ret.getNode().getMeta()) {
|
||||
return switch (ret.meta()) {
|
||||
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.getFileIno());
|
||||
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.getKey());
|
||||
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
|
||||
default -> Optional.empty();
|
||||
};
|
||||
} catch (StatusRuntimeException e) {
|
||||
@@ -138,8 +133,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
private void ensureDir(JKleppmannTreeNode entry) {
|
||||
if (!(entry.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory))
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.getKey()));
|
||||
if (!(entry.meta() instanceof JKleppmannTreeNodeMetaDirectory))
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -154,24 +149,18 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
var fuuid = UUID.randomUUID();
|
||||
Log.debug("Creating file " + fuuid);
|
||||
File f = objectAllocator.create(File.class, new JObjectKey(fuuid.toString()));
|
||||
f.setMode(mode);
|
||||
f.setMtime(System.currentTimeMillis());
|
||||
f.setCtime(f.getMtime());
|
||||
f.setSymlink(false);
|
||||
f.setChunks(new TreeMap<>());
|
||||
f.setRefsFrom(List.of());
|
||||
File f = new File(JObjectKey.of(fuuid.toString()), new HashSet<>(), false, mode, System.currentTimeMillis(), System.currentTimeMillis(), new TreeMap<>(), false, 0);
|
||||
curTx.put(f);
|
||||
|
||||
try {
|
||||
getTree().move(parent.getKey(), new JKleppmannTreeNodeMetaFile(fname, f.getKey()), getTree().getNewNodeId());
|
||||
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
|
||||
} catch (Exception e) {
|
||||
// fobj.getMeta().removeRef(newNodeId);
|
||||
throw e;
|
||||
} finally {
|
||||
// fobj.rwUnlock();
|
||||
}
|
||||
return Optional.of(f.getKey());
|
||||
return Optional.of(f.key());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -180,7 +169,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
||||
return jObjectTxManager.executeTx(() -> {
|
||||
return getTree().findParent(w -> {
|
||||
if (w.getNode().getMeta() instanceof JKleppmannTreeNodeMetaFile f)
|
||||
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
||||
if (f.getFileIno().equals(ino))
|
||||
return true;
|
||||
return false;
|
||||
@@ -199,7 +188,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {

Log.debug("Creating directory " + name);

getTree().move(parent.getKey(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
});
}

@@ -207,11 +196,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public void unlink(String name) {
jObjectTxManager.executeTx(() -> {
var node = getDirEntryOpt(name).orElse(null);
if (node.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory f) {
if (!allowRecursiveDelete && !node.getNode().getChildren().isEmpty())
if (node.meta() instanceof JKleppmannTreeNodeMetaDirectory f) {
if (!allowRecursiveDelete && !node.children().isEmpty())
throw new DirectoryNotEmptyException();
}
getTree().trash(node.getNode().getMeta(), node.getKey());
getTree().trash(node.meta(), node.key());
});
}

@@ -219,13 +208,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.getNode().getMeta();
JKleppmannTreeNodeMeta meta = node.meta();

var toPath = Path.of(to);
var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry);

getTree().move(toDentry.getKey(), meta.withName(toPath.getFileName().toString()), node.getKey());
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
return true;
});
}
@@ -238,8 +227,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (dent instanceof JKleppmannTreeNode) {
return true;
} else if (dent instanceof File f) {
f.setMode(mode);
f.setMtime(System.currentTimeMillis());
curTx.put(f.toBuilder().mode(mode).mTime(System.currentTimeMillis()).build());
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
@@ -252,10 +240,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
var found = getDirEntry(name);

if (!(found.getNode().getMeta() instanceof JKleppmannTreeNodeMetaDirectory md))
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);

return found.getNode().getChildren().keySet();
return found.children().keySet();
});
}
@@ -274,7 +262,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}

try {
var chunksAll = file.getChunks();
var chunksAll = file.chunks();
if (chunksAll.isEmpty()) {
return Optional.of(ByteString.empty());
}
@@ -334,7 +322,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
throw new StatusRuntimeException(Status.NOT_FOUND);
}

return chunkRead.getData();
return chunkRead.data();
}

private int getChunkSize(JObjectKey uuid) {
@@ -373,7 +361,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}

if (writeLogging) {
Log.info("Writing to file: " + file.getKey() + " size=" + size(fileUuid) + " "
Log.info("Writing to file: " + file.key() + " size=" + size(fileUuid) + " "
+ offset + " " + data.size());
}

@@ -381,7 +369,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
truncate(fileUuid, offset);

// FIXME: Some kind of immutable interface?
var chunksAll = Collections.unmodifiableNavigableMap(file.getChunks());
var chunksAll = Collections.unmodifiableNavigableMap(file.chunks());
var first = chunksAll.floorEntry(offset);
var last = chunksAll.lowerEntry(offset + data.size());
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
@@ -496,14 +484,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var thisChunk = pendingWrites.substring(cur, end);

ChunkData newChunkData = createChunk(thisChunk);
newChunks.put(start, newChunkData.getKey());
newChunks.put(start, newChunkData.key());

start += thisChunk.size();
cur = end;
}
}

file.setChunks(newChunks);
file = file.toBuilder().chunks(newChunks).mTime(System.currentTimeMillis()).build();
curTx.put(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);

@@ -524,10 +513,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}

if (length == 0) {
var oldChunks = Collections.unmodifiableNavigableMap(new TreeMap<>(file.getChunks()));
var oldChunks = Collections.unmodifiableNavigableMap(new TreeMap<>(file.chunks()));

file.setChunks(new TreeMap<>());
file.setMtime(System.currentTimeMillis());
file = file.toBuilder().chunks(new TreeMap<>()).mTime(System.currentTimeMillis()).build();
curTx.put(file);
cleanupChunks(file, oldChunks.values());
updateFileSize(file);
return true;
@@ -536,7 +525,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var curSize = size(fileUuid);
if (curSize == length) return true;

var chunksAll = Collections.unmodifiableNavigableMap(file.getChunks());
var chunksAll = Collections.unmodifiableNavigableMap(file.chunks());
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
@@ -567,9 +556,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
zeroCache.put(end - cur, UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)]));

ChunkData newChunkData = createChunk(zeroCache.get(end - cur));
newChunks.put(start, newChunkData.getKey());
newChunks.put(start, newChunkData.key());

start += newChunkData.getData().size();
start += newChunkData.data().size();
cur = end;
}
}
@@ -584,10 +573,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var newChunk = tailBytes.substring(0, (int) (length - tail.getKey()));

ChunkData newChunkData = createChunk(newChunk);
newChunks.put(tail.getKey(), newChunkData.getKey());
newChunks.put(tail.getKey(), newChunkData.key());
}

file.setChunks(newChunks);
file = file.toBuilder().chunks(newChunks).mTime(System.currentTimeMillis()).build();
curTx.put(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
return true;
@@ -622,16 +612,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);

File f = objectAllocator.create(File.class, new JObjectKey(fuuid.toString()));
f.setSymlink(true);
f.setRefsFrom(List.of());
File f = new File(JObjectKey.of(fuuid.toString()), new HashSet<>(), false, 0, System.currentTimeMillis(), System.currentTimeMillis(), new TreeMap<>(), true, 0);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));

f.getChunks().put(0L, newChunkData.getKey());
f.chunks().put(0L, newChunkData.key());
updateFileSize(f);

getTree().move(parent.getKey(), new JKleppmannTreeNodeMetaFile(fname, f.getKey()), getTree().getNewNodeId());
return f.getKey();
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
return f.key();
});
}

@@ -643,7 +631,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
"File not found for setTimes: " + fileUuid))
);

file.setMtime(mtimeMs);
curTx.put(file.toBuilder().cTime(atimeMs).mTime(mtimeMs).build());
return true;
});
}
@@ -653,14 +641,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jObjectTxManager.executeTx(() -> {
long realSize = 0;

var last = file.getChunks().lastEntry();
var last = file.chunks().lastEntry();
if (last != null) {
var lastSize = getChunkSize(last.getValue());
realSize = last.getKey() + lastSize;
}

if (realSize != file.getSize()) {
file.setSize(realSize);
if (realSize != file.size()) {
curTx.put(file.toBuilder().size(realSize).build());
}
});
}
@@ -671,7 +659,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var read = curTx.get(File.class, uuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));

return read.getSize();
return read.size();
});
}
}
@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects;

import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.logging.Log;
@@ -13,11 +12,8 @@ public class DeleterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

@Inject
ObjectAllocator alloc;

private boolean canDelete(JDataRefcounted data) {
return !data.getFrozen() && data.getRefsFrom().isEmpty();
return !data.frozen() && data.refsFrom().isEmpty();
}

@Override

@@ -7,13 +7,13 @@ import java.util.Collection;
import java.util.List;

public interface JDataRefcounted extends JData {
Collection<JObjectKey> getRefsFrom();
Collection<JObjectKey> refsFrom();

void setRefsFrom(Collection<JObjectKey> refs);
JDataRefcounted withRefsFrom(Collection<JObjectKey> refs);

boolean getFrozen();
boolean frozen();

void setFrozen(boolean frozen);
JDataRefcounted withFrozen(boolean frozen);

default Collection<JObjectKey> collectRefsTo() {
return List.of();

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects;

import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
@@ -15,9 +14,6 @@ public class RefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

@Inject
ObjectAllocator alloc;

@Override
public void onChange(JObjectKey key, JData old, JData cur) {
if (!(cur instanceof JDataRefcounted refCur)) {
@@ -27,12 +23,12 @@ public class RefcounterTxHook implements PreCommitTxHook {

for (var newRef : CollectionUtils.subtract(refCur.collectRefsTo(), refOld.collectRefsTo())) {
var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.union(referenced.getRefsFrom(), Set.of(key)));
curTx.put(referenced.withRefsFrom(CollectionUtils.union(referenced.refsFrom(), Set.of(key))));
}

for (var removedRef : CollectionUtils.subtract(refOld.collectRefsTo(), refCur.collectRefsTo())) {
var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.subtract(referenced.getRefsFrom(), Set.of(key)));
curTx.put(referenced.withRefsFrom(CollectionUtils.subtract(referenced.refsFrom(), Set.of(key))));
}
}

@@ -44,7 +40,7 @@ public class RefcounterTxHook implements PreCommitTxHook {

for (var newRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.union(referenced.getRefsFrom(), Set.of(key)));
curTx.put(referenced.withRefsFrom(CollectionUtils.union(referenced.refsFrom(), Set.of(key))));
}
}

@@ -57,7 +53,7 @@ public class RefcounterTxHook implements PreCommitTxHook {

for (var removedRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.subtract(referenced.getRefsFrom(), Set.of(key)));
curTx.put(referenced.withRefsFrom(CollectionUtils.subtract(referenced.refsFrom(), Set.of(key))));
}
}

@@ -7,7 +7,6 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDir
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -29,21 +28,26 @@ public class JKleppmannTreeManager {
@Inject
TransactionManager txManager;
@Inject
ObjectAllocator objectAllocator;
@Inject
JKleppmannTreePeerInterface peerInterface;

public JKleppmannTree getTree(JObjectKey name) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
if (data == null) {
data = objectAllocator.create(JKleppmannTreePersistentData.class, name);
data.setClock(new AtomicClock(1L));
data.setQueues(new HashMap<>());
data.setLog(new TreeMap<>());
data.setPeerTimestampLog(new HashMap<>());
data.setFrozen(true);
data = new JKleppmannTreePersistentData(
name,
List.of(),
true,
1L,
new HashMap<>(),
new HashMap<>(),
new TreeMap<>()
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(rootNode);
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(trashNode);
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);
@@ -51,9 +55,9 @@ public class JKleppmannTreeManager {
}

public class JKleppmannTree {
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey, JKleppmannTreeNodeWrapper> _tree;
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> _tree;

private final JKleppmannTreePersistentData _data;
private JKleppmannTreePersistentData _data;

private final JKleppmannTreeStorageInterface _storageInterface;
private final JKleppmannTreeClock _clock;
@@ -61,7 +65,7 @@ public class JKleppmannTreeManager {
private final JObjectKey _treeName;

JKleppmannTree(JKleppmannTreePersistentData data) {
_treeName = data.getKey();
_treeName = data.key();
_data = data;

_storageInterface = new JKleppmannTreeStorageInterface();
@@ -149,7 +153,7 @@ public class JKleppmannTreeManager {
// _tree.recordBoostrapFor(host);
// }

public Pair<String, JObjectKey> findParent(Function<JKleppmannTreeNodeWrapper, Boolean> predicate) {
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
}
@@ -273,35 +277,31 @@ public class JKleppmannTreeManager {
private class JKleppmannTreeClock implements Clock<Long> {
@Override
public Long getTimestamp() {
return _data.getClock().getTimestamp();
var res = _data.clock() + 1;
_data = _data.toBuilder().clock(res).build();
curTx.put(_data);
return res;
}

@Override
public Long peekTimestamp() {
return _data.getClock().peekTimestamp();
return _data.clock();
}

@Override
public Long updateTimestamp(Long receivedTimestamp) {
return _data.getClock().updateTimestamp(receivedTimestamp);
var old = _data.clock();
_data = _data.toBuilder().clock(Math.max(old, receivedTimestamp) + 1).build();
curTx.put(_data);
return old;
}
}

public class JKleppmannTreeStorageInterface implements StorageInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey, JKleppmannTreeNodeWrapper> {
public class JKleppmannTreeStorageInterface implements StorageInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
private final LogWrapper _logWrapper = new LogWrapper();
private final PeerLogWrapper _peerLogWrapper = new PeerLogWrapper();

public JKleppmannTreeStorageInterface() {
if (curTx.get(JKleppmannTreeNode.class, getRootId()).isEmpty()) {
var rootNode = objectAllocator.create(JKleppmannTreeNode.class, getRootId());
rootNode.setNode(new TreeNode<>(getRootId(), null, new JKleppmannTreeNodeMetaDirectory("")));
rootNode.setRefsFrom(List.of());
curTx.put(rootNode);
var trashNode = objectAllocator.create(JKleppmannTreeNode.class, getTrashId());
trashNode.setRefsFrom(List.of());
trashNode.setNode(new TreeNode<>(getTrashId(), null, new JKleppmannTreeNodeMetaDirectory("")));
curTx.put(trashNode);
}
}

@Override
@@ -320,19 +320,19 @@ public class JKleppmannTreeManager {
}

@Override
public JKleppmannTreeNodeWrapper getById(JObjectKey id) {
public JKleppmannTreeNode getById(JObjectKey id) {
var got = curTx.get(JKleppmannTreeNode.class, id);
if (got.isEmpty()) return null;
return new JKleppmannTreeNodeWrapper(got.get());
return got.orElse(null);
}

@Override
public JKleppmannTreeNodeWrapper createNewNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node) {
var created = objectAllocator.create(JKleppmannTreeNode.class, node.getId());
created.setNode(node);
created.setRefsFrom(List.of());
curTx.put(created);
return new JKleppmannTreeNodeWrapper(created);
public JKleppmannTreeNode createNewNode(JObjectKey key, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
return new JKleppmannTreeNode(key, parent, meta);
}

@Override
public void putNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node) {
curTx.put(((JKleppmannTreeNode) node));
}

@Override
@@ -350,95 +350,87 @@ public class JKleppmannTreeManager {
return _peerLogWrapper;
}

@Override
public void rLock() {
}

@Override
public void rUnlock() {
}

@Override
public void rwLock() {
}

@Override
public void rwUnlock() {
}

@Override
public void assertRwLock() {
}

private class PeerLogWrapper implements PeerTimestampLogInterface<Long, UUID> {
@Override
public Long getForPeer(UUID peerId) {
return _data.getPeerTimestampLog().get(peerId);
return _data.peerTimestampLog().get(peerId);
}

@Override
public void putForPeer(UUID peerId, Long timestamp) {
_data.getPeerTimestampLog().put(peerId, timestamp);
var newPeerTimestampLog = new HashMap<>(_data.peerTimestampLog());
newPeerTimestampLog.put(peerId, timestamp);
_data = _data.toBuilder().peerTimestampLog(newPeerTimestampLog).build();
curTx.put(_data);
}
}

private class LogWrapper implements LogInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> peekOldest() {
var ret = _data.getLog().firstEntry();
var ret = _data.log().firstEntry();
if (ret == null) return null;
return Pair.of(ret);
}

@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> takeOldest() {
var ret = _data.getLog().pollFirstEntry();
var newLog = new TreeMap<>(_data.log());
var ret = newLog.pollFirstEntry();
_data = _data.toBuilder().log(newLog).build();
curTx.put(_data);
if (ret == null) return null;
return Pair.of(ret);
}

@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> peekNewest() {
var ret = _data.getLog().lastEntry();
var ret = _data.log().lastEntry();
if (ret == null) return null;
return Pair.of(ret);
}

@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> newestSlice(CombinedTimestamp<Long, UUID> since, boolean inclusive) {
return _data.getLog().tailMap(since, inclusive).entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
return _data.log().tailMap(since, inclusive).entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
}

@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> getAll() {
return _data.getLog().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
return _data.log().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
}

@Override
public boolean isEmpty() {
return _data.getLog().isEmpty();
return _data.log().isEmpty();
}

@Override
public boolean containsKey(CombinedTimestamp<Long, UUID> timestamp) {
return _data.getLog().containsKey(timestamp);
return _data.log().containsKey(timestamp);
}

@Override
public long size() {
return (long) _data.getLog().size();
return (long) _data.log().size();
}

@Override
public void put(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> record) {
if (_data.getLog().containsKey(timestamp))
if (_data.log().containsKey(timestamp))
throw new IllegalStateException("Overwriting log entry?");
_data.getLog().put(timestamp, record);
var newLog = new TreeMap<>(_data.log());
newLog.put(timestamp, record);
_data = _data.toBuilder().log(newLog).build();
curTx.put(_data);
}

@Override
public void replace(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> record) {
_data.getLog().put(timestamp, record);
var newLog = new TreeMap<>(_data.log());
newLog.put(timestamp, record);
_data = _data.toBuilder().log(newLog).build();
curTx.put(_data);
}
}
}
@@ -1,61 +0,0 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;

import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.kleppmanntree.TreeNodeWrapper;
import com.usatiuk.objects.common.runtime.JObjectKey;

import java.util.UUID;

public class JKleppmannTreeNodeWrapper implements TreeNodeWrapper<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
private final JKleppmannTreeNode _backing;

public JKleppmannTreeNodeWrapper(JKleppmannTreeNode backing) {
assert backing != null;
assert backing.getNode() != null;
_backing = backing;
}

@Override
public void rLock() {
}

@Override
public void rUnlock() {
}

@Override
public void rwLock() {
}

@Override
public void rwUnlock() {
}

@Override
public void freeze() {
}

@Override
public void unfreeze() {
}

@Override
public void notifyRef(JObjectKey id) {
}

@Override
public void notifyRmRef(JObjectKey id) {
}

@Override
public TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getNode() {
// TODO:
return _backing.getNode();
// _backing.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (_backing.getData() == null)
// throw new IllegalStateException("Node " + _backing.getMeta().getName() + " data lost!");
// return _backing.getData().getNode();
}
}
@@ -1,27 +1,66 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;

import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.objects.common.runtime.JObjectKey;
import lombok.Builder;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

// FIXME: Ideally this is two classes?
public interface JKleppmannTreeNode extends JDataRefcounted, Serializable {
TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getNode();
@Builder(toBuilder = true)
public record JKleppmannTreeNode(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen, JObjectKey parent,
OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
JKleppmannTreeNodeMeta meta,
Map<String, JObjectKey> children) implements TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {

void setNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node);
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
this(id, Collections.emptyList(), false, parent, null, meta, Collections.emptyMap());
}

@Override
default Collection<JObjectKey> collectRefsTo() {
return Stream.concat(getNode().getChildren().values().stream(),
switch (getNode().getMeta()) {
public JKleppmannTreeNode withParent(JObjectKey parent) {
return this.toBuilder().parent(parent).build();
}

@Override
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
return this.toBuilder().lastEffectiveOp(lastEffectiveOp).build();
}

@Override
public JKleppmannTreeNode withMeta(JKleppmannTreeNodeMeta meta) {
return this.toBuilder().meta(meta).build();
}

@Override
public JKleppmannTreeNode withChildren(Map<String, JObjectKey> children) {
return this.toBuilder().children(children).build();
}

@Override
public JKleppmannTreeNode withRefsFrom(Collection<JObjectKey> refs) {
return this.toBuilder().refsFrom(refs).build();
}

@Override
public JKleppmannTreeNode withFrozen(boolean frozen) {
return this.toBuilder().frozen(frozen).build();
}

@Override
public Collection<JObjectKey> collectRefsTo() {
return Stream.concat(children().values().stream(),
switch (meta()) {
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>of();
case JKleppmannTreeNodeMetaFile file -> Stream.<JObjectKey>of(file.getFileIno());
default -> throw new IllegalStateException("Unexpected value: " + getNode().getMeta());
default -> throw new IllegalStateException("Unexpected value: " + meta());
}
).toList();
}
@@ -6,49 +6,51 @@ import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.objects.common.runtime.JObjectKey;
import lombok.Builder;

import java.util.*;

public interface JKleppmannTreePersistentData extends JDataRefcounted {
AtomicClock getClock();

void setClock(AtomicClock clock);

HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> getQueues();

void setQueues(HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> queues);

HashMap<UUID, Long> getPeerTimestampLog();

void setPeerTimestampLog(HashMap<UUID, Long> peerTimestampLog);

TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> getLog();

void setLog(TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> log);

default void recordOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
getQueues().computeIfAbsent(host, h -> new TreeMap<>());
getQueues().get(host).put(opMove.timestamp(), opMove);
@Builder(toBuilder = true)
public record JKleppmannTreePersistentData(
JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen,
long clock,
HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
HashMap<UUID, Long> peerTimestampLog,
TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> log
) implements JDataRefcounted {
void recordOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
queues().computeIfAbsent(host, h -> new TreeMap<>());
queues().get(host).put(opMove.timestamp(), opMove);
}

default void removeOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
getQueues().get(host).remove(opMove.timestamp(), opMove);
void removeOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
queues().get(host).remove(opMove.timestamp(), opMove);
}

default void recordOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
void recordOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
for (var u : hosts) {
recordOp(u, opMove);
}
}

default void removeOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
void removeOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
for (var u : hosts) {
removeOp(u, opMove);
}
}

@Override
default Collection<JObjectKey> collectRefsTo() {
return List.of(new JObjectKey(getKey().name() + "_jt_trash"), new JObjectKey(getKey().name() + "_jt_root"));
public JKleppmannTreePersistentData withRefsFrom(Collection<JObjectKey> refs) {
return this.toBuilder().refsFrom(refs).build();
}

@Override
public JKleppmannTreePersistentData withFrozen(boolean frozen) {
return this.toBuilder().frozen(frozen).build();
}

@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"));
}
}

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.benchmarks;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.objects.common.runtime.JObjectKey;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
@@ -41,7 +42,7 @@ public class DhfsFileBenchmarkTest {
@Test
@Disabled
void writeMbTest() {
String file = dhfsFileService.create("/writeMbTest", 0777).get();
JObjectKey file = dhfsFileService.create("/writeMbTest", 0777).get();
var bb = ByteBuffer.allocateDirect(1024 * 1024);
Benchmarker.runAndPrintMixSimple("dhfsFileService.write(\"\")",
() -> {
@@ -1,23 +1,18 @@
package com.usatiuk.dhfs.files;

import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.common.runtime.JObjectKey;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.awaitility.Awaitility.await;

@@ -50,54 +45,54 @@ public class DhfsFileServiceSimpleTestImpl {
@Inject
DhfsFileService fileService;
@Inject
JObjectManager jObjectManager;
Transaction curTx;
@Inject
JObjectTxManager jObjectTxManager;
TransactionManager jObjectTxManager;

@Test
void readTest() {
var fuuid = UUID.randomUUID();
{
ChunkData c1 = new ChunkData(ByteString.copyFrom("12345".getBytes()));
ChunkData c2 = new ChunkData(ByteString.copyFrom("678".getBytes()));
ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
File f = new File(fuuid, 777, false);
f.getChunks().put(0L, c1.getName());
f.getChunks().put((long) c1.getBytes().size(), c2.getName());
f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getName());

// FIXME: dhfs_files

var c1o = new AtomicReference<String>();
var c2o = new AtomicReference<String>();
var c3o = new AtomicReference<String>();
var fo = new AtomicReference<String>();

jObjectTxManager.executeTx(() -> {
c1o.set(jObjectManager.put(c1, Optional.of(f.getName())).getMeta().getName());
c2o.set(jObjectManager.put(c2, Optional.of(f.getName())).getMeta().getName());
c3o.set(jObjectManager.put(c3, Optional.of(f.getName())).getMeta().getName());
fo.set(jObjectManager.put(f, Optional.empty()).getMeta().getName());
});

var all = jObjectManager.findAll();
Assertions.assertTrue(all.contains(c1o.get()));
Assertions.assertTrue(all.contains(c2o.get()));
Assertions.assertTrue(all.contains(c3o.get()));
Assertions.assertTrue(all.contains(fo.get()));
}

String all = "1234567891011";

{
for (int start = 0; start < all.length(); start++) {
for (int end = start; end <= all.length(); end++) {
var read = fileService.read(fuuid.toString(), start, end - start);
Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get().toByteArray());
}
}
}
}
// @Test
// void readTest() {
// var fuuid = UUID.randomUUID();
// {
// ChunkData c1 = new ChunkData(ByteString.copyFrom("12345".getBytes()));
// ChunkData c2 = new ChunkData(ByteString.copyFrom("678".getBytes()));
// ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
// File f = new File(fuuid, 777, false);
// f.chunks().put(0L, c1.getName());
// f.chunks().put((long) c1.getBytes().size(), c2.getName());
// f.chunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getName());
//
// // FIXME: dhfs_files
//
// var c1o = new AtomicReference<String>();
// var c2o = new AtomicReference<String>();
// var c3o = new AtomicReference<String>();
// var fo = new AtomicReference<String>();
//
// jObjectTxManager.executeTx(() -> {
// c1o.set(curTx.put(c1, Optional.of(f.getName())).getMeta().getName());
// c2o.set(curTx.put(c2, Optional.of(f.getName())).getMeta().getName());
// c3o.set(curTx.put(c3, Optional.of(f.getName())).getMeta().getName());
// fo.set(curTx.put(f, Optional.empty()).getMeta().getName());
// });
//
// var all = jObjectManager.findAll();
// Assertions.assertTrue(all.contains(c1o.get()));
// Assertions.assertTrue(all.contains(c2o.get()));
// Assertions.assertTrue(all.contains(c3o.get()));
// Assertions.assertTrue(all.contains(fo.get()));
// }
//
// String all = "1234567891011";
//
// {
// for (int start = 0; start < all.length(); start++) {
// for (int end = start; end <= all.length(); end++) {
// var read = fileService.read(fuuid.toString(), start, end - start);
// Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get().toByteArray());
// }
// }
// }
// }

@Test
void dontMkdirTwiceTest() {
@@ -213,9 +208,12 @@ public class DhfsFileServiceSimpleTestImpl {
fileService.write(uuid2, 0, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29});
Assertions.assertArrayEquals(new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}, fileService.read(uuid2, 0, 10).get().toByteArray());

var oldfile = jObjectManager.get(ret2.get()).orElseThrow(IllegalStateException::new);
var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0);
var chunkObj = jObjectManager.get(chunk).orElseThrow(IllegalStateException::new);

jObjectTxManager.run(() -> {
var oldfile = curTx.get(File.class, ret2.get()).orElseThrow(IllegalStateException::new);
var chunk = oldfile.chunks().get(0);
var chunkObj = curTx.get(File.class, chunk).orElseThrow(IllegalStateException::new);
});

Assertions.assertTrue(fileService.rename("/moveOverTest1", "/moveOverTest2"));
Assertions.assertFalse(fileService.open("/moveOverTest1").isPresent());
@@ -224,14 +222,13 @@ public class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/moveOverTest2").get(), 0, 10).get().toByteArray());

await().atMost(5, TimeUnit.SECONDS).until(() -> {
try {
return chunkObj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
(m, d) -> !m.getReferrers().contains(uuid));
} catch (DeletedObjectAccessException ignored) {
return true;
}
});
// await().atMost(5, TimeUnit.SECONDS).until(() -> {
// jObjectTxManager.run(() -> {
//
// return chunkObj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
// (m, d) -> !m.getReferrers().contains(uuid));
// });
// });
}

@Test
@@ -270,13 +267,13 @@ public class DhfsFileServiceSimpleTestImpl {
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());

var oldfile = jObjectManager.get(uuid).orElseThrow(IllegalStateException::new);
var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0);
var chunkObj = jObjectManager.get(chunk).orElseThrow(IllegalStateException::new);

chunkObj.runReadLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
Assertions.assertTrue(m.getReferrers().contains(uuid));
});
// var oldfile = jObjectManager.get(uuid).orElseThrow(IllegalStateException::new);
// var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0);
// var chunkObj = jObjectManager.get(chunk).orElseThrow(IllegalStateException::new);
//
// chunkObj.runReadLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// Assertions.assertTrue(m.getReferrers().contains(uuid));
// });

Assertions.assertTrue(fileService.rename("/moveTest2", "/movedTest2"));
Assertions.assertFalse(fileService.open("/moveTest2").isPresent());
@@ -1,95 +0,0 @@
package com.usatiuk.dhfs.persistence;

import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.repository.persistence.FileObjectPersistentStore;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

class Profiles {
public static class FileObjectPersistentStoreTestProfile extends TempDataProfile {
@Override
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("quarkus.log.category.\"com.usatiuk.dhfs\".level", "TRACE");
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.objects.ref_verification", "true");
}
}
}

@QuarkusTest
@TestProfile(Profiles.FileObjectPersistentStoreTestProfile.class)
public class FileObjectPersistentStoreTest {
@Inject
FileObjectPersistentStore fileObjectPersistentStore;

@Test
public void writeReadFullObject() {
String name = "writeReadFullObjectSmallMeta";

var bytes = new byte[100000];
ThreadLocalRandom.current().nextBytes(bytes);

ObjectMetadataP meta = ObjectMetadataP.newBuilder().setName("verycoolname123456789").build();
JObjectDataP data = JObjectDataP.newBuilder().setChunkData(ChunkDataP.newBuilder().setData(ByteString.copyFrom(bytes)).build()).build();

fileObjectPersistentStore.writeObjectDirect(name, meta, data);
var readMeta = fileObjectPersistentStore.readObjectMeta(name);
var readData = fileObjectPersistentStore.readObject(name);
Assertions.assertEquals(meta, readMeta);
Assertions.assertEquals(data, readData);

var bigString = RandomStringUtils.random(100000);

var newMeta = ObjectMetadataP.newBuilder().setName(String.valueOf(bigString)).build();
fileObjectPersistentStore.writeObjectMetaDirect(name, newMeta);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
readData = fileObjectPersistentStore.readObject(name);
Assertions.assertEquals(newMeta, readMeta);
Assertions.assertEquals(data, readData);

fileObjectPersistentStore.writeObjectDirect(name, newMeta, null);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
Assertions.assertEquals(newMeta, readMeta);
Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

fileObjectPersistentStore.writeObjectMetaDirect(name, meta);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
Assertions.assertEquals(meta, readMeta);
Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

fileObjectPersistentStore.writeObjectDirect(name, newMeta, null);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
Assertions.assertEquals(newMeta, readMeta);
Assertions.assertThrows(Throwable.class, () -> fileObjectPersistentStore.readObject(name));

fileObjectPersistentStore.writeObjectDirect(name, newMeta, data);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
readData = fileObjectPersistentStore.readObject(name);
Assertions.assertEquals(newMeta, readMeta);
Assertions.assertEquals(data, readData);

fileObjectPersistentStore.writeObjectMetaDirect(name, meta);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
readData = fileObjectPersistentStore.readObject(name);
Assertions.assertEquals(meta, readMeta);
Assertions.assertEquals(data, readData);

fileObjectPersistentStore.writeObjectMetaDirect(name, newMeta);
readMeta = fileObjectPersistentStore.readObjectMeta(name);
readData = fileObjectPersistentStore.readObject(name);
Assertions.assertEquals(newMeta, readMeta);
Assertions.assertEquals(data, readData);
}
}
@@ -1,24 +0,0 @@
package com.usatiuk.dhfs.persistence;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class ProtoSerializationTest {

// @Inject
// ProtoSerializerService protoSerializerService;
//
// @Test
// void SerializeDeserializePeerDirectory() {
// var pd = new PeerDirectory();
// pd.getPeers().add(UUID.randomUUID());
// var ser = JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) protoSerializerService.serialize(pd)).build();
// var deser = (PeerDirectory) protoSerializerService.deserialize(ser);
// Assertions.assertIterableEquals(pd.getPeers(), deser.getPeers());
//
// var ser2 = protoSerializerService.serializeToJObjectDataP(pd);
// var deser2 = (PeerDirectory) protoSerializerService.deserialize(ser2);
// Assertions.assertIterableEquals(pd.getPeers(), deser2.getPeers());
// }
//
}