Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-29 04:57:48 +01:00

Compare commits: 92bca1e4e1 ... 16eb1d28d9 (3 commits)

Commits:
- 16eb1d28d9
- 4f397cd2d4
- 6a20550353
pom.xml:

@@ -18,6 +18,11 @@
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
KleppmannTree.java:

@@ -64,7 +64,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
         // Needs to be read after changing curParent, as it might be the same node
         var oldParent = _storage.getById(effect.oldInfo().oldParent());
         {
-            var newOldParentChildren = oldParent.children().plus(node.meta().getName(), node.key());
+            var newOldParentChildren = oldParent.children().plus(effect.oldInfo().oldMeta().getName(), node.key());
             oldParent = oldParent.withChildren(newOldParentChildren);
             _storage.putNode(oldParent);
         }
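The fix above matters when a move both reparented and renamed a node: the undo has to re-insert the child into its old parent under the name stored in effect.oldInfo().oldMeta(), not under the node's current name. A minimal standalone sketch of the difference, with illustrative names that are not the repo's API:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of why an undo must key the child by its *old* name.
// OldInfo, the map layout and all identifiers are illustrative only.
public class UndoRenameSketch {
    record OldInfo(String oldParent, String oldName) {}

    public static void main(String[] args) {
        // Parent directories as name -> child-id maps, like a tree's child index.
        Map<String, Map<String, Long>> parents = new TreeMap<>();
        parents.put("dirA", new HashMap<>());
        parents.put("dirB", new HashMap<>());

        // Node 42 starts out as "old.txt" under dirA.
        parents.get("dirA").put("old.txt", 42L);
        OldInfo old = new OldInfo("dirA", "old.txt");

        // A move op reparents it to dirB *and* renames it to "new.txt".
        parents.get("dirA").remove("old.txt");
        parents.get("dirB").put("new.txt", 42L);
        String currentName = "new.txt";

        // Undo: remove it from the new parent...
        parents.get("dirB").remove(currentName);
        // ...and re-insert it under the OLD parent with the OLD name.
        // Using currentName here would leave dirA keyed by a name it never had.
        parents.get(old.oldParent()).put(old.oldName(), 42L);

        System.out.println(parents); // {dirA={old.txt=42}, dirB={}}
    }
}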
@@ -90,6 +90,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
     }

     private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
+        LOGGER.finer(() -> "Will undo op: " + op);
         if (op.effects() != null)
             for (var e : op.effects().reversed())
                 undoEffect(e);
@@ -178,7 +179,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
     // Returns true if the timestamp is newer than what's seen, false otherwise
     private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
         TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
-        if (oldRef != null && oldRef.compareTo(newTimestamp) > 0) { // FIXME?
+        if (oldRef != null && oldRef.compareTo(newTimestamp) >= 0) { // FIXME?
             LOGGER.warning("Wrong op order: received older than known from " + from.toString());
             return false;
         }
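With >= instead of >, an incoming op whose timestamp is merely equal to the last one recorded for that peer is rejected too, so a replayed (already-seen) op is treated the same way as a stale one. A small self-contained sketch of that semantics, with illustrative names rather than the repo's types:

import java.util.HashMap;
import java.util.Map;

// Sketch of a per-peer timestamp log where equal timestamps are rejected
// as duplicates, mirroring the change from '>' to '>='. Names are illustrative.
public class TimestampLogSketch {
    private final Map<Long, Long> lastSeen = new HashMap<>(); // peerId -> last timestamp

    // Returns true only if the timestamp is strictly newer than what was seen.
    boolean update(long peer, long newTimestamp) {
        Long old = lastSeen.get(peer);
        if (old != null && old.compareTo(newTimestamp) >= 0) {
            return false; // older OR equal (a replay) -> reject
        }
        lastSeen.put(peer, newTimestamp);
        return true;
    }

    public static void main(String[] args) {
        var log = new TimestampLogSketch();
        System.out.println(log.update(1L, 5L)); // true  - first op from peer 1
        System.out.println(log.update(1L, 5L)); // false - same timestamp, duplicate
        System.out.println(log.update(1L, 4L)); // false - older
        System.out.println(log.update(1L, 6L)); // true  - newer
    }
}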
@@ -199,7 +200,9 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
     }

     private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
-        if (!updateTimestampImpl(from, op.timestamp().timestamp())) return;
+        if (!updateTimestampImpl(op.timestamp().nodeId(), op.timestamp().timestamp())) return;
+
+        LOGGER.finer(() -> "Will apply op: " + op + " from " + from);

         var log = _storage.getLog();

@@ -252,6 +255,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
     }

     private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
+        LOGGER.finer(() -> "Doing op: " + op);
         LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computed;
         try {
             computed = computeEffects(op, failCreatingIfExists);
@@ -291,6 +295,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex

     private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
         for (var effect : effects) {
+            LOGGER.finer(() -> "Applying effect: " + effect + " from op " + sourceOp);
             TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> oldParentNode = null;
             TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParentNode;
             TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node;
@@ -354,6 +359,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
                 return new LogRecord<>(op, null);
             }

+            LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
+
             String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
             String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
             return new LogRecord<>(op, List.of(
@@ -361,6 +368,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
                     new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
             ));
         } else {
+            LOGGER.finer(() -> "Simple node creation");
             return new LogRecord<>(op, List.of(
                     new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
             ));
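In the conflict branch above, both colliding children get renamed with a suffix built from their own node ids, so every replica that processes the same pair of creations computes identical sibling names; the undoWithRenameTest added below asserts exactly this "<name>.conflict.<id>" shape. A minimal sketch of the naming rule (the helper and map here are illustrative, not the repo's code):

import java.util.Map;
import java.util.TreeMap;

// Sketch of deterministic conflict renaming: two children created under the same
// parent with the same name get id-based suffixes, so the final sibling names do
// not depend on which replica applies the ops, only on the ids themselves.
public class ConflictNameSketch {
    static String conflictName(String name, long nodeId) {
        return name + ".conflict." + nodeId;
    }

    public static void main(String[] args) {
        Map<String, Long> children = new TreeMap<>();

        // Node 2 already created "Test1"; node 1's creation of "Test1" arrives later.
        long existingId = 2L, incomingId = 1L;
        children.put("Test1", existingId);

        // Both sides are renamed with their id-derived suffix.
        children.remove("Test1");
        children.put(conflictName("Test1", existingId), existingId);
        children.put(conflictName("Test1", incomingId), incomingId);

        System.out.println(children); // {Test1.conflict.1=1, Test1.conflict.2=2}
    }
}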
@@ -385,11 +393,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
                 return new LogRecord<>(op, null);
             }

+            LOGGER.finer(() -> "Node replacement: " + replaceNode);
+
             return new LogRecord<>(op, List.of(
                     new LogEffect<>(new LogEffectOld<>(replaceNode.lastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.lastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
                     new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
             ));
         }

+        LOGGER.finer(() -> "Simple node move");
         return new LogRecord<>(op, List.of(
                 new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
         ));

KleppmanTreeSimpleTest.java:

@@ -2,13 +2,15 @@ package com.usatiuk.kleppmanntree;

 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;

 import java.util.List;

 public class KleppmanTreeSimpleTest {
     private final TestNode testNode1 = new TestNode(1);
     private final TestNode testNode2 = new TestNode(2);
+    private final TestNode testNode3 = new TestNode(3);

     @Test
     void circularTest() {
@@ -89,4 +91,50 @@ public class KleppmanTreeSimpleTest {
         Assertions.assertTrue(testNode2._storageInterface.getLog().size() <= 1);
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void undoWithRenameTest(boolean opOrder) {
+        var d1id = testNode1._storageInterface.getNewNodeId();
+        var d2id = testNode2._storageInterface.getNewNodeId();
+        var d3id = testNode2._storageInterface.getNewNodeId();
+        testNode1._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d1id);
+        testNode2._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d2id);
+        testNode3._tree.move(testNode1._storageInterface.getRootId(), new TestNodeMetaDir("Test1"), d3id);
+        var r1 = testNode1.getRecorded();
+        var r2 = testNode2.getRecorded();
+        var r3 = testNode3.getRecorded();
+        Assertions.assertEquals(1, r1.size());
+        Assertions.assertEquals(1, r2.size());
+        Assertions.assertEquals(1, r3.size());
+
+        if (opOrder) {
+            testNode2._tree.applyExternalOp(3L, r3.getFirst());
+            testNode2._tree.applyExternalOp(1L, r1.getFirst());
+        } else {
+            testNode2._tree.applyExternalOp(1L, r1.getFirst());
+            testNode2._tree.applyExternalOp(3L, r3.getFirst());
+        }
+
+        Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
+
+        if (opOrder) {
+            testNode1._tree.applyExternalOp(3L, r3.getFirst());
+            testNode1._tree.applyExternalOp(2L, r2.getFirst());
+        } else {
+            testNode1._tree.applyExternalOp(2L, r2.getFirst());
+            testNode1._tree.applyExternalOp(3L, r3.getFirst());
+        }
+
+        Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode1._storageInterface.getById(testNode1._storageInterface.getRootId()).children().keySet());
+
+        if (opOrder) {
+            testNode3._tree.applyExternalOp(2L, r2.getFirst());
+            testNode3._tree.applyExternalOp(1L, r1.getFirst());
+        } else {
+            testNode3._tree.applyExternalOp(1L, r1.getFirst());
+            testNode3._tree.applyExternalOp(2L, r2.getFirst());
+        }
+
+        Assertions.assertIterableEquals(List.of("Test1", "Test1.conflict." + d1id, "Test1.conflict." + d2id), testNode3._storageInterface.getById(testNode3._storageInterface.getRootId()).children().keySet());
+    }
 }
JObjectManager.java:

@@ -109,10 +109,10 @@ public class JObjectManager {

             var curIteration = pendingWrites.get(hook);

-            Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
+//            Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());

             for (var entry : curIteration.entrySet()) {
-                Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
+//                Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
                 var oldObj = getPrev.apply(entry.getKey());
                 lastCurHookSeen.put(entry.getKey(), entry.getValue());
                 switch (entry.getValue()) {
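Commenting these calls out removes the per-entry string concatenation from the commit hot path. If the messages were still wanted, a cheaper pattern would be to guard them behind the logger's level check and use the parameterized form; a sketch assuming io.quarkus.logging.Log (the same facade used elsewhere in this diff), with an illustrative class and parameters:

import io.quarkus.logging.Log;

import java.util.Map;

// Sketch of keeping hot-path trace logs but paying for them only when trace is on.
// The class and method are illustrative; only the Log facade comes from the diff.
public class TraceGuardSketch {
    void commitIteration(Object hook, Map<String, Object> curIteration) {
        if (Log.isTraceEnabled()) {
            // Parameterized form: no string concatenation unless actually logged.
            Log.tracev("Commit iteration with {0} records for hook {1}",
                    curIteration.size(), hook.getClass());
        }
        // ... run the pre-commit hook for each entry ...
    }
}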
TransactionFactoryImpl.java:

@@ -1,14 +1,16 @@
 package com.usatiuk.dhfs.objects.transaction;

-import com.usatiuk.dhfs.objects.*;
+import com.usatiuk.dhfs.objects.JData;
+import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
+import com.usatiuk.dhfs.objects.JObjectKey;
-import com.usatiuk.dhfs.objects.iterators.*;
+import com.usatiuk.dhfs.objects.iterators.IteratorStart;
 import com.usatiuk.dhfs.objects.snapshot.Snapshot;
 import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
+import org.eclipse.microprofile.config.inject.ConfigProperty;

 import java.util.*;

@@ -18,6 +20,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
     SnapshotManager snapshotManager;
     @Inject
     LockManager lockManager;
+    @ConfigProperty(name = "dhfs.objects.transaction.never-lock")
+    boolean neverLock;

     @Override
     public TransactionPrivate createTransaction() {
@@ -195,6 +199,9 @@ public class TransactionFactoryImpl implements TransactionFactory {
             }
         }

+        if (neverLock)
+            return getFromSource(type, key);
+
         return switch (strategy) {
             case OPTIMISTIC -> getFromSource(type, key);
             case WRITE -> getWriteLockedFromSource(type, key);
application.properties:

@@ -6,5 +6,6 @@ dhfs.objects.lock_timeout_secs=15
 dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
 quarkus.package.jar.decompiler.enabled=true
 dhfs.objects.persistence.snapshot-extra-checks=false
+dhfs.objects.transaction.never-lock=true
 quarkus.log.category."com.usatiuk.dhfs.objects.iterators".level=INFO
 quarkus.log.category."com.usatiuk.dhfs.objects.iterators".min-level=INFO
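Taken together, the TransactionFactoryImpl and application.properties changes above add an escape hatch: a dhfs.objects.transaction.never-lock flag injected via MicroProfile @ConfigProperty, an early return that skips the lock-strategy switch whenever the flag is set, and a default of true in this config. A standalone sketch of the same pattern; the class, enum and method bodies are illustrative placeholders, only the property name comes from the diff:

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;

// Sketch of a config-gated lock bypass in the style of the change above:
// when the flag is on, reads always take the optimistic (unlocked) path.
@ApplicationScoped
public class LockGateSketch {
    enum Strategy {OPTIMISTIC, WRITE}

    @ConfigProperty(name = "dhfs.objects.transaction.never-lock", defaultValue = "false")
    boolean neverLock;

    Object get(Strategy strategy, String key) {
        if (neverLock)
            return readUnlocked(key);           // bypass locking entirely

        return switch (strategy) {
            case OPTIMISTIC -> readUnlocked(key);
            case WRITE -> readWriteLocked(key); // takes the write lock first
        };
    }

    private Object readUnlocked(String key) { return key; }
    private Object readWriteLocked(String key) { return key; }
}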
JKleppmannTreeManager.java:

@@ -106,6 +106,8 @@ public class JKleppmannTreeManager {
                 collected.add(new JKleppmannTreeOpWrapper(_data.key(), node.getValue()));
                 if (collected.size() >= limit) break;
             }
+            Log.tracev("Collected pending op for host: {0} - {1}, out of {2}", host, collected,
+                    _data.queues().getOrDefault(host, TreePMap.empty()));
             return Collections.unmodifiableList(collected);
         }
@@ -114,11 +116,11 @@ public class JKleppmannTreeManager {
             if (!(op instanceof JKleppmannTreeOpWrapper jop))
                 throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());

-            var firstOp = _data.queues().get(host).firstEntry().getValue();
-            if (!Objects.equals(firstOp, jop.op()))
+            var firstOp = _data.queues().get(host).firstEntry();
+            if (!Objects.equals(firstOp.getValue(), jop.op()))
                 throw new IllegalArgumentException("Committed op push was not the oldest");

-            _data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(_data.queues().get(host).firstKey())));
+            _data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(firstOp.getKey())));
             curTx.put(_data);
         }

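The fix above reads the head of the per-host queue exactly once and reuses that entry for both the "is this the oldest op" check and the removal, instead of fetching firstEntry().getValue() and then separately firstKey(). A minimal sketch of the same pattern on a plain NavigableMap (the repo uses a persistent TreePMap; names here are illustrative):

import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;

// Sketch of the "read the first entry once" pattern: the key and value used for
// the check and the removal are guaranteed to belong to the same queue element.
public class FirstEntrySketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> queue = new TreeMap<>(Map.of(1L, "op-a", 2L, "op-b"));
        String committedOp = "op-a";

        // Single lookup: both the comparison and the removal use this entry.
        Map.Entry<Long, String> firstOp = queue.firstEntry();
        if (!Objects.equals(firstOp.getValue(), committedOp))
            throw new IllegalArgumentException("Committed op push was not the oldest");
        queue.remove(firstOp.getKey());

        System.out.println(queue); // {2=op-b}
    }
}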
JKleppmannTreeNodeMeta.java:

@@ -30,4 +30,12 @@ public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
     public int hashCode() {
         return Objects.hashCode(_name);
     }
+
+    @Override
+    public String toString() {
+        return "JKleppmannTreeNodeMeta{" +
+                "class=" + this.getClass().getSimpleName() + " " +
+                "_name='" + _name + '\'' +
+                '}';
+    }
 }
JKleppmannTreeNodeMetaFile.java:

@@ -35,4 +35,12 @@ public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
     public int hashCode() {
         return Objects.hash(super.hashCode(), _fileIno);
     }
+
+    @Override
+    public String toString() {
+        return "JKleppmannTreeNodeMetaFile{" +
+                "_name=" + getName() + ", " +
+                "_fileIno=" + _fileIno +
+                '}';
+    }
 }