Kleppmanntree: remove undo context

2025-04-25 21:37:50 +02:00
parent b034591091
commit a53fc5e973
10 changed files with 96 additions and 123 deletions

View File

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
@@ -94,21 +95,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = curTx.get(JKleppmannTreeNode.class, res);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
return ret;
}
@@ -125,7 +126,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
}
} else if (ref instanceof JKleppmannTreeNode) {
} else if (ref instanceof JKleppmannTreeNodeHolder) {
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
@@ -242,7 +243,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
if (dent instanceof JKleppmannTreeNode) {
if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
@@ -628,7 +629,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
// FIXME:
if (dent instanceof JKleppmannTreeNode) {
if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);

View File

@@ -15,7 +15,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
private final PeerInterface<PeerIdT> _peers;
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
PeerInterface<PeerIdT> peers,
@@ -87,7 +86,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.withParent(null)
.withLastEffectiveOp(null)
);
_undoCtx.put(node.key(), node);
}
}
@@ -217,7 +215,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (cmp < 0) {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.newestSlice(op.timestamp(), false);
_undoCtx = new HashMap<>();
for (var entry : toUndo.reversed()) {
undoOp(entry.getValue());
}
@@ -225,13 +222,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
for (var entry : toUndo) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
tryTrimLog();
} else {
doAndPut(op, failCreatingIfExists);
@@ -264,24 +254,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
if (_undoCtx != null) {
var node = _undoCtx.get(key);
if (node != null) {
try {
if (!node.children().isEmpty()) {
LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
}
node = node.withParent(parent).withMeta(meta);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
node = null;
}
}
if (node != null) {
_undoCtx.remove(key);
return node;
}
}
return _storage.createNewNode(key, parent, meta);
}
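
Note: with _undoCtx gone, KleppmannTree no longer parks undone nodes in a side map to resurrect them in getNewNode; conflicting older ops are handled purely by undoing the newer log entries, applying the incoming op, and redoing the undone entries, and getNewNode always delegates to _storage.createNewNode. A rough sketch of the resulting conflict path, using the names from the hunks above (the step that applies the incoming op sits between the two hunks and is assumed here, not shown in the diff):

    if (cmp < 0) {
        if (log.containsKey(op.timestamp())) return;
        var toUndo = log.newestSlice(op.timestamp(), false);
        for (var entry : toUndo.reversed())
            undoOp(entry.getValue());       // roll back everything newer than op
        doAndPut(op, failCreatingIfExists); // assumed: apply the older remote op here
        for (var entry : toUndo)
            redoOp(entry);                  // replay the rolled-back ops on top
        tryTrimLog();
    } else {
        doAndPut(op, failCreatingIfExists);
    }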

View File

@@ -13,6 +13,10 @@
</parent>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
@@ -53,11 +54,11 @@ public class JKleppmannTreeManager {
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
curTx.put(rootNode);
curTx.put(new JKleppmannTreeNodeHolder(rootNode));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
curTx.put(trashNode);
curTx.put(new JKleppmannTreeNodeHolder(trashNode));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
curTx.put(lf_node);
curTx.put(new JKleppmannTreeNodeHolder(lf_node));
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);
@@ -170,60 +171,13 @@ public class JKleppmannTreeManager {
if (Log.isTraceEnabled())
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().name());
try {
_tree.applyExternalOp(from, jop.op());
} catch (Exception e) {
Log.error("Error applying external op", e);
throw e;
} finally {
// FIXME:
// Fixup the ref if it didn't really get applied
// if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
// Log.error("Could not create child of pushed op: " + jop.getOp());
// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
// if (fileRef != null) {
// var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
//
// VoidFn remove = () -> {
// fileRef.runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
// m.removeRef(jop.getOp().childId());
// });
// };
//
// if (got == null) {
// remove.apply();
// } else {
// try {
// got.rLock();
// try {
// got.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (got.getData() == null || !got.getData().extractRefs().contains(f.getFileIno()))
// remove.apply();
// } finally {
// got.rUnlock();
// }
// } catch (DeletedObjectAccessException dex) {
// remove.apply();
// }
// }
// }
// }
}
_tree.applyExternalOp(from, jop.op());
}
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}
// @Override
// public void addToTx() {
// // FIXME: a hack
// _persistentData.get().rwLockNoCopy();
// _persistentData.get().rwUnlock();
// }
private class JOpRecorder implements OpRecorder<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public void recordOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
@@ -291,8 +245,8 @@ public class JKleppmannTreeManager {
@Override
public JKleppmannTreeNode getById(JObjectKey id) {
var got = curTx.get(JKleppmannTreeNode.class, id);
return got.orElse(null);
var got = curTx.get(JKleppmannTreeNodeHolder.class, id);
return got.map(JKleppmannTreeNodeHolder::node).orElse(null);
}
@Override
@@ -302,12 +256,14 @@ public class JKleppmannTreeManager {
@Override
public void putNode(TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> node) {
curTx.put(((JKleppmannTreeNode) node));
curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
.map(n -> n.withNode((JKleppmannTreeNode) node))
.orElse(new JKleppmannTreeNodeHolder((JKleppmannTreeNode) node)));
}
@Override
public void removeNode(JObjectKey id) {
// TODO
// GC
}
@Override

View File

@@ -1,16 +1,12 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.refcount.JDataRef;
import com.usatiuk.dhfs.refcount.JDataRefcounted;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.objects.JObjectKey;
import jakarta.annotation.Nullable;
import org.pcollections.HashTreePMap;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.TreePSet;
import java.io.Serializable;
import java.util.Collection;
@@ -18,47 +14,35 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// FIXME: Ideally this is two classes?
public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
@Nullable JKleppmannTreeNodeMeta meta,
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Serializable {
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
this(id, TreePSet.empty(), false, parent, null, meta, HashTreePMap.empty());
this(id, parent, null, meta, HashTreePMap.empty());
}
@Override
public JKleppmannTreeNode withParent(JObjectKey parent) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withMeta(JKleppmannTreeNodeMeta meta) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withChildren(PMap<String, JObjectKey> children) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withRefsFrom(PCollection<JDataRef> refs) {
return new JKleppmannTreeNode(key, refs, frozen, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withFrozen(boolean frozen) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return Stream.<JObjectKey>concat(children().values().stream(),
Optional.ofNullable(meta)
@@ -66,4 +50,8 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
.orElse(Stream.empty()))
.collect(Collectors.toUnmodifiableSet());
}
public int estimateSize() {
return children.size() * 64;
}
}

View File

@@ -0,0 +1,50 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.dhfs.refcount.JDataRef;
import com.usatiuk.dhfs.refcount.JDataRefcounted;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.PCollection;
import org.pcollections.TreePSet;
import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;
// Separate refcounting from JKleppmannTreeNode
public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean frozen,
JKleppmannTreeNode node) implements JDataRefcounted, Serializable {
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node) {
this(TreePSet.empty(), false, node);
}
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
Objects.requireNonNull(node, "node");
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
}
@Override
public JKleppmannTreeNodeHolder withRefsFrom(PCollection<JDataRef> refs) {
return new JKleppmannTreeNodeHolder(refs, frozen, node);
}
@Override
public JKleppmannTreeNodeHolder withFrozen(boolean frozen) {
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return node.collectRefsTo();
}
@Override
public JObjectKey key() {
return node.key();
}
@Override
public int estimateSize() {
return node.estimateSize();
}
}
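
The new JKleppmannTreeNodeHolder carries the reference-counting state (refsFrom, frozen) that previously lived on JKleppmannTreeNode itself, so tree nodes are now stored and fetched through their holder. A minimal sketch of the access pattern used throughout this commit (lookupNode and storeNode are hypothetical helper names; curTx is the current transaction as in the diffs above):

    // Read: unwrap the node from its holder.
    Optional<JKleppmannTreeNode> lookupNode(JObjectKey id) {
        return curTx.get(JKleppmannTreeNodeHolder.class, id)
                .map(JKleppmannTreeNodeHolder::node);
    }

    // Write: keep the existing holder's refcount state if one is already stored,
    // otherwise wrap the node in a fresh holder.
    void storeNode(JKleppmannTreeNode node) {
        curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
                .map(h -> h.withNode(node))
                .orElse(new JKleppmannTreeNodeHolder(node)));
    }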

View File

@@ -11,6 +11,7 @@ import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Optional;
// TODO: It's not actually generic right now, only longs are supported essentially
@Singleton
public class JMapHelper {
@Inject
@@ -45,7 +46,6 @@ public class JMapHelper {
}
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
// TODO:
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
}

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JObjectKey;
@@ -39,7 +39,7 @@ public class PeerInfoService {
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> {
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return remoteTx.getData(PeerInfo.class, meta.peerId());
});
@@ -63,7 +63,7 @@ public class PeerInfoService {
if (gotKey == null) {
return Optional.empty();
}
return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> {
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return remoteTx.getData(PeerInfo.class, meta.peerId());
});
@@ -73,7 +73,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
@@ -88,7 +88,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
@@ -116,7 +116,7 @@ public class PeerInfoService {
if (gotKey == null) {
return;
}
var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null);
var node = curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).orElse(null);
if (node == null) {
Log.warn("Peer " + id + " not found in the tree");
return;

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.peertrust;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.PeerInfo;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
@@ -29,18 +30,18 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
// We also need to force pushing invalidation to all, in case our node acts as a "middleman"
// connecting two other nodes
// TODO: Can there be a prettier way to do this? (e.g. more general proxying of ops?)
if (cur instanceof JKleppmannTreeNode n) {
if (cur instanceof JKleppmannTreeNodeHolder n) {
if (n.key().value().equals("peers_jt_root")) {
// TODO: This is kinda sucky
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(PeerInfoService.TREE_KEY));
if (!(old instanceof JKleppmannTreeNode oldNode))
if (!(old instanceof JKleppmannTreeNodeHolder oldNode))
throw new IllegalStateException("Old node is not a tree node");
for (var curRef : oldNode.children().entrySet()) {
if (!n.children().containsKey(curRef.getKey())) {
for (var curRef : oldNode.node().children().entrySet()) {
if (!n.node().children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
}

View File

@@ -122,6 +122,7 @@ public class RemoteObjectServiceClient {
try {
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
assert ourReferrers.isEmpty();
for (var ref : ourReferrers) {
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().toString()).build());
}