6 Commits

SHA1 Message Date
ad4ce72fdd Dhfs-fuse: attempt at Windows support 2025-04-25 22:17:55 +02:00
26ba65fdce Sync-base: make the "Pushing invalidations" log message trace-level
It's too big.
2025-04-25 22:02:21 +02:00
697add66d5 Kleppmanntree: fix a dumb bug
directories are always the same, duh
2025-04-25 22:00:33 +02:00
a53fc5e973 Kleppmanntree: remove undo context 2025-04-25 21:37:50 +02:00
b034591091 Sync-base: OpHandler interface 2025-04-25 15:04:07 +02:00
07133a7186 Sync-base: get rid of JDataRemotePush 2025-04-25 14:57:06 +02:00
24 changed files with 304 additions and 243 deletions

File: DhfsFuseIT.java

@@ -80,6 +80,22 @@ public class DhfsFuseIT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
}
private void checkConsistency() {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
@AfterEach
void stop() {
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
@@ -249,7 +265,7 @@ public class DhfsFuseIT {
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/"+c1uuid);
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
@@ -265,7 +281,7 @@ public class DhfsFuseIT {
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/"+c1uuid);
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -309,6 +325,33 @@ public class DhfsFuseIT {
});
}
@Test
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
var client = DockerClientFactory.instance().client();
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo fdsaio >> /dhfs_test/fuse/a/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo exgrg >> /dhfs_test/fuse/a/testf").getExitCode());
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
Log.warn("Waiting for connections");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
Log.warn("Connected");
checkConsistency();
var ls1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/a*/*");
Assertions.assertTrue(ls1.getStdout().contains("fdsaio"));
Assertions.assertTrue(ls1.getStdout().contains("exgrg"));
}
@Test
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());

File: DhfsFileServiceImpl.java

@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
+ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
@@ -94,21 +95,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
- var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
+ var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
- var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
+ var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
- var ret = curTx.get(JKleppmannTreeNode.class, res);
+ var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
return ret;
}
@@ -125,7 +126,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
}
- } else if (ref instanceof JKleppmannTreeNode) {
+ } else if (ref instanceof JKleppmannTreeNodeHolder) {
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
@@ -242,7 +243,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
- if (dent instanceof JKleppmannTreeNode) {
+ if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
@@ -628,7 +629,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
// FIXME:
- if (dent instanceof JKleppmannTreeNode) {
+ if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);

File: DhfsFuse.java

@@ -72,40 +72,48 @@ public class DhfsFuse extends FuseStubFS {
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
-        Paths.get(root).toFile().mkdirs();
-        if (!Paths.get(root).toFile().isDirectory())
-            throw new IllegalStateException("Could not create directory " + root);
         Log.info("Mounting with root " + root);
-        var uid = new UnixSystem().getUid();
-        var gid = new UnixSystem().getGid();
         var opts = new ArrayList<String>();
-        // Assuming macFuse
-        if (SystemUtils.IS_OS_MAC) {
-            opts.add("-o");
-            opts.add("iosize=" + iosize);
-        } else if (SystemUtils.IS_OS_LINUX) {
-            // FIXME: There's something else missing: the writes still seem to be 32k max
-            // opts.add("-o");
-            // opts.add("large_read");
-            opts.add("-o");
-            opts.add("big_writes");
-            opts.add("-o");
-            opts.add("max_read=" + iosize);
-            opts.add("-o");
-            opts.add("max_write=" + iosize);
-        }
-        opts.add("-o");
-        opts.add("auto_cache");
-        opts.add("-o");
-        opts.add("uid=" + uid);
-        opts.add("-o");
-        opts.add("gid=" + gid);
+        if (SystemUtils.IS_OS_WINDOWS) {
+            opts.add("-o");
+            opts.add("auto_cache");
+            opts.add("-o");
+            opts.add("uid=-1");
+            opts.add("-o");
+            opts.add("gid=-1");
+        } else {
+            Paths.get(root).toFile().mkdirs();
+            if (!Paths.get(root).toFile().isDirectory())
+                throw new IllegalStateException("Could not create directory " + root);
+            var uid = new UnixSystem().getUid();
+            var gid = new UnixSystem().getGid();
+            // Assuming macFuse
+            if (SystemUtils.IS_OS_MAC) {
+                opts.add("-o");
+                opts.add("iosize=" + iosize);
+            } else if (SystemUtils.IS_OS_LINUX) {
+                // FIXME: There's something else missing: the writes still seem to be 32k max
+                // opts.add("-o");
+                // opts.add("large_read");
+                opts.add("-o");
+                opts.add("big_writes");
+                opts.add("-o");
+                opts.add("max_read=" + iosize);
+                opts.add("-o");
+                opts.add("max_write=" + iosize);
+            }
+            opts.add("-o");
+            opts.add("auto_cache");
+            opts.add("-o");
+            opts.add("uid=" + uid);
+            opts.add("-o");
+            opts.add("gid=" + gid);
+        }
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
}
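For reference, the option sets the rewritten branches produce (a summary sketch derived from the hunk above; iosize, uid and gid are the values computed in the code):

// Mount options per OS after this change (illustrative, not part of the diff):
//   Windows: -o auto_cache -o uid=-1 -o gid=-1   (no mkdirs or UnixSystem lookup)
//   macOS:   -o iosize=<iosize> -o auto_cache -o uid=<uid> -o gid=<gid>
//   Linux:   -o big_writes -o max_read=<iosize> -o max_write=<iosize>
//            -o auto_cache -o uid=<uid> -o gid=<gid>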

File: com/usatiuk/kleppmanntree/KleppmannTree.java

@@ -15,7 +15,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
private final PeerInterface<PeerIdT> _peers;
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
- private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
PeerInterface<PeerIdT> peers,
@@ -87,7 +86,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.withParent(null)
.withLastEffectiveOp(null)
);
- _undoCtx.put(node.key(), node);
}
}
@@ -217,7 +215,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (cmp < 0) {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.newestSlice(op.timestamp(), false);
- _undoCtx = new HashMap<>();
for (var entry : toUndo.reversed()) {
undoOp(entry.getValue());
}
@@ -225,13 +222,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
for (var entry : toUndo) {
redoOp(entry);
}
- if (!_undoCtx.isEmpty()) {
- for (var e : _undoCtx.entrySet()) {
- LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
- _storage.removeNode(e.getKey());
- }
- }
- _undoCtx = null;
tryTrimLog();
} else {
doAndPut(op, failCreatingIfExists);
@@ -264,24 +254,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
- if (_undoCtx != null) {
- var node = _undoCtx.get(key);
- if (node != null) {
- try {
- if (!node.children().isEmpty()) {
- LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
- }
- node = node.withParent(parent).withMeta(meta);
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
- node = null;
- }
- }
- if (node != null) {
- _undoCtx.remove(key);
- return node;
- }
- }
return _storage.createNewNode(key, parent, meta);
}
@@ -362,11 +334,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
if (Objects.equals(conflictNodeMeta, op.newMeta())) {
LOGGER.finer(() -> "Node creation conflict (the same): " + conflictNode);
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
String newConflictNodeName = op.newName() + ".conflict." + conflictNode.key();
@@ -399,10 +366,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var replaceNode = _storage.getById(replaceNodeId);
var replaceNodeMeta = replaceNode.meta();
if (Objects.equals(replaceNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node replacement: " + replaceNode);
return new LogRecord<>(op, List.of(

File: pom.xml

@@ -13,6 +13,10 @@
</parent>
<dependencies>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-smallrye-openapi</artifactId>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>

File: com/usatiuk/dhfs/invalidation/IndexUpdateOp.java

@@ -1,14 +1,13 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
- import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
- public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
+ public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of(key);
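After this change an IndexUpdateOp announces only the key and its changelog; the object body is no longer pushed along (the JDataRemotePush machinery is removed further down), so receivers fetch data on demand. A minimal sketch of the new shape, assuming a key and changelog are already at hand:

// Sketch: the op now carries index metadata only.
// Before: new IndexUpdateOp(key, changelog, dto)
Op op = new IndexUpdateOp(key, changelog);
// On receipt, IndexUpdateOpHandler (below) passes null for the data:
// syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);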

File: com/usatiuk/dhfs/invalidation/InvalidationQueueService.java

@@ -168,7 +168,7 @@ public class InvalidationQueueService {
for (var p : ops.keySet()) {
var list = ops.get(p);
- Log.infov("Pushing invalidations to {0}: {1}", p, list);
+ Log.tracev("Pushing invalidations to {0}: {1}", p, list);
remoteObjectServiceClient.pushOps(p, list);
commits.get(p).forEach(Runnable::run);
}

File: com/usatiuk/dhfs/invalidation/OpHandler.java

@@ -1,34 +1,7 @@
package com.usatiuk.dhfs.invalidation;
- import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
- import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeOpWrapper;
- import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreePeriodicPushOp;
import com.usatiuk.dhfs.peersync.PeerId;
- import com.usatiuk.objects.transaction.Transaction;
- import jakarta.enterprise.context.ApplicationScoped;
- import jakarta.inject.Inject;
- @ApplicationScoped
- public class OpHandler {
- @Inject
- PushOpHandler pushOpHandler;
- @Inject
- Transaction curTx;
- @Inject
- JKleppmannTreeManager jKleppmannTreeManager;
- @Inject
- InvalidationQueueService invalidationQueueService;
- public void handleOp(PeerId from, Op op) {
- if (op instanceof IndexUpdateOp iu) {
- pushOpHandler.handlePush(from, iu);
- } else if (op instanceof JKleppmannTreeOpWrapper jk) {
- var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow();
- tree.acceptExternalOp(from, jk);
- curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
- } else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
- var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow();
- tree.acceptExternalOp(from, pop);
- }
- }
+ public interface OpHandler<T extends Op> {
+ void handleOp(PeerId from, T op);
}
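Concrete handlers are now separate CDI beans implementing this interface, discovered by OpHandlerService in the next file. A minimal sketch of such a handler; PingOp is hypothetical and not part of this changeset (the real implementations are IndexUpdateOpHandler, JKleppmannTreeOpHandler and JKleppmannTreePeriodicOpHandler below):

// Hypothetical example only: PingOp does not exist in this repository.
@ApplicationScoped
public class PingOpHandler implements OpHandler<PingOp> {
    @Override
    public void handleOp(PeerId from, PingOp op) {
        // React to the op; the real handlers wrap their work in txm.run(...).
    }
}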

File: com/usatiuk/dhfs/invalidation/OpHandlerService.java (new)

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpHandlerService {
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
public OpHandlerService(Instance<OpHandler<?>> OpHandlers) {
HashMap<Class<? extends Op>, OpHandler> OpHandlerMap = new HashMap<>();
for (var OpHandler : OpHandlers.handles()) {
for (var type : Arrays.stream(OpHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(OpHandler.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert Op.class.isAssignableFrom((Class<?>) orig);
OpHandlerMap.put((Class<? extends Op>) orig, OpHandler.get());
}
}
_opHandlerMap = Map.copyOf(OpHandlerMap);
}
public void handleOp(PeerId from, Op op) {
var handler = _opHandlerMap.get(op.getClass());
if (handler == null) {
throw new IllegalArgumentException("No handler for op: " + op.getClass());
}
handler.handleOp(from, op);
}
}
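Dispatch is keyed on the exact runtime class of the op: at startup the constructor walks each bean's generic interfaces to find the T in OpHandler<T>, and handleOp then looks up op.getClass() in the resulting map. The extraction step, isolated as a sketch (assuming the bean implements OpHandler<T> directly, as all three handlers in this change do):

// Condensed sketch of the registration logic above.
static Class<?> handledOpType(Class<?> beanClass) {
    for (var iface : beanClass.getGenericInterfaces()) {
        if (iface instanceof ParameterizedType pt && pt.getRawType().equals(OpHandler.class))
            return (Class<?>) pt.getActualTypeArguments()[0]; // the T in OpHandler<T>
    }
    throw new IllegalArgumentException("Not a direct OpHandler: " + beanClass);
}

Note the exact-class lookup: an Op subtype that no bean declares falls through to the IllegalArgumentException in handleOp rather than matching a handler of a supertype.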

File: com/usatiuk/dhfs/invalidation/PushOpHandler.java (deleted)

@@ -1,22 +0,0 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.remoteobj.SyncHandler;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class PushOpHandler {
@Inject
Transaction curTx;
@Inject
SyncHandler syncHandler;
@Inject
RemoteTransaction remoteTransaction;
public void handlePush(PeerId peer, IndexUpdateOp obj) {
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data());
}
}

File: com/usatiuk/dhfs/invalidation/RemoteObjectMetaOpExtractor.java

@@ -1,14 +1,10 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
- import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
- import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
- import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
- import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
@@ -23,22 +19,11 @@ public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta
Transaction curTx;
@Inject
RemoteTransaction remoteTransaction;
- @Inject
- DtoMapperService dtoMapperService;
@Override
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
return txm.run(() -> {
- JDataRemoteDto dto =
- data.knownType().isAnnotationPresent(JDataRemotePush.class)
- ? remoteTransaction.getData(data.knownType(), data.key())
- .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
- : null;
- if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
- Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
- }
- return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
+ return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> {
});
});
}

File: com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeManager.java

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
+ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
@@ -53,11 +54,11 @@ public class JKleppmannTreeManager {
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
- curTx.put(rootNode);
+ curTx.put(new JKleppmannTreeNodeHolder(rootNode));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
- curTx.put(trashNode);
+ curTx.put(new JKleppmannTreeNodeHolder(trashNode));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
- curTx.put(lf_node);
+ curTx.put(new JKleppmannTreeNodeHolder(lf_node));
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);
@@ -170,60 +171,13 @@ public class JKleppmannTreeManager {
if (Log.isTraceEnabled())
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().name());
- try {
- _tree.applyExternalOp(from, jop.op());
- } catch (Exception e) {
- Log.error("Error applying external op", e);
- throw e;
- } finally {
- // FIXME:
- // Fixup the ref if it didn't really get applied
- // if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
- // Log.error("Could not create child of pushed op: " + jop.getOp());
- // if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
- // if (fileRef != null) {
- // var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
- //
- // VoidFn remove = () -> {
- // fileRef.runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
- // m.removeRef(jop.getOp().childId());
- // });
- // };
- //
- // if (got == null) {
- // remove.apply();
- // } else {
- // try {
- // got.rLock();
- // try {
- // got.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
- // if (got.getData() == null || !got.getData().extractRefs().contains(f.getFileIno()))
- // remove.apply();
- // } finally {
- // got.rUnlock();
- // }
- // } catch (DeletedObjectAccessException dex) {
- // remove.apply();
- // }
- // }
- // }
- // }
- }
+ _tree.applyExternalOp(from, jop.op());
}
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}
- // @Override
- // public void addToTx() {
- // // FIXME: a hack
- // _persistentData.get().rwLockNoCopy();
- // _persistentData.get().rwUnlock();
- // }
private class JOpRecorder implements OpRecorder<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public void recordOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
@@ -291,8 +245,8 @@ public class JKleppmannTreeManager {
@Override
public JKleppmannTreeNode getById(JObjectKey id) {
- var got = curTx.get(JKleppmannTreeNode.class, id);
- return got.orElse(null);
+ var got = curTx.get(JKleppmannTreeNodeHolder.class, id);
+ return got.map(JKleppmannTreeNodeHolder::node).orElse(null);
}
@Override
@@ -302,12 +256,14 @@ public class JKleppmannTreeManager {
@Override
public void putNode(TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> node) {
- curTx.put(((JKleppmannTreeNode) node));
+ curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
+ .map(n -> n.withNode((JKleppmannTreeNode) node))
+ .orElse(new JKleppmannTreeNodeHolder((JKleppmannTreeNode) node)));
}
@Override
public void removeNode(JObjectKey id) {
// TODO
// GC
}
@Override

File: com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreeOpHandler.java (new)

@@ -0,0 +1,31 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JKleppmannTreeOpHandler implements OpHandler<JKleppmannTreeOpWrapper> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
@Override
public void handleOp(PeerId from, JKleppmannTreeOpWrapper op) {
txm.run(() -> {
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
tree.acceptExternalOp(from, op);
// Push ack op
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, op.treeName()));
});
}
}

File: com/usatiuk/dhfs/jkleppmanntree/JKleppmannTreePeriodicOpHandler.java (new)

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JKleppmannTreePeriodicOpHandler implements OpHandler<JKleppmannTreePeriodicPushOp> {
@Inject
TransactionManager txm;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Override
public void handleOp(PeerId from, JKleppmannTreePeriodicPushOp op) {
txm.run(() -> {
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
tree.acceptExternalOp(from, op);
});
}
}

File: com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNode.java

@@ -1,16 +1,12 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.dhfs.peersync.PeerId;
- import com.usatiuk.dhfs.refcount.JDataRef;
- import com.usatiuk.dhfs.refcount.JDataRefcounted;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.objects.JObjectKey;
import jakarta.annotation.Nullable;
import org.pcollections.HashTreePMap;
- import org.pcollections.PCollection;
import org.pcollections.PMap;
- import org.pcollections.TreePSet;
import java.io.Serializable;
import java.util.Collection;
@@ -18,47 +14,35 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// FIXME: Ideally this is two classes?
- public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
+ public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
@Nullable JKleppmannTreeNodeMeta meta,
- PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
+ PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Serializable {
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
- this(id, TreePSet.empty(), false, parent, null, meta, HashTreePMap.empty());
+ this(id, parent, null, meta, HashTreePMap.empty());
}
@Override
public JKleppmannTreeNode withParent(JObjectKey parent) {
- return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
+ return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
- return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
+ return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withMeta(JKleppmannTreeNodeMeta meta) {
- return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
+ return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
@Override
public JKleppmannTreeNode withChildren(PMap<String, JObjectKey> children) {
- return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
+ return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
}
- @Override
- public JKleppmannTreeNode withRefsFrom(PCollection<JDataRef> refs) {
- return new JKleppmannTreeNode(key, refs, frozen, parent, lastEffectiveOp, meta, children);
- }
- @Override
- public JKleppmannTreeNode withFrozen(boolean frozen) {
- return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
- }
@Override
public Collection<JObjectKey> collectRefsTo() {
return Stream.<JObjectKey>concat(children().values().stream(),
Optional.ofNullable(meta)
@@ -66,4 +50,8 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
.orElse(Stream.empty()))
.collect(Collectors.toUnmodifiableSet());
}
+ public int estimateSize() {
+ return children.size() * 64;
+ }
}

File: com/usatiuk/dhfs/jkleppmanntree/structs/JKleppmannTreeNodeHolder.java (new)

@@ -0,0 +1,50 @@
package com.usatiuk.dhfs.jkleppmanntree.structs;
import com.usatiuk.dhfs.refcount.JDataRef;
import com.usatiuk.dhfs.refcount.JDataRefcounted;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.PCollection;
import org.pcollections.TreePSet;
import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;
// Separate refcounting from JKleppmannTreeNode
public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean frozen,
JKleppmannTreeNode node) implements JDataRefcounted, Serializable {
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node) {
this(TreePSet.empty(), false, node);
}
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
Objects.requireNonNull(node, "node");
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
}
@Override
public JKleppmannTreeNodeHolder withRefsFrom(PCollection<JDataRef> refs) {
return new JKleppmannTreeNodeHolder(refs, frozen, node);
}
@Override
public JKleppmannTreeNodeHolder withFrozen(boolean frozen) {
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return node.collectRefsTo();
}
@Override
public JObjectKey key() {
return node.key();
}
@Override
public int estimateSize() {
return node.estimateSize();
}
}
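The holder now carries the refcounting state (refsFrom, frozen) while the wrapped JKleppmannTreeNode keeps only tree structure, so every former direct read of a node goes through the holder. The recurring pattern, sketched against the same Transaction API used throughout this diff:

// Read: fetch the holder, then unwrap.
var node = curTx.get(JKleppmannTreeNodeHolder.class, id)
        .map(JKleppmannTreeNodeHolder::node)
        .orElseThrow();
// Write: reuse the existing holder (preserving its refcounts) when present.
curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
        .map(h -> h.withNode(node))
        .orElse(new JKleppmannTreeNodeHolder(node)));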

File: com/usatiuk/dhfs/jmap/JMapHelper.java

@@ -11,6 +11,7 @@ import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.Optional;
+ // TODO: It's not actually generic right now, only longs are supported essentially
@Singleton
public class JMapHelper {
@Inject
@@ -45,7 +46,6 @@ public class JMapHelper {
}
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
- // TODO:
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
}

File: com/usatiuk/dhfs/peersync/PeerInfo.java

@@ -4,12 +4,10 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.peertrust.CertificateTools;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
- import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.objects.JObjectKey;
import java.security.cert.X509Certificate;
- @JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));

File: com/usatiuk/dhfs/peersync/PeerInfoService.java

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
- import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
+ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JObjectKey;
@@ -39,7 +39,7 @@ public class PeerInfoService {
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
- return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> {
+ return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return remoteTx.getData(PeerInfo.class, meta.peerId());
});
@@ -63,7 +63,7 @@ public class PeerInfoService {
if (gotKey == null) {
return Optional.empty();
}
- return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> {
+ return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return remoteTx.getData(PeerInfo.class, meta.peerId());
});
@@ -73,7 +73,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
- return curTx.get(JKleppmannTreeNode.class, gotKey).map(
+ return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
@@ -88,7 +88,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
- return curTx.get(JKleppmannTreeNode.class, gotKey).map(
+ return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
@@ -116,7 +116,7 @@ public class PeerInfoService {
if (gotKey == null) {
return;
}
- var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null);
+ var node = curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).orElse(null);
if (node == null) {
Log.warn("Peer " + id + " not found in the tree");
return;

File: com/usatiuk/dhfs/peertrust/PeerInfoCertUpdateTxHook.java

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.peertrust;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
+ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.PeerInfo;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
@@ -29,18 +30,18 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
// We also need to force pushing invalidation to all, in case our node acts as a "middleman"
// connecting two other nodes
// TODO: Can there be a prettier way to do this? (e.g. more general proxying of ops?)
- if (cur instanceof JKleppmannTreeNode n) {
+ if (cur instanceof JKleppmannTreeNodeHolder n) {
if (n.key().value().equals("peers_jt_root")) {
// TODO: This is kinda sucky
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(PeerInfoService.TREE_KEY));
- if (!(old instanceof JKleppmannTreeNode oldNode))
+ if (!(old instanceof JKleppmannTreeNodeHolder oldNode))
throw new IllegalStateException("Old node is not a tree node");
- for (var curRef : oldNode.children().entrySet()) {
- if (!n.children().containsKey(curRef.getKey())) {
+ for (var curRef : oldNode.node().children().entrySet()) {
+ if (!n.node().children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
}

File: com/usatiuk/dhfs/remoteobj/IndexUpdateOpHandler.java (new)

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.remoteobj;
import com.usatiuk.dhfs.invalidation.IndexUpdateOp;
import com.usatiuk.dhfs.invalidation.OpHandler;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
SyncHandler syncHandler;
@Override
public void handleOp(PeerId from, IndexUpdateOp op) {
txm.run(() -> {
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
});
}
}

File: com/usatiuk/dhfs/remoteobj/JDataRemotePush.java (deleted)

@@ -1,11 +0,0 @@
package com.usatiuk.dhfs.remoteobj;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JDataRemotePush {
}

File: RemoteObjectServiceClient.java

@@ -122,6 +122,7 @@ public class RemoteObjectServiceClient {
try {
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
+ assert ourReferrers.isEmpty();
for (var ref : ourReferrers) {
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().toString()).build());
}

File: com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.rpc;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
- import com.usatiuk.dhfs.invalidation.OpHandler;
+ import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
@@ -42,7 +42,7 @@ public class RemoteObjectServiceServerImpl {
@Inject
RemoteTransaction remoteTx;
@Inject
- OpHandler opHandler;
+ OpHandlerService opHandlerService;
@Inject
DtoMapperService dtoMapperService;
@Inject
@@ -120,7 +120,7 @@ public class RemoteObjectServiceServerImpl {
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
var handle = txm.run(() -> {
- opHandler.handleOp(from, op);
+ opHandlerService.handleOp(from, op);
});
handles.add(handle);
}