mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
hopefully working deletion
todo: mark seen
This commit is contained in:
@@ -9,6 +9,8 @@ import jakarta.inject.Inject;
|
||||
public class DeleterTxHook implements PreCommitTxHook {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
RemoteObjectDeleter remoteObjectDeleter;
|
||||
|
||||
private boolean canDelete(JDataRefcounted data) {
|
||||
return !data.frozen() && data.refsFrom().isEmpty();
|
||||
@@ -16,14 +18,14 @@ public class DeleterTxHook implements PreCommitTxHook {
|
||||
|
||||
@Override
|
||||
public void onChange(JObjectKey key, JData old, JData cur) {
|
||||
if (cur instanceof RemoteObject<?>) {
|
||||
return; // FIXME:
|
||||
}
|
||||
if (!(cur instanceof JDataRefcounted refCur)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (canDelete(refCur)) {
|
||||
if (refCur instanceof RemoteObject<?> ro) {
|
||||
remoteObjectDeleter.putDeletionCandidate(ro);
|
||||
return;
|
||||
}
|
||||
Log.trace("Deleting object on change: " + key);
|
||||
curTx.delete(key);
|
||||
}
|
||||
@@ -31,14 +33,15 @@ public class DeleterTxHook implements PreCommitTxHook {
|
||||
|
||||
@Override
|
||||
public void onCreate(JObjectKey key, JData cur) {
|
||||
if (cur instanceof RemoteObject<?>) {
|
||||
return;
|
||||
}
|
||||
if (!(cur instanceof JDataRefcounted refCur)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (canDelete(refCur)) {
|
||||
if (refCur instanceof RemoteObject<?> ro) {
|
||||
remoteObjectDeleter.putDeletionCandidate(ro);
|
||||
return;
|
||||
}
|
||||
Log.warn("Deleting object on creation: " + key);
|
||||
curTx.delete(key);
|
||||
}
|
||||
@@ -46,9 +49,6 @@ public class DeleterTxHook implements PreCommitTxHook {
|
||||
|
||||
@Override
|
||||
public void onDelete(JObjectKey key, JData cur) {
|
||||
if (cur instanceof RemoteObject<?>) {
|
||||
return;
|
||||
}
|
||||
if (!(cur instanceof JDataRefcounted refCur)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -11,8 +11,9 @@ import java.util.List;
|
||||
|
||||
public record RemoteObject<T extends JDataRemote>(PCollection<JObjectKey> refsFrom, boolean frozen,
|
||||
RemoteObjectMeta meta, @Nullable T data) implements JDataRefcounted {
|
||||
// Self put
|
||||
public RemoteObject(T data, PeerId initialPeer) {
|
||||
this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), initialPeer), data);
|
||||
this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), false, initialPeer), data);
|
||||
}
|
||||
|
||||
public RemoteObject(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
|
||||
|
||||
@@ -0,0 +1,231 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectDeleter {
|
||||
private final HashSetDelayedBlockingQueue<JObjectKey> _quickCandidates = new HashSetDelayedBlockingQueue<>(0);
|
||||
private final HashSetDelayedBlockingQueue<JObjectKey> _candidates;
|
||||
private final HashSetDelayedBlockingQueue<JObjectKey> _canDeleteRetries;
|
||||
private final HashSet<JObjectKey> _movablesInProcessing = new HashSet<>();
|
||||
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
@Inject
|
||||
PeerInfoService peerInfoService;
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
|
||||
int moveProcessorThreads;
|
||||
@ConfigProperty(name = "dhfs.objects.ref-processor.threads")
|
||||
int refProcessorThreads;
|
||||
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay")
|
||||
long canDeleteRetryDelay;
|
||||
|
||||
private ExecutorService _movableProcessorExecutorService;
|
||||
private ExecutorService _refProcessorExecutorService;
|
||||
|
||||
public RemoteObjectDeleter(@ConfigProperty(name = "dhfs.objects.deletion.delay") long deletionDelay,
|
||||
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay") long canDeleteRetryDelay) {
|
||||
_candidates = new HashSetDelayedBlockingQueue<>(deletionDelay);
|
||||
_canDeleteRetries = new HashSetDelayedBlockingQueue<>(canDeleteRetryDelay);
|
||||
}
|
||||
|
||||
void init(@Observes @Priority(200) StartupEvent event) throws IOException {
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("move-proc-%d")
|
||||
.build();
|
||||
_movableProcessorExecutorService = Executors.newFixedThreadPool(moveProcessorThreads, factory);
|
||||
|
||||
BasicThreadFactory factoryRef = new BasicThreadFactory.Builder()
|
||||
.namingPattern("ref-proc-%d")
|
||||
.build();
|
||||
_refProcessorExecutorService = Executors.newFixedThreadPool(refProcessorThreads, factoryRef);
|
||||
for (int i = 0; i < refProcessorThreads; i++) {
|
||||
_refProcessorExecutorService.submit(this::refProcessor);
|
||||
}
|
||||
|
||||
// Continue GC from last shutdown
|
||||
//FIXME
|
||||
// executorService.submit(() ->
|
||||
// jObjectManager.findAll().forEach(n -> {
|
||||
// jObjectManager.get(n).ifPresent(o -> o.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
|
||||
// return null;
|
||||
// }));
|
||||
// }));
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(800) ShutdownEvent event) throws InterruptedException {
|
||||
_refProcessorExecutorService.shutdownNow();
|
||||
if (!_refProcessorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
Log.error("Refcounting threads didn't exit in 30 seconds");
|
||||
}
|
||||
}
|
||||
|
||||
// public void putQuickDeletionCandidate(JObjectKey obj) {
|
||||
// _quickCandidates.add(obj);
|
||||
// }
|
||||
|
||||
public void putDeletionCandidate(RemoteObject<?> obj) {
|
||||
synchronized (_movablesInProcessing) {
|
||||
if (_movablesInProcessing.contains(obj.key())) return;
|
||||
if (!obj.meta().seen()) {
|
||||
if (_quickCandidates.add(obj.key()))
|
||||
Log.debug("Quick deletion candidate: " + obj.key());
|
||||
return;
|
||||
}
|
||||
if (_candidates.add(obj.key()))
|
||||
Log.debug("Deletion candidate: " + obj.key());
|
||||
}
|
||||
}
|
||||
|
||||
private void asyncProcessMovable(JObjectKey objName) {
|
||||
synchronized (_movablesInProcessing) {
|
||||
if (_movablesInProcessing.contains(objName)) return;
|
||||
_movablesInProcessing.add(objName);
|
||||
}
|
||||
Log.debugv("Async processing of remote obj del: {0}", objName);
|
||||
|
||||
_movableProcessorExecutorService.submit(() -> {
|
||||
boolean delay = true;
|
||||
try {
|
||||
delay = txm.run(() -> {
|
||||
Log.debugv("Starting async processing of remote obj del: {0}", objName);
|
||||
RemoteObject<?> target = curTx.get(RemoteObject.class, objName).orElse(null);
|
||||
if (target == null) return true;
|
||||
|
||||
if (canDelete(target)) {
|
||||
Log.debugv("Async processing of remote obj del: immediate {0}", objName);
|
||||
curTx.delete(objName);
|
||||
return true;
|
||||
}
|
||||
var knownHosts = peerInfoService.getPeersNoSelf();
|
||||
List<PeerId> missing = knownHosts.stream()
|
||||
.map(PeerInfo::id)
|
||||
.filter(id -> !target.meta().confirmedDeletes().contains(id)).toList();
|
||||
|
||||
var ret = remoteObjectServiceClient.canDelete(missing, objName, target.refsFrom());
|
||||
|
||||
long ok = 0;
|
||||
|
||||
for (var r : ret) {
|
||||
if (!r.getDeletionCandidate()) {
|
||||
// for (var rr : r.getReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
} else {
|
||||
ok++;
|
||||
}
|
||||
}
|
||||
|
||||
if (ok != missing.size()) {
|
||||
Log.debugv("Delaying deletion check of {0}", objName);
|
||||
return true;
|
||||
} else {
|
||||
assert canDelete(target);
|
||||
Log.debugv("Async processing of remote obj del: after query {0}", objName);
|
||||
curTx.delete(objName);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
synchronized (_movablesInProcessing) {
|
||||
_movablesInProcessing.remove(objName);
|
||||
if (!delay)
|
||||
_candidates.add(objName);
|
||||
else
|
||||
_canDeleteRetries.add(objName);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Returns true if the object can be deleted
|
||||
private boolean canDelete(RemoteObject<?> obj) {
|
||||
if (!obj.meta().seen())
|
||||
return true;
|
||||
|
||||
var knownHosts = peerInfoService.getPeers();
|
||||
boolean missing = false;
|
||||
for (var x : knownHosts) {
|
||||
if (!obj.meta().confirmedDeletes().contains(x.id())) {
|
||||
missing = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return !missing;
|
||||
}
|
||||
|
||||
private void refProcessor() {
|
||||
while (true) {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
JObjectKey next = null;
|
||||
JObjectKey nextQuick = null;
|
||||
|
||||
while (next == null && nextQuick == null) {
|
||||
nextQuick = _quickCandidates.tryGet();
|
||||
|
||||
if (nextQuick != null) break;
|
||||
|
||||
next = _canDeleteRetries.tryGet();
|
||||
if (next == null)
|
||||
next = _candidates.tryGet();
|
||||
if (next == null)
|
||||
nextQuick = _quickCandidates.get(canDeleteRetryDelay);
|
||||
}
|
||||
|
||||
Stream.of(next, nextQuick).filter(Objects::nonNull).forEach(realNext -> {
|
||||
Log.debugv("Processing remote object deletion candidate: {0}", realNext);
|
||||
var deleted = txm.run(() -> {
|
||||
RemoteObject<?> target = curTx.get(RemoteObject.class, realNext).orElse(null);
|
||||
if (target == null) return true;
|
||||
|
||||
if (canDelete(target)) {
|
||||
Log.debugv("Immediate deletion of: {0}", realNext);
|
||||
curTx.delete(realNext);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
});
|
||||
if (!deleted)
|
||||
asyncProcessMovable(realNext);
|
||||
});
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
return;
|
||||
} catch (Throwable error) {
|
||||
Log.error("Exception in refcounter thread", error);
|
||||
}
|
||||
Log.info("JObject Refcounter thread exiting");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,8 +14,8 @@ public record RemoteObjectMeta(
|
||||
PSet<PeerId> confirmedDeletes,
|
||||
boolean seen,
|
||||
PMap<PeerId, Long> changelog) implements Serializable {
|
||||
public RemoteObjectMeta(JObjectKey key, Class<? extends JDataRemote> type, PeerId initialPeer) {
|
||||
this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), true,
|
||||
public RemoteObjectMeta(JObjectKey key, Class<? extends JDataRemote> type, boolean seen, PeerId initialPeer) {
|
||||
this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), seen,
|
||||
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L));
|
||||
}
|
||||
|
||||
|
||||
@@ -12,9 +12,15 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectServiceClient {
|
||||
@@ -43,6 +49,8 @@ public class RemoteObjectServiceClient {
|
||||
@Inject
|
||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||
|
||||
private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
// public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
|
||||
// return rpcClientFactory.withObjSyncClient(host, client -> {
|
||||
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build());
|
||||
@@ -151,31 +159,24 @@ public class RemoteObjectServiceClient {
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
// public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
|
||||
// ConcurrentLinkedDeque<CanDeleteReply> results = new ConcurrentLinkedDeque<>();
|
||||
// Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
|
||||
// try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
|
||||
// try {
|
||||
// executor.invokeAll(targets.stream().<Callable<Void>>map(h -> () -> {
|
||||
// try {
|
||||
// var req = CanDeleteRequest.newBuilder()
|
||||
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
|
||||
// .setName(object);
|
||||
// req.addAllOurReferrers(ourReferrers);
|
||||
// var res = rpcClientFactory.withObjSyncClient(h, client -> client.canDelete(req.build()));
|
||||
// if (res != null)
|
||||
// results.add(res);
|
||||
// } catch (Exception e) {
|
||||
// Log.debug("Error when asking canDelete for object " + object, e);
|
||||
// }
|
||||
// return null;
|
||||
// }).toList());
|
||||
// } catch (InterruptedException e) {
|
||||
// Log.warn("Interrupted waiting for canDelete for object " + object);
|
||||
// }
|
||||
// if (!executor.shutdownNow().isEmpty())
|
||||
// Log.warn("Didn't ask all targets when asking canDelete for " + object);
|
||||
// }
|
||||
// return results;
|
||||
// }
|
||||
public Collection<CanDeleteReply> canDelete(Collection<PeerId> targets, JObjectKey object, Collection<JObjectKey> ourReferrers) {
|
||||
Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
|
||||
try {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<CanDeleteReply>>map(h -> () -> {
|
||||
var req = CanDeleteRequest.newBuilder().setName(object.toString());
|
||||
for (var ref : ourReferrers) {
|
||||
req.addOurReferrers(ref.toString());
|
||||
}
|
||||
return rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build()));
|
||||
}).toList()).stream().map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).toList();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.annotation.security.RolesAllowed;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
// Note: RunOnVirtualThread hangs somehow
|
||||
@GrpcService
|
||||
@@ -101,58 +100,37 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
||||
throw new NotImplementedException();
|
||||
var peerId = identity.getPrincipal().getName().substring(3);
|
||||
|
||||
Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
|
||||
|
||||
var builder = CanDeleteReply.newBuilder();
|
||||
builder.setObjName(request.getName());
|
||||
|
||||
txm.run(() -> {
|
||||
var obj = curTx.get(RemoteObject.class, JObjectKey.of(request.getName())).orElse(null);
|
||||
|
||||
if (obj == null) {
|
||||
builder.setDeletionCandidate(true);
|
||||
return;
|
||||
}
|
||||
|
||||
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
||||
|
||||
if (!builder.getDeletionCandidate())
|
||||
for (var r : obj.refsFrom())
|
||||
builder.addReferrers(r.toString());
|
||||
|
||||
// if (!ret.getDeletionCandidate())
|
||||
// for (var rr : request.getOurReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
});
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
|
||||
// @Override
|
||||
// @Blocking
|
||||
// public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
||||
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
|
||||
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
|
||||
//
|
||||
// Log.info("<-- canDelete: " + request.getName() + " from " + request.getSelfUuid());
|
||||
//
|
||||
// var builder = CanDeleteReply.newBuilder();
|
||||
//
|
||||
// var obj = jObjectManager.get(request.getName());
|
||||
//
|
||||
// builder.setSelfUuid(persistentPeerDataService.getSelfUuid().toString());
|
||||
// builder.setObjName(request.getName());
|
||||
//
|
||||
// if (obj.isPresent()) try {
|
||||
// boolean tryUpdate = obj.get().runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
|
||||
// if (m.isDeleted() && !m.isDeletionCandidate())
|
||||
// throw new IllegalStateException("Object " + m.getName() + " is deleted but not a deletion candidate");
|
||||
// builder.setDeletionCandidate(m.isDeletionCandidate());
|
||||
// builder.addAllReferrers(m.getReferrers());
|
||||
// return m.isDeletionCandidate() && !m.isDeleted();
|
||||
// });
|
||||
// // FIXME
|
||||
|
||||
/// / if (tryUpdate) {
|
||||
/// / obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
|
||||
/// / return null;
|
||||
/// / });
|
||||
/// / }
|
||||
// } catch (DeletedObjectAccessException dox) {
|
||||
// builder.setDeletionCandidate(true);
|
||||
// }
|
||||
// else {
|
||||
// builder.setDeletionCandidate(true);
|
||||
// }
|
||||
//
|
||||
// var ret = builder.build();
|
||||
//
|
||||
// if (!ret.getDeletionCandidate())
|
||||
// for (var rr : request.getOurReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
//
|
||||
// return Uni.createFrom().item(ret);
|
||||
// }
|
||||
|
||||
// @Override
|
||||
// @Override
|
||||
// @Blocking
|
||||
// public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
||||
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
|
||||
@@ -31,6 +31,10 @@ public class OpPusher {
|
||||
return new IndexUpdateOp(key, remote.meta().changelog());
|
||||
}
|
||||
case JKleppmannTreePersistentData pd -> {
|
||||
var maybeQueue = pd.queues().get(op);
|
||||
if(maybeQueue == null || maybeQueue.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
var ret = new JKleppmannTreeOpWrapper(key, pd.queues().get(op).firstEntry().getValue());
|
||||
var newPd = pd.withQueues(pd.queues().plus(op, pd.queues().get(op).minus(ret.op().timestamp())));
|
||||
curTx.put(newPd);
|
||||
|
||||
Reference in New Issue
Block a user