Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-29 04:57:48 +01:00

Compare commits: 5 commits, ffef8959df ... 194166109e

- 194166109e
- 68111a0c4f
- b872c32a05
- 0e14b1cd39
- 17843952f2
```diff
@@ -142,6 +142,7 @@ public class RemoteObjectDeleter {
         for (var r : ret) {
             if (!r.getValue().getDeletionCandidate()) {
                 Log.infov("Could not delete {0}: reply from {1}: {2}", target, r.getKey(), r.getValue());
+                for (var rr : r.getRight().getReferrersList())
+                    curTx.onCommit(() -> autosyncProcessor.add(JObjectKey.of(rr.getName())));
             } else {
```
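When a remote peer refuses a deletion, its reply lists the referrers it still knows about; the added lines schedule each of those referrers for autosync, but only once the surrounding transaction commits. A minimal sketch of that defer-until-commit pattern, with a hypothetical `Tx` interface standing in for the project's `curTx` and a plain queue standing in for the autosync processor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-ins: the real code uses curTx.onCommit(...) and an autosync processor.
interface Tx {
    void onCommit(Runnable hook); // run only after the transaction commits
}

class DeferToCommitExample {
    static final Queue<String> autosyncQueue = new ConcurrentLinkedQueue<>();

    // Collect hooks during the transaction, run them after a successful commit.
    static class SimpleTx implements Tx {
        private final List<Runnable> hooks = new ArrayList<>();
        public void onCommit(Runnable hook) { hooks.add(hook); }
        void commit() { hooks.forEach(Runnable::run); }
    }

    public static void main(String[] args) {
        SimpleTx tx = new SimpleTx();
        // Inside the transaction: don't enqueue directly, defer the side effect.
        for (String referrer : List.of("obj-a", "obj-b")) {
            tx.onCommit(() -> autosyncQueue.add(referrer));
        }
        // Nothing is queued yet; only the commit publishes the side effects.
        tx.commit();
        System.out.println(autosyncQueue); // [obj-a, obj-b]
    }
}
```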
```diff
@@ -72,9 +72,7 @@ public class RemoteObjectServiceServerImpl {
     }
 
     public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
-        var peerId = from;
-
-        Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
+        Log.infov("<-- canDelete: {0} from {1}", request, from);
 
         var builder = CanDeleteReply.newBuilder();
@@ -94,6 +92,10 @@ public class RemoteObjectServiceServerImpl {
                     curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
                 }
             }
+
+            if (!builder.getDeletionCandidate()) {
+                Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
+            }
         });
         return Uni.createFrom().item(builder.build());
     }
@@ -102,13 +104,13 @@ public class RemoteObjectServiceServerImpl {
         try {
             var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
             for (var op : ops) {
-                Log.info("<-- op: " + op + " from " + from);
+                Log.infov("<-- opPush: {0} from {1}", op, from);
                 txm.run(() -> {
                     opHandler.handleOp(from, op);
                 });
            }
         } catch (Exception e) {
-            Log.error(e, e);
+            Log.error("Error handling ops", e);
             throw e;
         }
         return Uni.createFrom().item(OpPushReply.getDefaultInstance());
```
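Several of the changes above swap string-concatenated log calls for parameterized ones. A minimal sketch of the difference, assuming the Quarkus `io.quarkus.logging.Log` facade that these files already use; the `op`/`from` values are placeholders:

```java
import io.quarkus.logging.Log;

class LoggingStyleExample {
    void onOp(String op, String from) {
        // Concatenation builds the message string even if INFO is disabled for this category:
        Log.info("<-- op: " + op + " from " + from);

        // Parameterized form (MessageFormat-style {0}, {1}) defers formatting
        // until the logger knows the record will actually be written:
        Log.infov("<-- opPush: {0} from {1}", op, from);
    }

    void onError(Exception e) {
        // A readable message plus the throwable keeps the stack trace attached
        // without logging the exception object twice:
        Log.error("Error handling ops", e);
    }
}
```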
```diff
@@ -1,5 +1,7 @@
 package com.usatiuk.dhfs.repository.invalidation;
 
+import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
+import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
 import com.usatiuk.dhfs.utils.DataLocker;
 import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.dhfs.PeerId;
@@ -15,9 +17,13 @@ import jakarta.annotation.Priority;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
+import jakarta.ws.rs.core.Link;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +47,8 @@ public class InvalidationQueueService {
     PersistentPeerDataService persistentPeerDataService;
 
     private final DataLocker _locker = new DataLocker();
+    @Inject
+    RemoteObjectServiceClient remoteObjectServiceClient;
     private ExecutorService _executor;
     private volatile boolean _shutdown = false;
 
@@ -105,38 +113,61 @@ public class InvalidationQueueService {
             String stats = "Sent invalidation: ";
             long success = 0;
 
-            for (var e : data) {
-                // TODO: Race?
-                if (!peerInfoService.existsPeer(e.peer())) {
-                    Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
-                    continue;
-                }
+            List<AutoCloseableNoThrow> locks = new LinkedList<>();
+            try {
+                ArrayListValuedHashMap<PeerId, Op> ops = new ArrayListValuedHashMap<>();
+                ArrayListValuedHashMap<PeerId, Runnable> commits = new ArrayListValuedHashMap<>();
+                for (var e : data) {
+                    // TODO: Race?
+                    if (!peerInfoService.existsPeer(e.peer())) {
+                        Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
+                        continue;
+                    }
 
-                if (!remoteHostManager.isReachable(e.peer())) {
-                    deferredInvalidationQueueService.defer(e);
-                    continue;
-                }
+                    if (!remoteHostManager.isReachable(e.peer())) {
+                        deferredInvalidationQueueService.defer(e);
+                        continue;
+                    }
 
-                if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
-                    pushInvalidationToOne(e);
-                    continue;
-                }
+                    if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
+                        pushInvalidationToOne(e);
+                        continue;
+                    }
 
-                try (var lock = _locker.tryLock(e)) {
+                    var lock = _locker.tryLock(e);
                     if (lock == null) {
                         pushInvalidationToOne(e);
                         continue;
                     }
-                    opPusher.doPush(e);
-                    success++;
-                } catch (Exception ex) {
-                    Log.warnv("Failed to send invalidation to {0}, will retry: {1}", e, ex);
-                    pushInvalidationToOne(e);
-                }
-                if (_shutdown) {
-                    Log.info("Invalidation sender exiting");
-                    break;
-                }
-            }
+                    locks.add(lock);
+                    try {
+                        var prepared = opPusher.preparePush(e);
+                        ops.get(e.peer()).addAll(prepared.getLeft());
+                        commits.get(e.peer()).addAll(prepared.getRight());
+                        success++;
+                    } catch (Exception ex) {
+                        Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);
+                        pushInvalidationToOne(e);
+                    }
+                    if (_shutdown) {
+                        Log.info("Invalidation sender exiting");
+                        break;
+                    }
+                }
+
+                for (var p : ops.keySet()) {
+                    var list = ops.get(p);
+                    Log.infov("Pushing invalidations to {0}: {1}", p, list);
+                    remoteObjectServiceClient.pushOps(p, list);
+                    commits.get(p).forEach(Runnable::run);
+                }
+            } catch (Exception e) {
+                Log.warnv("Failed to send invalidations, will retry", e);
+                for (var inv : data) {
+                    pushInvalidationToOne(inv);
+                }
+            } finally {
+                locks.forEach(AutoCloseableNoThrow::close);
+            }
 
             stats += success + "/" + data.size() + " ";
```
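The reworked sender loop above no longer pushes each queue entry on its own: it prepares ops per entry, holds the per-entry locks for the whole batch (released in `finally`), groups the ops per peer in multimaps, sends one `pushOps` call per peer, and runs the deferred commit callbacks only after that push succeeds. A minimal sketch of the group-then-flush part, using the same `ArrayListValuedHashMap` from commons-collections4; the peer/op strings and the `send` callback are placeholders:

```java
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;

import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class BatchPerPeerExample {
    // Group items by destination, then flush one batch per destination.
    static void flushBatched(List<Map.Entry<String, String>> queue,
                             BiConsumer<String, List<String>> send) {
        ArrayListValuedHashMap<String, String> batches = new ArrayListValuedHashMap<>();
        for (var entry : queue) {
            // get() returns a live list view, so add/addAll accumulate values per key.
            batches.get(entry.getKey()).add(entry.getValue());
        }
        for (String peer : batches.keySet()) {
            // One call per peer instead of one call per queued item.
            send.accept(peer, batches.get(peer));
        }
    }

    public static void main(String[] args) {
        flushBatched(
                List.of(Map.entry("peer-1", "op-a"),
                        Map.entry("peer-2", "op-b"),
                        Map.entry("peer-1", "op-c")),
                (peer, ops) -> System.out.println(peer + " <- " + ops));
        // prints one line per peer, e.g. "peer-1 <- [op-a, op-c]" (key order not guaranteed)
    }
}
```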
```diff
@@ -14,6 +14,7 @@ import com.usatiuk.objects.transaction.TransactionManager;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,65 +36,60 @@ public class OpPusher {
     @Inject
     DtoMapperService dtoMapperService;
 
-    public void doPush(InvalidationQueueEntry entry) {
-        AtomicBoolean doAgain = new AtomicBoolean(false);
-        do {
-            // FIXME:
-            doAgain.set(false);
-            List<Op> info = txm.run(() -> {
-                var obj = curTx.get(JData.class, entry.key()).orElse(null);
-                switch (obj) {
-                    case RemoteObjectMeta remote -> {
-                        JDataRemoteDto data =
-                                remote.knownType().isAnnotationPresent(JDataRemotePush.class)
-                                        ? remoteTransaction.getData(remote.knownType(), entry.key())
-                                        .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
-                                        : null;
-
-                        if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
-                            Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
-                        }
-                        return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
-                    }
-                    case JKleppmannTreePersistentData pd -> {
-                        var tree = jKleppmannTreeManager.getTree(pd.key());
-
-                        if (!tree.hasPendingOpsForHost(entry.peer()))
-                            return List.of(tree.getPeriodicPushOp());
-
-                        var ops = tree.getPendingOpsForHost(entry.peer(), 100);
-
-                        if (tree.hasPendingOpsForHost(entry.peer())) {
-                            doAgain.set(true);
-                            invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
-                        }
-                        return ops;
-                    }
-                    case null,
-                         default -> {
-                        return null;
-                    }
-                }
-            });
-            if (info == null) {
-                return;
-            }
-            Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info);
-            remoteObjectServiceClient.pushOps(entry.peer(), info);
-            txm.run(() -> {
-                var obj = curTx.get(JData.class, entry.key()).orElse(null);
-                switch (obj) {
-                    case JKleppmannTreePersistentData pd: {
-                        var tree = jKleppmannTreeManager.getTree(pd.key());
-                        for (var op : info) {
-                            tree.commitOpForHost(entry.peer(), op);
-                        }
-                        break;
-                    }
-                    case null:
-                    default:
-                }
-            });
-        } while (doAgain.get());
-    }
+    public Pair<List<Op>, List<Runnable>> preparePush(InvalidationQueueEntry entry) {
+        List<Op> info = txm.run(() -> {
+            var obj = curTx.get(JData.class, entry.key()).orElse(null);
+            switch (obj) {
+                case RemoteObjectMeta remote -> {
+                    JDataRemoteDto data =
+                            remote.knownType().isAnnotationPresent(JDataRemotePush.class)
+                                    ? remoteTransaction.getData(remote.knownType(), entry.key())
+                                    .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
+                                    : null;
+
+                    if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
+                        Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
+                    }
+                    return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
+                }
+                case JKleppmannTreePersistentData pd -> {
+                    var tree = jKleppmannTreeManager.getTree(pd.key());
+
+                    if (!tree.hasPendingOpsForHost(entry.peer()))
+                        return List.of(tree.getPeriodicPushOp());
+
+                    var ops = tree.getPendingOpsForHost(entry.peer(), 100);
+
+                    if (tree.hasPendingOpsForHost(entry.peer())) {
+                        invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
+                    }
+                    return ops;
+                }
+                case null,
+                     default -> {
+                    return List.of();
+                }
+            }
+        });
+
+        List<Runnable> commits = info.stream().<Runnable>map(o -> {
+            return () -> {
+                txm.run(() -> {
+                    var obj = curTx.get(JData.class, entry.key()).orElse(null);
+                    switch (obj) {
+                        case JKleppmannTreePersistentData pd: {
+                            var tree = jKleppmannTreeManager.getTree(pd.key());
+                            for (var op : info) {
+                                tree.commitOpForHost(entry.peer(), op);
+                            }
+                            break;
+                        }
+                        case null:
+                        default:
+                    }
+                });
+            };
+        }).toList();
+
+        return Pair.of(info, commits);
+    }
 }
```
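`doPush` becomes `preparePush`: instead of sending ops and recording them as committed in one go, it now returns the ops together with `Runnable` callbacks, so the invalidation queue above can batch the network push per peer and run the callbacks only once the push succeeded. A minimal sketch of that prepare/commit split, with a placeholder `Op` record and `send` callback standing in for the project's types:

```java
import org.apache.commons.lang3.tuple.Pair;

import java.util.List;
import java.util.function.Consumer;

class PrepareThenCommitExample {
    record Op(String name) {} // placeholder for the project's Op type

    // Phase 1: decide what to send and how to acknowledge it later, but do neither yet.
    static Pair<List<Op>, List<Runnable>> prepare(String key) {
        List<Op> ops = List.of(new Op(key + "-update"));
        List<Runnable> commits = ops.stream()
                .<Runnable>map(op -> () -> System.out.println("committed " + op.name()))
                .toList();
        return Pair.of(ops, commits);
    }

    // Phase 2: push the whole batch, then run the commit callbacks only on success.
    static void pushBatch(List<Pair<List<Op>, List<Runnable>>> prepared, Consumer<List<Op>> send) {
        List<Op> batch = prepared.stream().flatMap(p -> p.getLeft().stream()).toList();
        send.accept(batch); // if this throws, nothing is marked as committed
        prepared.forEach(p -> p.getRight().forEach(Runnable::run));
    }

    public static void main(String[] args) {
        pushBatch(List.of(prepare("obj-a"), prepare("obj-b")),
                batch -> System.out.println("pushed " + batch));
    }
}
```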
```diff
@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
 dhfs.objects.peerdiscovery.broadcast=true
 dhfs.objects.sync.timeout=30
 dhfs.objects.sync.ping.timeout=5
-dhfs.objects.invalidation.threads=4
+dhfs.objects.invalidation.threads=16
 dhfs.objects.invalidation.delay=1000
 dhfs.objects.reconnect_interval=5s
 dhfs.objects.write_log=false
@@ -26,10 +26,10 @@ dhfs.objects.deletion.delay=1000
 dhfs.objects.deletion.can-delete-retry-delay=10000
 dhfs.objects.ref_verification=true
 dhfs.files.use_hash_for_chunks=false
-dhfs.objects.autosync.threads=2
+dhfs.objects.autosync.threads=16
 dhfs.objects.autosync.download-all=false
-dhfs.objects.move-processor.threads=4
-dhfs.objects.ref-processor.threads=4
+dhfs.objects.move-processor.threads=16
+dhfs.objects.ref-processor.threads=16
 dhfs.objects.opsender.batch-size=100
 dhfs.objects.lock_timeout_secs=2
 dhfs.local-discovery=true
```
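The property changes raise the invalidation, autosync, move-processor, and ref-processor pools to 16 threads. A minimal sketch of how such a value could be consumed via MicroProfile Config, which the `ConfigProperty` import in InvalidationQueueService suggests is the mechanism used; the class and field names here are illustrative, not the project's:

```java
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical consumer of the tuned thread-count property above;
// only the property name comes from application.properties.
@ApplicationScoped
public class InvalidationThreadPoolConfigExample {
    @ConfigProperty(name = "dhfs.objects.invalidation.threads")
    int invalidationThreads; // 16 after this change, 4 before

    ExecutorService newInvalidationPool() {
        // A larger pool lets more peers be invalidated concurrently, at the
        // cost of more outstanding RPCs and memory for queued batches.
        return Executors.newFixedThreadPool(invalidationThreads);
    }
}
```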