Sync-base: cleanup JKleppmannTree meta

This commit is contained in:
2025-04-25 09:50:54 +02:00
parent 85a1fa09ab
commit 56a15f4672
13 changed files with 99 additions and 69 deletions

View File

@@ -1,20 +1,15 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteObjectDataWrapper;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.remoteobj.ObjSyncHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.remoteobj.SyncHelper;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -48,11 +43,11 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
DhfsFileService fileService;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
}
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,

View File

@@ -1,4 +1,10 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMetaDirectory(String name) {
@@ -9,4 +15,9 @@ public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of();
}
}

View File

@@ -1,7 +1,10 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
@@ -42,4 +45,9 @@ public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
"_fileIno=" + _fileIno +
'}';
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(_fileIno);
}
}

View File

@@ -2,26 +2,26 @@ package com.usatiuk.dhfsfs.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfsfs.objects.ChunkData;
import com.usatiuk.dhfsfs.objects.File;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -73,11 +73,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
JMapHelper jMapHelper;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private ChunkData createChunk(ByteString bytes) {

View File

@@ -23,11 +23,11 @@ public class OpHandler {
if (op instanceof IndexUpdateOp iu) {
pushOpHandler.handlePush(from, iu);
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
var tree = jKleppmannTreeManager.getTree(jk.treeName());
var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow();
tree.acceptExternalOp(from, jk);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
} else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
var tree = jKleppmannTreeManager.getTree(pop.treeName());
var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow();
tree.acceptExternalOp(from, pop);
}
}

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.InitialSyncProcessor;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JObjectKey;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -18,7 +18,7 @@ public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<
@Override
public void prepareForInitialSync(PeerId from, JObjectKey key) {
var tree = jKleppmannTreeManager.getTree(key);
var tree = jKleppmannTreeManager.getTree(key).orElseThrow();
tree.recordBootstrap(from);
}

View File

@@ -1,15 +1,17 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.*;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.kleppmanntree.*;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -20,6 +22,7 @@ import org.pcollections.TreePSet;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
@ApplicationScoped
public class JKleppmannTreeManager {
@@ -35,7 +38,7 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
if (data == null) {
@@ -49,11 +52,11 @@ public class JKleppmannTreeManager {
TreePMap.empty()
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
curTx.put(rootNode);
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
curTx.put(trashNode);
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
curTx.put(lf_node);
}
return new JKleppmannTree(data);
@@ -61,10 +64,20 @@ public class JKleppmannTreeManager {
});
}
public JKleppmannTree getTree(JObjectKey name) {
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return getTree(name, LockingStrategy.WRITE);
}
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
});
}
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
}
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;

View File

@@ -1,11 +1,11 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpExtractor;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
@@ -34,7 +34,7 @@ public class JKleppmannTreeOpExtractor implements OpExtractor<JKleppmannTreePers
@Override
public Pair<List<Op>, Runnable> extractOps(JKleppmannTreePersistentData data, PeerId peerId) {
return txm.run(() -> {
var tree = jKleppmannTreeManager.getTree(data.key());
var tree = jKleppmannTreeManager.getTree(data.key()).orElseThrow();
if (!tree.hasPendingOpsForHost(peerId))
return Pair.of(List.of(tree.getPeriodicPushOp()), (Runnable) () -> {
@@ -48,7 +48,7 @@ public class JKleppmannTreeOpExtractor implements OpExtractor<JKleppmannTreePers
var key = data.key();
return Pair.<List<Op>, Runnable>of(ops, (Runnable) () -> {
txm.run(() -> {
var commitTree = jKleppmannTreeManager.getTree(key);
var commitTree = jKleppmannTreeManager.getTree(key).orElseThrow();
for (var op : ops) {
commitTree.commitOpForHost(peerId, op);
}

View File

@@ -1,24 +1,19 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.objects.JObjectKey;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
// Wrapper to avoid having to specify generic types
public record JKleppmannTreeOpWrapper(JObjectKey treeName,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) implements Op, Serializable {
@Override
public Collection<JObjectKey> getEscapedRefs() {
if (op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {
return List.of(mf.getFileIno());
}
return List.of();
return op.newMeta().collectRefsTo();
}
}

View File

@@ -1,12 +1,12 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.refcount.JDataRef;
import com.usatiuk.dhfs.refcount.JDataRefcounted;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.objects.JObjectKey;
import jakarta.annotation.Nullable;
import org.pcollections.HashTreePMap;
import org.pcollections.PCollection;
import org.pcollections.PMap;
@@ -14,13 +14,14 @@ import org.pcollections.TreePSet;
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// FIXME: Ideally this is two classes?
public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
JKleppmannTreeNodeMeta meta,
@Nullable JKleppmannTreeNodeMeta meta,
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
@@ -60,13 +61,9 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
@Override
public Collection<JObjectKey> collectRefsTo() {
return Stream.<JObjectKey>concat(children().values().stream(),
switch (meta()) {
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>empty();
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
case null -> Stream.<JObjectKey>empty();
default -> throw new IllegalStateException("Unexpected value: " + meta());
}
).collect(Collectors.toUnmodifiableSet());
Optional.ofNullable(meta)
.<Stream<JObjectKey>>map(o -> o.collectRefsTo().stream())
.orElse(Stream.empty()))
.collect(Collectors.toUnmodifiableSet());
}
}

View File

@@ -1,7 +1,9 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.kleppmanntree.NodeMeta;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.Objects;
//@ProtoMirror(JKleppmannTreeNodeMetaP.class)
@@ -38,4 +40,6 @@ public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
"_name='" + _name + '\'' +
'}';
}
abstract public Collection<JObjectKey> collectRefsTo();
}

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
@@ -30,11 +30,11 @@ public class PeerInfoService {
RemoteTransaction remoteTx;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(TREE_KEY);
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC);
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {

View File

@@ -4,6 +4,8 @@ import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
@@ -50,4 +52,9 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
public int hashCode() {
return Objects.hash(super.hashCode(), _peerId);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(_peerId);
}
}