5 Commits

5 changed files with 116 additions and 86 deletions

View File

@@ -142,6 +142,7 @@ public class RemoteObjectDeleter {
for (var r : ret) {
if (!r.getValue().getDeletionCandidate()) {
Log.infov("Could not delete {0}: reply from {1}: {2}", target, r.getKey(), r.getValue());
for (var rr : r.getRight().getReferrersList())
curTx.onCommit(() -> autosyncProcessor.add(JObjectKey.of(rr.getName())));
} else {

View File

@@ -72,9 +72,7 @@ public class RemoteObjectServiceServerImpl {
}
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
var peerId = from;
Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
Log.infov("<-- canDelete: {0} from {1}", request, from);
var builder = CanDeleteReply.newBuilder();
@@ -94,6 +92,10 @@ public class RemoteObjectServiceServerImpl {
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
}
}
if (!builder.getDeletionCandidate()) {
Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
}
});
return Uni.createFrom().item(builder.build());
}
@@ -102,13 +104,13 @@ public class RemoteObjectServiceServerImpl {
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.info("<-- op: " + op + " from " + from);
Log.infov("<-- opPush: {0} from {1}", op, from);
txm.run(() -> {
opHandler.handleOp(from, op);
});
}
} catch (Exception e) {
Log.error(e, e);
Log.error("Error handling ops", e);
throw e;
}
return Uni.createFrom().item(OpPushReply.getDefaultInstance());

View File

@@ -1,5 +1,7 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
@@ -15,9 +17,13 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Link;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -41,6 +47,8 @@ public class InvalidationQueueService {
PersistentPeerDataService persistentPeerDataService;
private final DataLocker _locker = new DataLocker();
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -105,38 +113,61 @@ public class InvalidationQueueService {
String stats = "Sent invalidation: ";
long success = 0;
for (var e : data) {
// TODO: Race?
if (!peerInfoService.existsPeer(e.peer())) {
Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
continue;
}
List<AutoCloseableNoThrow> locks = new LinkedList<>();
try {
ArrayListValuedHashMap<PeerId, Op> ops = new ArrayListValuedHashMap<>();
ArrayListValuedHashMap<PeerId, Runnable> commits = new ArrayListValuedHashMap<>();
for (var e : data) {
// TODO: Race?
if (!peerInfoService.existsPeer(e.peer())) {
Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
continue;
}
if (!remoteHostManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
if (!remoteHostManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
pushInvalidationToOne(e);
continue;
}
if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
pushInvalidationToOne(e);
continue;
}
try (var lock = _locker.tryLock(e)) {
var lock = _locker.tryLock(e);
if (lock == null) {
pushInvalidationToOne(e);
continue;
}
opPusher.doPush(e);
success++;
} catch (Exception ex) {
Log.warnv("Failed to send invalidation to {0}, will retry: {1}", e, ex);
pushInvalidationToOne(e);
locks.add(lock);
try {
var prepared = opPusher.preparePush(e);
ops.get(e.peer()).addAll(prepared.getLeft());
commits.get(e.peer()).addAll(prepared.getRight());
success++;
} catch (Exception ex) {
Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);
pushInvalidationToOne(e);
}
if (_shutdown) {
Log.info("Invalidation sender exiting");
break;
}
}
if (_shutdown) {
Log.info("Invalidation sender exiting");
break;
for (var p : ops.keySet()) {
var list = ops.get(p);
Log.infov("Pushing invalidations to {0}: {1}", p, list);
remoteObjectServiceClient.pushOps(p, list);
commits.get(p).forEach(Runnable::run);
}
} catch (Exception e) {
Log.warnv("Failed to send invalidations, will retry", e);
for (var inv : data) {
pushInvalidationToOne(inv);
}
} finally {
locks.forEach(AutoCloseableNoThrow::close);
}
stats += success + "/" + data.size() + " ";

View File

@@ -14,6 +14,7 @@ import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,65 +36,60 @@ public class OpPusher {
@Inject
DtoMapperService dtoMapperService;
public void doPush(InvalidationQueueEntry entry) {
AtomicBoolean doAgain = new AtomicBoolean(false);
do {
// FIXME:
doAgain.set(false);
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
public Pair<List<Op>, List<Runnable>> preparePush(InvalidationQueueEntry entry) {
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (!tree.hasPendingOpsForHost(entry.peer()))
return List.of(tree.getPeriodicPushOp());
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
if (tree.hasPendingOpsForHost(entry.peer())) {
doAgain.set(true);
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
}
return ops;
}
case null,
default -> {
return null;
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (!tree.hasPendingOpsForHost(entry.peer()))
return List.of(tree.getPeriodicPushOp());
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
if (tree.hasPendingOpsForHost(entry.peer())) {
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
}
return ops;
}
case null,
default -> {
return List.of();
}
});
if (info == null) {
return;
}
Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info);
remoteObjectServiceClient.pushOps(entry.peer(), info);
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
});
List<Runnable> commits = info.stream().<Runnable>map(o -> {
return () -> {
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
}
break;
}
break;
case null:
default:
}
case null:
default:
}
});
} while (doAgain.get());
});
};
}).toList();
return Pair.of(info, commits);
}
}

View File

@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.threads=16
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false
@@ -26,10 +26,10 @@ dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=2
dhfs.objects.autosync.threads=16
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=4
dhfs.objects.ref-processor.threads=4
dhfs.objects.move-processor.threads=16
dhfs.objects.ref-processor.threads=16
dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true