getEscapedRefs

This commit is contained in:
2025-02-16 13:39:29 +01:00
parent bcd55835ca
commit 1f30af50df
5 changed files with 27 additions and 42 deletions

View File

@@ -3,19 +3,22 @@ package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.kleppmanntree.OpMove;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
// Wrapper to avoid having to specify generic types
public record JKleppmannTreeOpWrapper(JObjectKey treeName,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) implements Op, Serializable {
// @Override
// public Collection<JObjectKey> getEscapedRefs() {
// if (_op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {
// return List.of(mf.getFileIno());
// }
// return List.of();
// }
@Override
public Collection<JObjectKey> getEscapedRefs() {
if (op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {
return List.of(mf.getFileIno());
}
return List.of();
}
}

View File

@@ -139,19 +139,12 @@ public class RemoteObjectServiceClient {
// }
//
public OpPushReply pushOps(PeerId target, List<Op> ops) {
// for (Op op : ops) {
// for (var ref : op.getEscapedRefs()) {
// jObjectTxManager.executeTx(() -> {
// jObjectManager.get(ref).ifPresent(JObject::markSeen);
// });
// }
// }
// var builder = OpPushMsg.newBuilder()
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
// .setQueueId(queueName);
// for (var op : ops)
// builder.addMsg(opProtoSerializer.serialize(op));
for (Op op : ops) {
txm.run(() -> {
for (var ref : op.getEscapedRefs()) {
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
}
});
var serialized = opProtoSerializer.serialize(op);
var built = OpPushRequest.newBuilder().addMsg(serialized).build();
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));

View File

@@ -5,8 +5,16 @@ import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
public IndexUpdateOp(RemoteObjectMeta object) {
this(object.key(), object.changelog());
}
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of(key);
}
}

View File

@@ -3,12 +3,8 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -34,14 +30,6 @@ public class InvalidationQueueService {
@Inject
PeerManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject
PeerInfoService peerInfoService;
@@ -148,7 +136,6 @@ public class InvalidationQueueService {
}
public void pushInvalidationToAll(JObjectKey key) {
// if (obj.getMeta().isOnlyLocal()) return;
while (true) {
var queue = _toAllQueue.get();
if (queue == null) {
@@ -164,7 +151,6 @@ public class InvalidationQueueService {
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
// if (obj.getMeta().isOnlyLocal()) return;
if (remoteHostManager.isReachable(host))
_queue.add(Pair.of(host, obj));
else
@@ -172,18 +158,9 @@ public class InvalidationQueueService {
}
public void pushInvalidationToOne(PeerId host, JData obj) {
// if (obj.getMeta().isOnlyLocal()) return;
pushInvalidationToOne(host, obj.key());
}
// public void pushInvalidationToAll(String name) {
// pushInvalidationToAll(jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
// }
//
// public void pushInvalidationToOne(PeerId host, JObjectKey name) {
// pushInvalidationToOne(host, jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
// }
protected void pushDeferredInvalidations(PeerId host, JObjectKey name) {
_queue.add(Pair.of(host, name));
}

View File

@@ -1,8 +1,12 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
import java.util.Collection;
@ProtoMirror(OpPushPayload.class)
public interface Op {
Collection<JObjectKey> getEscapedRefs();
}