mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
some deletion fixes
This commit is contained in:
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -17,7 +18,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
return found;
|
||||
}
|
||||
|
||||
if (cur instanceof RemoteObjectMeta || cur instanceof JKleppmannTreeNode) {
|
||||
if (cur instanceof RemoteObjectDataWrapper<?> || cur instanceof JKleppmannTreeNode) {
|
||||
// FIXME:
|
||||
return new RemoteObjectMeta(key);
|
||||
} else {
|
||||
return found;
|
||||
@@ -39,6 +41,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
if (!oldRefs.contains(curRef)) {
|
||||
var referenced = getRef(refCur, curRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
|
||||
Log.tracev("Added ref from {0} to {1}", key, curRef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +49,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
if (!curRefs.contains(oldRef)) {
|
||||
var referenced = getRef(refCur, oldRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, oldRef);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,6 +63,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
for (var newRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(refCur, newRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
|
||||
Log.tracev("Added ref from {0} to {1}", key, newRef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +76,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
for (var removedRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(refCur, removedRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, removedRef);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -126,23 +126,27 @@ public class RemoteObjectDeleter {
|
||||
}
|
||||
|
||||
var knownHosts = peerInfoService.getPeersNoSelf();
|
||||
RemoteObjectMeta finalTarget = target;
|
||||
List<PeerId> missing = knownHosts.stream()
|
||||
.map(PeerInfo::id)
|
||||
.filter(id -> !target.confirmedDeletes().contains(id)).toList();
|
||||
.filter(id -> !finalTarget.confirmedDeletes().contains(id)).toList();
|
||||
|
||||
var ret = remoteObjectServiceClient.canDelete(missing, objName, target.refsFrom());
|
||||
|
||||
long ok = 0;
|
||||
|
||||
for (var r : ret) {
|
||||
if (!r.getDeletionCandidate()) {
|
||||
if (!r.getValue().getDeletionCandidate()) {
|
||||
// for (var rr : r.getReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
} else {
|
||||
target = target.withConfirmedDeletes(target.confirmedDeletes().plus(r.getKey()));
|
||||
ok++;
|
||||
}
|
||||
}
|
||||
|
||||
curTx.put(target);
|
||||
|
||||
if (ok != missing.size()) {
|
||||
Log.debugv("Delaying deletion check of {0}", objName);
|
||||
return true;
|
||||
@@ -175,7 +179,7 @@ public class RemoteObjectDeleter {
|
||||
if (!obj.seen())
|
||||
return true;
|
||||
|
||||
var knownHosts = peerInfoService.getPeers();
|
||||
var knownHosts = peerInfoService.getPeersNoSelf();
|
||||
boolean missing = false;
|
||||
for (var x : knownHosts) {
|
||||
if (!obj.confirmedDeletes().contains(x.id())) {
|
||||
|
||||
@@ -159,15 +159,15 @@ public class RemoteObjectServiceClient {
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
public Collection<CanDeleteReply> canDelete(Collection<PeerId> targets, JObjectKey object, Collection<JObjectKey> ourReferrers) {
|
||||
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey object, Collection<JObjectKey> ourReferrers) {
|
||||
Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
|
||||
try {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<CanDeleteReply>>map(h -> () -> {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
|
||||
var req = CanDeleteRequest.newBuilder().setName(object.toString());
|
||||
for (var ref : ourReferrers) {
|
||||
req.addOurReferrers(ref.toString());
|
||||
}
|
||||
return rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build()));
|
||||
return Pair.of(h, rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build())));
|
||||
}).toList()).stream().map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
|
||||
Reference in New Issue
Block a user