mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
6 Commits
8cbecf1714
...
ad4ce72fdd
| Author | SHA1 | Date | |
|---|---|---|---|
| ad4ce72fdd | |||
| 26ba65fdce | |||
| 697add66d5 | |||
| a53fc5e973 | |||
| b034591091 | |||
| 07133a7186 |
@@ -80,6 +80,22 @@ public class DhfsFuseIT {
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private void checkConsistency() {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> {
|
||||
Log.info("Listing consistency");
|
||||
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
|
||||
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
|
||||
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
|
||||
Log.info(ls1);
|
||||
Log.info(cat1);
|
||||
Log.info(ls2);
|
||||
Log.info(cat2);
|
||||
|
||||
return ls1.equals(ls2) && cat1.equals(cat2);
|
||||
});
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void stop() {
|
||||
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
|
||||
@@ -249,7 +265,7 @@ public class DhfsFuseIT {
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request DELETE " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/"+c1uuid);
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
|
||||
@@ -265,7 +281,7 @@ public class DhfsFuseIT {
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{}' " +
|
||||
" http://localhost:8080/peers-manage/known-peers/"+c1uuid);
|
||||
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
@@ -309,6 +325,33 @@ public class DhfsFuseIT {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo fdsaio >> /dhfs_test/fuse/a/testf").getExitCode());
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo exgrg >> /dhfs_test/fuse/a/testf").getExitCode());
|
||||
|
||||
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
|
||||
|
||||
Log.warn("Waiting for connections");
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
|
||||
Log.warn("Connected");
|
||||
|
||||
checkConsistency();
|
||||
|
||||
var ls1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/a*/*");
|
||||
Assertions.assertTrue(ls1.getStdout().contains("fdsaio"));
|
||||
Assertions.assertTrue(ls1.getStdout().contains("exgrg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
|
||||
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
|
||||
import com.usatiuk.dhfs.jmap.JMapEntry;
|
||||
import com.usatiuk.dhfs.jmap.JMapHelper;
|
||||
@@ -94,21 +95,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
private JKleppmannTreeNode getDirEntryW(String name) {
|
||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
private JKleppmannTreeNode getDirEntryR(String name) {
|
||||
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
|
||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) return Optional.empty();
|
||||
var ret = curTx.get(JKleppmannTreeNode.class, res);
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -125,7 +126,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
} else {
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
|
||||
}
|
||||
} else if (ref instanceof JKleppmannTreeNode) {
|
||||
} else if (ref instanceof JKleppmannTreeNodeHolder) {
|
||||
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
|
||||
} else {
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
|
||||
@@ -242,7 +243,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
return jObjectTxManager.executeTx(() -> {
|
||||
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||
|
||||
if (dent instanceof JKleppmannTreeNode) {
|
||||
if (dent instanceof JKleppmannTreeNodeHolder) {
|
||||
return true;
|
||||
} else if (dent instanceof RemoteObjectMeta) {
|
||||
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
|
||||
@@ -628,7 +629,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||
|
||||
// FIXME:
|
||||
if (dent instanceof JKleppmannTreeNode) {
|
||||
if (dent instanceof JKleppmannTreeNodeHolder) {
|
||||
return true;
|
||||
} else if (dent instanceof RemoteObjectMeta) {
|
||||
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
|
||||
|
||||
@@ -72,40 +72,48 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
void init(@Observes @Priority(100000) StartupEvent event) {
|
||||
if (!enabled) return;
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory())
|
||||
throw new IllegalStateException("Could not create directory " + root);
|
||||
|
||||
Log.info("Mounting with root " + root);
|
||||
|
||||
var uid = new UnixSystem().getUid();
|
||||
var gid = new UnixSystem().getGid();
|
||||
|
||||
var opts = new ArrayList<String>();
|
||||
|
||||
// Assuming macFuse
|
||||
if (SystemUtils.IS_OS_MAC) {
|
||||
if (SystemUtils.IS_OS_WINDOWS) {
|
||||
opts.add("-o");
|
||||
opts.add("iosize=" + iosize);
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
// FIXME: There's something else missing: the writes still seem to be 32k max
|
||||
opts.add("auto_cache");
|
||||
opts.add("-o");
|
||||
opts.add("uid=-1");
|
||||
opts.add("-o");
|
||||
opts.add("gid=-1");
|
||||
} else {
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory())
|
||||
throw new IllegalStateException("Could not create directory " + root);
|
||||
|
||||
var uid = new UnixSystem().getUid();
|
||||
var gid = new UnixSystem().getGid();
|
||||
|
||||
// Assuming macFuse
|
||||
if (SystemUtils.IS_OS_MAC) {
|
||||
opts.add("-o");
|
||||
opts.add("iosize=" + iosize);
|
||||
} else if (SystemUtils.IS_OS_LINUX) {
|
||||
// FIXME: There's something else missing: the writes still seem to be 32k max
|
||||
// opts.add("-o");
|
||||
// opts.add("large_read");
|
||||
opts.add("-o");
|
||||
opts.add("big_writes");
|
||||
opts.add("-o");
|
||||
opts.add("max_read=" + iosize);
|
||||
opts.add("-o");
|
||||
opts.add("max_write=" + iosize);
|
||||
}
|
||||
opts.add("-o");
|
||||
opts.add("big_writes");
|
||||
opts.add("auto_cache");
|
||||
opts.add("-o");
|
||||
opts.add("max_read=" + iosize);
|
||||
opts.add("uid=" + uid);
|
||||
opts.add("-o");
|
||||
opts.add("max_write=" + iosize);
|
||||
opts.add("gid=" + gid);
|
||||
}
|
||||
opts.add("-o");
|
||||
opts.add("auto_cache");
|
||||
opts.add("-o");
|
||||
opts.add("uid=" + uid);
|
||||
opts.add("-o");
|
||||
opts.add("gid=" + gid);
|
||||
|
||||
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
private final PeerInterface<PeerIdT> _peers;
|
||||
private final Clock<TimestampT> _clock;
|
||||
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
|
||||
private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;
|
||||
|
||||
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
|
||||
PeerInterface<PeerIdT> peers,
|
||||
@@ -87,7 +86,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
node.withParent(null)
|
||||
.withLastEffectiveOp(null)
|
||||
);
|
||||
_undoCtx.put(node.key(), node);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,7 +215,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
if (cmp < 0) {
|
||||
if (log.containsKey(op.timestamp())) return;
|
||||
var toUndo = log.newestSlice(op.timestamp(), false);
|
||||
_undoCtx = new HashMap<>();
|
||||
for (var entry : toUndo.reversed()) {
|
||||
undoOp(entry.getValue());
|
||||
}
|
||||
@@ -225,13 +222,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
for (var entry : toUndo) {
|
||||
redoOp(entry);
|
||||
}
|
||||
if (!_undoCtx.isEmpty()) {
|
||||
for (var e : _undoCtx.entrySet()) {
|
||||
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
|
||||
_storage.removeNode(e.getKey());
|
||||
}
|
||||
}
|
||||
_undoCtx = null;
|
||||
tryTrimLog();
|
||||
} else {
|
||||
doAndPut(op, failCreatingIfExists);
|
||||
@@ -264,24 +254,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
}
|
||||
|
||||
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
|
||||
if (_undoCtx != null) {
|
||||
var node = _undoCtx.get(key);
|
||||
if (node != null) {
|
||||
try {
|
||||
if (!node.children().isEmpty()) {
|
||||
LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
|
||||
}
|
||||
node = node.withParent(parent).withMeta(meta);
|
||||
} catch (Exception e) {
|
||||
LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
|
||||
node = null;
|
||||
}
|
||||
}
|
||||
if (node != null) {
|
||||
_undoCtx.remove(key);
|
||||
return node;
|
||||
}
|
||||
}
|
||||
return _storage.createNewNode(key, parent, meta);
|
||||
}
|
||||
|
||||
@@ -362,11 +334,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
var conflictNode = _storage.getById(conflictNodeId);
|
||||
MetaT conflictNodeMeta = conflictNode.meta();
|
||||
|
||||
if (Objects.equals(conflictNodeMeta, op.newMeta())) {
|
||||
LOGGER.finer(() -> "Node creation conflict (the same): " + conflictNode);
|
||||
return new LogRecord<>(op, null);
|
||||
}
|
||||
|
||||
LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
|
||||
|
||||
String newConflictNodeName = op.newName() + ".conflict." + conflictNode.key();
|
||||
@@ -399,10 +366,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
|
||||
var replaceNode = _storage.getById(replaceNodeId);
|
||||
var replaceNodeMeta = replaceNode.meta();
|
||||
|
||||
if (Objects.equals(replaceNodeMeta, op.newMeta())) {
|
||||
return new LogRecord<>(op, null);
|
||||
}
|
||||
|
||||
LOGGER.finer(() -> "Node replacement: " + replaceNode);
|
||||
|
||||
return new LogRecord<>(op, List.of(
|
||||
|
||||
@@ -13,6 +13,10 @@
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.quarkus</groupId>
|
||||
<artifactId>quarkus-smallrye-openapi</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>testcontainers</artifactId>
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
package com.usatiuk.dhfs.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import org.pcollections.PMap;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
|
||||
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
|
||||
@Override
|
||||
public Collection<JObjectKey> getEscapedRefs() {
|
||||
return List.of(key);
|
||||
|
||||
@@ -168,7 +168,7 @@ public class InvalidationQueueService {
|
||||
|
||||
for (var p : ops.keySet()) {
|
||||
var list = ops.get(p);
|
||||
Log.infov("Pushing invalidations to {0}: {1}", p, list);
|
||||
Log.tracev("Pushing invalidations to {0}: {1}", p, list);
|
||||
remoteObjectServiceClient.pushOps(p, list);
|
||||
commits.get(p).forEach(Runnable::run);
|
||||
}
|
||||
|
||||
@@ -1,34 +1,7 @@
|
||||
package com.usatiuk.dhfs.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeOpWrapper;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreePeriodicPushOp;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class OpHandler {
|
||||
@Inject
|
||||
PushOpHandler pushOpHandler;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
public void handleOp(PeerId from, Op op) {
|
||||
if (op instanceof IndexUpdateOp iu) {
|
||||
pushOpHandler.handlePush(from, iu);
|
||||
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
|
||||
var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow();
|
||||
tree.acceptExternalOp(from, jk);
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
|
||||
} else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
|
||||
var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow();
|
||||
tree.acceptExternalOp(from, pop);
|
||||
}
|
||||
}
|
||||
public interface OpHandler<T extends Op> {
|
||||
void handleOp(PeerId from, T op);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.usatiuk.dhfs.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class OpHandlerService {
|
||||
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
|
||||
|
||||
public OpHandlerService(Instance<OpHandler<?>> OpHandlers) {
|
||||
HashMap<Class<? extends Op>, OpHandler> OpHandlerMap = new HashMap<>();
|
||||
|
||||
for (var OpHandler : OpHandlers.handles()) {
|
||||
for (var type : Arrays.stream(OpHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
|
||||
t -> {
|
||||
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
|
||||
if (pm.getRawType().equals(OpHandler.class)) return Stream.of(pm);
|
||||
return Stream.empty();
|
||||
}
|
||||
).toList()) {
|
||||
var orig = type.getActualTypeArguments()[0];
|
||||
assert Op.class.isAssignableFrom((Class<?>) orig);
|
||||
OpHandlerMap.put((Class<? extends Op>) orig, OpHandler.get());
|
||||
}
|
||||
}
|
||||
|
||||
_opHandlerMap = Map.copyOf(OpHandlerMap);
|
||||
}
|
||||
|
||||
public void handleOp(PeerId from, Op op) {
|
||||
var handler = _opHandlerMap.get(op.getClass());
|
||||
if (handler == null) {
|
||||
throw new IllegalArgumentException("No handler for op: " + op.getClass());
|
||||
}
|
||||
handler.handleOp(from, op);
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
package com.usatiuk.dhfs.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.remoteobj.SyncHandler;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PushOpHandler {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
@Inject
|
||||
RemoteTransaction remoteTransaction;
|
||||
|
||||
public void handlePush(PeerId peer, IndexUpdateOp obj) {
|
||||
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), obj.data());
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,10 @@
|
||||
package com.usatiuk.dhfs.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
|
||||
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
|
||||
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.syncmap.DtoMapperService;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
@@ -23,22 +19,11 @@ public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
RemoteTransaction remoteTransaction;
|
||||
@Inject
|
||||
DtoMapperService dtoMapperService;
|
||||
|
||||
@Override
|
||||
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
|
||||
return txm.run(() -> {
|
||||
JDataRemoteDto dto =
|
||||
data.knownType().isAnnotationPresent(JDataRemotePush.class)
|
||||
? remoteTransaction.getData(data.knownType(), data.key())
|
||||
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
|
||||
: null;
|
||||
|
||||
if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
|
||||
Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
|
||||
}
|
||||
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
|
||||
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.dhfs.invalidation.Op;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
@@ -53,11 +54,11 @@ public class JKleppmannTreeManager {
|
||||
);
|
||||
curTx.put(data);
|
||||
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
|
||||
curTx.put(rootNode);
|
||||
curTx.put(new JKleppmannTreeNodeHolder(rootNode));
|
||||
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
|
||||
curTx.put(trashNode);
|
||||
curTx.put(new JKleppmannTreeNodeHolder(trashNode));
|
||||
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
|
||||
curTx.put(lf_node);
|
||||
curTx.put(new JKleppmannTreeNodeHolder(lf_node));
|
||||
}
|
||||
return new JKleppmannTree(data);
|
||||
// opObjectRegistry.registerObject(tree);
|
||||
@@ -170,60 +171,13 @@ public class JKleppmannTreeManager {
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().name());
|
||||
|
||||
try {
|
||||
_tree.applyExternalOp(from, jop.op());
|
||||
} catch (Exception e) {
|
||||
Log.error("Error applying external op", e);
|
||||
throw e;
|
||||
} finally {
|
||||
// FIXME:
|
||||
// Fixup the ref if it didn't really get applied
|
||||
|
||||
// if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
|
||||
// Log.error("Could not create child of pushed op: " + jop.getOp());
|
||||
|
||||
// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
|
||||
// if (fileRef != null) {
|
||||
// var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
|
||||
//
|
||||
// VoidFn remove = () -> {
|
||||
// fileRef.runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
|
||||
// m.removeRef(jop.getOp().childId());
|
||||
// });
|
||||
// };
|
||||
//
|
||||
// if (got == null) {
|
||||
// remove.apply();
|
||||
// } else {
|
||||
// try {
|
||||
// got.rLock();
|
||||
// try {
|
||||
// got.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
|
||||
// if (got.getData() == null || !got.getData().extractRefs().contains(f.getFileIno()))
|
||||
// remove.apply();
|
||||
// } finally {
|
||||
// got.rUnlock();
|
||||
// }
|
||||
// } catch (DeletedObjectAccessException dex) {
|
||||
// remove.apply();
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
}
|
||||
_tree.applyExternalOp(from, jop.op());
|
||||
}
|
||||
|
||||
public Op getPeriodicPushOp() {
|
||||
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void addToTx() {
|
||||
// // FIXME: a hack
|
||||
// _persistentData.get().rwLockNoCopy();
|
||||
// _persistentData.get().rwUnlock();
|
||||
// }
|
||||
|
||||
private class JOpRecorder implements OpRecorder<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
|
||||
@Override
|
||||
public void recordOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
|
||||
@@ -291,8 +245,8 @@ public class JKleppmannTreeManager {
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode getById(JObjectKey id) {
|
||||
var got = curTx.get(JKleppmannTreeNode.class, id);
|
||||
return got.orElse(null);
|
||||
var got = curTx.get(JKleppmannTreeNodeHolder.class, id);
|
||||
return got.map(JKleppmannTreeNodeHolder::node).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -302,12 +256,14 @@ public class JKleppmannTreeManager {
|
||||
|
||||
@Override
|
||||
public void putNode(TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> node) {
|
||||
curTx.put(((JKleppmannTreeNode) node));
|
||||
curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
|
||||
.map(n -> n.withNode((JKleppmannTreeNode) node))
|
||||
.orElse(new JKleppmannTreeNodeHolder((JKleppmannTreeNode) node)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeNode(JObjectKey id) {
|
||||
// TODO
|
||||
// GC
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.invalidation.OpHandler;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JKleppmannTreeOpHandler implements OpHandler<JKleppmannTreeOpWrapper> {
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
@Override
|
||||
public void handleOp(PeerId from, JKleppmannTreeOpWrapper op) {
|
||||
txm.run(()->{
|
||||
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
|
||||
tree.acceptExternalOp(from, op);
|
||||
// Push ack op
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, op.treeName()));
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.dhfs.invalidation.OpHandler;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JKleppmannTreePeriodicOpHandler implements OpHandler<JKleppmannTreePeriodicPushOp> {
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
|
||||
@Override
|
||||
public void handleOp(PeerId from, JKleppmannTreePeriodicPushOp op) {
|
||||
txm.run(() -> {
|
||||
var tree = jKleppmannTreeManager.getTree(op.treeName()).orElseThrow();
|
||||
tree.acceptExternalOp(from, op);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,16 +1,12 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.dhfs.refcount.JDataRef;
|
||||
import com.usatiuk.dhfs.refcount.JDataRefcounted;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
import com.usatiuk.kleppmanntree.TreeNode;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.pcollections.HashTreePMap;
|
||||
import org.pcollections.PCollection;
|
||||
import org.pcollections.PMap;
|
||||
import org.pcollections.TreePSet;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
@@ -18,47 +14,35 @@ import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
// FIXME: Ideally this is two classes?
|
||||
public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
|
||||
public record JKleppmannTreeNode(JObjectKey key, JObjectKey parent,
|
||||
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
|
||||
@Nullable JKleppmannTreeNodeMeta meta,
|
||||
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
|
||||
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Serializable {
|
||||
|
||||
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
|
||||
this(id, TreePSet.empty(), false, parent, null, meta, HashTreePMap.empty());
|
||||
this(id, parent, null, meta, HashTreePMap.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withParent(JObjectKey parent) {
|
||||
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
|
||||
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
|
||||
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
|
||||
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withMeta(JKleppmannTreeNodeMeta meta) {
|
||||
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
|
||||
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withChildren(PMap<String, JObjectKey> children) {
|
||||
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
|
||||
return new JKleppmannTreeNode(key, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new JKleppmannTreeNode(key, refs, frozen, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withFrozen(boolean frozen) {
|
||||
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return Stream.<JObjectKey>concat(children().values().stream(),
|
||||
Optional.ofNullable(meta)
|
||||
@@ -66,4 +50,8 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
|
||||
.orElse(Stream.empty()))
|
||||
.collect(Collectors.toUnmodifiableSet());
|
||||
}
|
||||
|
||||
public int estimateSize() {
|
||||
return children.size() * 64;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.refcount.JDataRef;
|
||||
import com.usatiuk.dhfs.refcount.JDataRefcounted;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import org.pcollections.PCollection;
|
||||
import org.pcollections.TreePSet;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
|
||||
// Separate refcounting from JKleppmannTreeNode
|
||||
public record JKleppmannTreeNodeHolder(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
JKleppmannTreeNode node) implements JDataRefcounted, Serializable {
|
||||
|
||||
public JKleppmannTreeNodeHolder(JKleppmannTreeNode node) {
|
||||
this(TreePSet.empty(), false, node);
|
||||
}
|
||||
|
||||
public JKleppmannTreeNodeHolder withNode(JKleppmannTreeNode node) {
|
||||
Objects.requireNonNull(node, "node");
|
||||
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNodeHolder withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new JKleppmannTreeNodeHolder(refs, frozen, node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNodeHolder withFrozen(boolean frozen) {
|
||||
return new JKleppmannTreeNodeHolder(refsFrom, frozen, node);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return node.collectRefsTo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return node.key();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int estimateSize() {
|
||||
return node.estimateSize();
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import jakarta.inject.Singleton;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
|
||||
// TODO: It's not actually generic right now, only longs are supported essentially
|
||||
@Singleton
|
||||
public class JMapHelper {
|
||||
@Inject
|
||||
@@ -45,7 +46,6 @@ public class JMapHelper {
|
||||
}
|
||||
|
||||
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
|
||||
// TODO:
|
||||
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
|
||||
}
|
||||
|
||||
|
||||
@@ -4,12 +4,10 @@ import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.peertrust.CertificateTools;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemote;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
|
||||
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
|
||||
import java.security.cert.X509Certificate;
|
||||
|
||||
@JDataRemotePush
|
||||
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
|
||||
public PeerInfo(PeerId id, byte[] cert) {
|
||||
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.usatiuk.dhfs.peersync;
|
||||
|
||||
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
||||
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
@@ -39,7 +39,7 @@ public class PeerInfoService {
|
||||
|
||||
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
return curTx.get(JKleppmannTreeNode.class, key).flatMap(node -> {
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
|
||||
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
|
||||
return remoteTx.getData(PeerInfo.class, meta.peerId());
|
||||
});
|
||||
@@ -63,7 +63,7 @@ public class PeerInfoService {
|
||||
if (gotKey == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> {
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
|
||||
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
|
||||
return remoteTx.getData(PeerInfo.class, meta.peerId());
|
||||
});
|
||||
@@ -73,7 +73,7 @@ public class PeerInfoService {
|
||||
public List<PeerInfo> getPeers() {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of());
|
||||
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
|
||||
node -> node.children().keySet().stream()
|
||||
.map(JObjectKey::of).map(this::getPeerInfoImpl)
|
||||
.filter(o -> {
|
||||
@@ -88,7 +88,7 @@ public class PeerInfoService {
|
||||
public List<PeerInfo> getPeersNoSelf() {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of());
|
||||
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
|
||||
node -> node.children().keySet().stream()
|
||||
.map(JObjectKey::of).map(this::getPeerInfoImpl)
|
||||
.filter(o -> {
|
||||
@@ -116,7 +116,7 @@ public class PeerInfoService {
|
||||
if (gotKey == null) {
|
||||
return;
|
||||
}
|
||||
var node = curTx.get(JKleppmannTreeNode.class, gotKey).orElse(null);
|
||||
var node = curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).orElse(null);
|
||||
if (node == null) {
|
||||
Log.warn("Peer " + id + " not found in the tree");
|
||||
return;
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.peertrust;
|
||||
|
||||
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
||||
import com.usatiuk.dhfs.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||
@@ -29,18 +30,18 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
|
||||
// We also need to force pushing invalidation to all, in case our node acts as a "middleman"
|
||||
// connecting two other nodes
|
||||
// TODO: Can there be a prettier way to do this? (e.g. more general proxying of ops?)
|
||||
if (cur instanceof JKleppmannTreeNode n) {
|
||||
if (cur instanceof JKleppmannTreeNodeHolder n) {
|
||||
if (n.key().value().equals("peers_jt_root")) {
|
||||
// TODO: This is kinda sucky
|
||||
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
|
||||
|
||||
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(PeerInfoService.TREE_KEY));
|
||||
if (!(old instanceof JKleppmannTreeNode oldNode))
|
||||
if (!(old instanceof JKleppmannTreeNodeHolder oldNode))
|
||||
throw new IllegalStateException("Old node is not a tree node");
|
||||
|
||||
for (var curRef : oldNode.children().entrySet()) {
|
||||
if (!n.children().containsKey(curRef.getKey())) {
|
||||
for (var curRef : oldNode.node().children().entrySet()) {
|
||||
if (!n.node().children().containsKey(curRef.getKey())) {
|
||||
Log.infov("Will reset sync state for {0}", curRef.getValue());
|
||||
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.usatiuk.dhfs.remoteobj;
|
||||
|
||||
import com.usatiuk.dhfs.invalidation.IndexUpdateOp;
|
||||
import com.usatiuk.dhfs.invalidation.OpHandler;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
|
||||
@Override
|
||||
public void handleOp(PeerId from, IndexUpdateOp op) {
|
||||
txm.run(() -> {
|
||||
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,11 +0,0 @@
|
||||
package com.usatiuk.dhfs.remoteobj;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
public @interface JDataRemotePush {
|
||||
}
|
||||
@@ -122,6 +122,7 @@ public class RemoteObjectServiceClient {
|
||||
try {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
|
||||
var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
|
||||
assert ourReferrers.isEmpty();
|
||||
for (var ref : ourReferrers) {
|
||||
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().toString()).build());
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.rpc;
|
||||
import com.usatiuk.dhfs.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
|
||||
import com.usatiuk.dhfs.invalidation.Op;
|
||||
import com.usatiuk.dhfs.invalidation.OpHandler;
|
||||
import com.usatiuk.dhfs.invalidation.OpHandlerService;
|
||||
import com.usatiuk.dhfs.peersync.PeerId;
|
||||
import com.usatiuk.dhfs.peersync.PeerManager;
|
||||
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
||||
@@ -42,7 +42,7 @@ public class RemoteObjectServiceServerImpl {
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
@Inject
|
||||
OpHandler opHandler;
|
||||
OpHandlerService opHandlerService;
|
||||
@Inject
|
||||
DtoMapperService dtoMapperService;
|
||||
@Inject
|
||||
@@ -120,7 +120,7 @@ public class RemoteObjectServiceServerImpl {
|
||||
for (var op : ops) {
|
||||
Log.infov("<-- opPush: {0} from {1}", op, from);
|
||||
var handle = txm.run(() -> {
|
||||
opHandler.handleOp(from, op);
|
||||
opHandlerService.handleOp(from, op);
|
||||
});
|
||||
handles.add(handle);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user