3 Commits

9 changed files with 102 additions and 11 deletions

View File

@@ -18,6 +18,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>

View File

@@ -64,7 +64,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
// Needs to be read after changing curParent, as it might be the same node
var oldParent = _storage.getById(effect.oldInfo().oldParent());
{
var newOldParentChildren = oldParent.children().plus(node.meta().getName(), node.key());
var newOldParentChildren = oldParent.children().plus(effect.oldInfo().oldMeta().getName(), node.key());
oldParent = oldParent.withChildren(newOldParentChildren);
_storage.putNode(oldParent);
}
@@ -90,6 +90,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
LOGGER.finer(() -> "Will undo op: " + op);
if (op.effects() != null)
for (var e : op.effects().reversed())
undoEffect(e);
@@ -178,7 +179,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
// Returns true if the timestamp is newer than what's seen, false otherwise
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
if (oldRef != null && oldRef.compareTo(newTimestamp) > 0) { // FIXME?
if (oldRef != null && oldRef.compareTo(newTimestamp) >= 0) { // FIXME?
LOGGER.warning("Wrong op order: received older than known from " + from.toString());
return false;
}
@@ -199,7 +200,9 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
if (!updateTimestampImpl(from, op.timestamp().timestamp())) return;
if (!updateTimestampImpl(op.timestamp().nodeId(), op.timestamp().timestamp())) return;
LOGGER.finer(() -> "Will apply op: " + op + " from " + from);
var log = _storage.getLog();
@@ -252,6 +255,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
LOGGER.finer(() -> "Doing op: " + op);
LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computed;
try {
computed = computeEffects(op, failCreatingIfExists);
@@ -291,6 +295,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
for (var effect : effects) {
LOGGER.finer(() -> "Applying effect: " + effect + " from op " + sourceOp);
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> oldParentNode = null;
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParentNode;
TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node;
@@ -354,6 +359,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
return new LogRecord<>(op, List.of(
@@ -361,6 +368,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
));
} else {
LOGGER.finer(() -> "Simple node creation");
return new LogRecord<>(op, List.of(
new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
));
@@ -385,11 +393,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node replacement: " + replaceNode);
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(replaceNode.lastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.lastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
}
LOGGER.finer(() -> "Simple node move");
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));

View File

@@ -2,13 +2,15 @@ package com.usatiuk.kleppmanntree;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.List;
public class KleppmanTreeSimpleTest {
private final TestNode testNode1 = new TestNode(1);
private final TestNode testNode2 = new TestNode(2);
private final TestNode testNode3 = new TestNode(3);
@Test
void circularTest() {
@@ -89,4 +91,50 @@ public class KleppmanTreeSimpleTest {
Assertions.assertTrue(testNode2._storageInterface.getLog().size() <= 1);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void undoWithRenameTest(boolean opOrder) {
var d1id = testNode1._storageInterface.getNewNodeId();
var d2id = testNode2._storageInterface.getNewNodeId();
var d3id = testNode2._storageInterface.getNewNodeId();
testNode1._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d1id);
testNode2._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d2id);
testNode3._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d3id);
var r1 = testNode1.getRecorded();
var r2 = testNode2.getRecorded();
var r3 = testNode3.getRecorded();
Assertions.assertEquals(1, r1.size());
Assertions.assertEquals(1, r2.size());
Assertions.assertEquals(1, r3.size());
if (opOrder) {
testNode2._tree.applyExternalOp(3L, r3.getFirst());
testNode2._tree.applyExternalOp(1L, r1.getFirst());
} else {
testNode2._tree.applyExternalOp(1L, r1.getFirst());
testNode2._tree.applyExternalOp(3L, r3.getFirst());
}
Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
if (opOrder) {
testNode1._tree.applyExternalOp(3L, r3.getFirst());
testNode1._tree.applyExternalOp(2L, r2.getFirst());
} else {
testNode1._tree.applyExternalOp(2L, r2.getFirst());
testNode1._tree.applyExternalOp(3L, r3.getFirst());
}
Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode1._storageInterface.getById(testNode1._storageInterface.getRootId()).children().keySet());
if (opOrder) {
testNode3._tree.applyExternalOp(2L, r2.getFirst());
testNode3._tree.applyExternalOp(1L, r1.getFirst());
} else {
testNode3._tree.applyExternalOp(1L, r1.getFirst());
testNode3._tree.applyExternalOp(2L, r2.getFirst());
}
Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode3._storageInterface.getById(testNode3._storageInterface.getRootId()).children().keySet());
}
}

View File

@@ -109,10 +109,10 @@ public class JObjectManager {
var curIteration = pendingWrites.get(hook);
Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {

View File

@@ -1,14 +1,16 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.*;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@@ -18,6 +20,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
SnapshotManager snapshotManager;
@Inject
LockManager lockManager;
@ConfigProperty(name = "dhfs.objects.transaction.never-lock")
boolean neverLock;
@Override
public TransactionPrivate createTransaction() {
@@ -195,6 +199,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
}
if (neverLock)
return getFromSource(type, key);
return switch (strategy) {
case OPTIMISTIC -> getFromSource(type, key);
case WRITE -> getWriteLockedFromSource(type, key);

View File

@@ -6,5 +6,6 @@ dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
quarkus.package.jar.decompiler.enabled=true
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.transaction.never-lock=true
quarkus.log.category."com.usatiuk.dhfs.objects.iterators".level=INFO
quarkus.log.category."com.usatiuk.dhfs.objects.iterators".min-level=INFO

View File

@@ -106,6 +106,8 @@ public class JKleppmannTreeManager {
collected.add(new JKleppmannTreeOpWrapper(_data.key(), node.getValue()));
if (collected.size() >= limit) break;
}
Log.tracev("Collected pending op for host: {0} - {1}, out of {2}", host, collected,
_data.queues().getOrDefault(host, TreePMap.empty()));
return Collections.unmodifiableList(collected);
}
@@ -114,11 +116,11 @@ public class JKleppmannTreeManager {
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());
var firstOp = _data.queues().get(host).firstEntry().getValue();
if (!Objects.equals(firstOp, jop.op()))
var firstOp = _data.queues().get(host).firstEntry();
if (!Objects.equals(firstOp.getValue(), jop.op()))
throw new IllegalArgumentException("Committed op push was not the oldest");
_data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(_data.queues().get(host).firstKey())));
_data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(firstOp.getKey())));
curTx.put(_data);
}

View File

@@ -30,4 +30,12 @@ public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
public int hashCode() {
return Objects.hashCode(_name);
}
@Override
public String toString() {
return "JKleppmannTreeNodeMeta{" +
"class=" + this.getClass().getSimpleName() + " " +
"_name='" + _name + '\'' +
'}';
}
}

View File

@@ -35,4 +35,12 @@ public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
public int hashCode() {
return Objects.hash(super.hashCode(), _fileIno);
}
@Override
public String toString() {
return "JKleppmannTreeNodeMetaFile{" +
"_name=" + getName() + ", " +
"_fileIno=" + _fileIno +
'}';
}
}