mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: push multiple ops, really^2
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
package com.usatiuk.dhfs.repository.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
@@ -15,9 +17,13 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.core.Link;
|
||||
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -41,6 +47,8 @@ public class InvalidationQueueService {
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
private final DataLocker _locker = new DataLocker();
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
private ExecutorService _executor;
|
||||
private volatile boolean _shutdown = false;
|
||||
|
||||
@@ -105,38 +113,57 @@ public class InvalidationQueueService {
|
||||
String stats = "Sent invalidation: ";
|
||||
long success = 0;
|
||||
|
||||
for (var e : data) {
|
||||
// TODO: Race?
|
||||
if (!peerInfoService.existsPeer(e.peer())) {
|
||||
Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
|
||||
continue;
|
||||
}
|
||||
List<AutoCloseableNoThrow> locks = new LinkedList<>();
|
||||
try {
|
||||
ArrayListValuedHashMap<PeerId, Op> ops = new ArrayListValuedHashMap<>();
|
||||
ArrayListValuedHashMap<PeerId, Runnable> commits = new ArrayListValuedHashMap<>();
|
||||
|
||||
if (!remoteHostManager.isReachable(e.peer())) {
|
||||
deferredInvalidationQueueService.defer(e);
|
||||
continue;
|
||||
}
|
||||
for (var e : data) {
|
||||
// TODO: Race?
|
||||
if (!peerInfoService.existsPeer(e.peer())) {
|
||||
Log.warnv("Will ignore invalidation of {0} to {1}, peer not found", e.key(), e.peer());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
|
||||
pushInvalidationToOne(e);
|
||||
continue;
|
||||
}
|
||||
if (!remoteHostManager.isReachable(e.peer())) {
|
||||
deferredInvalidationQueueService.defer(e);
|
||||
continue;
|
||||
}
|
||||
|
||||
try (var lock = _locker.tryLock(e)) {
|
||||
if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
|
||||
pushInvalidationToOne(e);
|
||||
continue;
|
||||
}
|
||||
|
||||
var lock = _locker.tryLock(e);
|
||||
if (lock == null) {
|
||||
pushInvalidationToOne(e);
|
||||
continue;
|
||||
}
|
||||
opPusher.doPush(e);
|
||||
success++;
|
||||
} catch (Exception ex) {
|
||||
Log.warnv("Failed to send invalidation to {0}, will retry: {1}", e, ex);
|
||||
pushInvalidationToOne(e);
|
||||
locks.add(lock);
|
||||
try {
|
||||
var prepared = opPusher.preparePush(e);
|
||||
ops.putAll(e.peer(), prepared.getLeft());
|
||||
commits.putAll(e.peer(), prepared.getRight());
|
||||
success++;
|
||||
} catch (Exception ex) {
|
||||
Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);
|
||||
pushInvalidationToOne(e);
|
||||
}
|
||||
if (_shutdown) {
|
||||
Log.info("Invalidation sender exiting");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (_shutdown) {
|
||||
Log.info("Invalidation sender exiting");
|
||||
break;
|
||||
|
||||
for (var p : ops.keySet()) {
|
||||
var list = ops.get(p);
|
||||
Log.debugv("Pushing invalidations to {0}: {1}", p, list);
|
||||
remoteObjectServiceClient.pushOps(p, list);
|
||||
commits.get(p).forEach(Runnable::run);
|
||||
}
|
||||
} finally {
|
||||
locks.forEach(AutoCloseableNoThrow::close);
|
||||
}
|
||||
|
||||
stats += success + "/" + data.size() + " ";
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@@ -35,65 +36,60 @@ public class OpPusher {
|
||||
@Inject
|
||||
DtoMapperService dtoMapperService;
|
||||
|
||||
public void doPush(InvalidationQueueEntry entry) {
|
||||
AtomicBoolean doAgain = new AtomicBoolean(false);
|
||||
do {
|
||||
// FIXME:
|
||||
doAgain.set(false);
|
||||
List<Op> info = txm.run(() -> {
|
||||
var obj = curTx.get(JData.class, entry.key()).orElse(null);
|
||||
switch (obj) {
|
||||
case RemoteObjectMeta remote -> {
|
||||
JDataRemoteDto data =
|
||||
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
|
||||
? remoteTransaction.getData(remote.knownType(), entry.key())
|
||||
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
|
||||
: null;
|
||||
public Pair<List<Op>, List<Runnable>> preparePush(InvalidationQueueEntry entry) {
|
||||
List<Op> info = txm.run(() -> {
|
||||
var obj = curTx.get(JData.class, entry.key()).orElse(null);
|
||||
switch (obj) {
|
||||
case RemoteObjectMeta remote -> {
|
||||
JDataRemoteDto data =
|
||||
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
|
||||
? remoteTransaction.getData(remote.knownType(), entry.key())
|
||||
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
|
||||
: null;
|
||||
|
||||
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
|
||||
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
|
||||
}
|
||||
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
|
||||
}
|
||||
case JKleppmannTreePersistentData pd -> {
|
||||
var tree = jKleppmannTreeManager.getTree(pd.key());
|
||||
|
||||
if (!tree.hasPendingOpsForHost(entry.peer()))
|
||||
return List.of(tree.getPeriodicPushOp());
|
||||
|
||||
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
|
||||
|
||||
if (tree.hasPendingOpsForHost(entry.peer())) {
|
||||
doAgain.set(true);
|
||||
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
|
||||
}
|
||||
return ops;
|
||||
}
|
||||
case null,
|
||||
default -> {
|
||||
return null;
|
||||
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
|
||||
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
|
||||
}
|
||||
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
|
||||
}
|
||||
case JKleppmannTreePersistentData pd -> {
|
||||
var tree = jKleppmannTreeManager.getTree(pd.key());
|
||||
|
||||
if (!tree.hasPendingOpsForHost(entry.peer()))
|
||||
return List.of(tree.getPeriodicPushOp());
|
||||
|
||||
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
|
||||
|
||||
if (tree.hasPendingOpsForHost(entry.peer())) {
|
||||
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
|
||||
}
|
||||
return ops;
|
||||
}
|
||||
case null,
|
||||
default -> {
|
||||
return List.of();
|
||||
}
|
||||
});
|
||||
if (info == null) {
|
||||
return;
|
||||
}
|
||||
Log.debugv("Pushing invalidation: entry {0}, sending {1}", entry, info);
|
||||
remoteObjectServiceClient.pushOps(entry.peer(), info);
|
||||
txm.run(() -> {
|
||||
var obj = curTx.get(JData.class, entry.key()).orElse(null);
|
||||
switch (obj) {
|
||||
case JKleppmannTreePersistentData pd: {
|
||||
var tree = jKleppmannTreeManager.getTree(pd.key());
|
||||
for (var op : info) {
|
||||
tree.commitOpForHost(entry.peer(), op);
|
||||
});
|
||||
List<Runnable> commits = info.stream().<Runnable>map(o -> {
|
||||
return () -> {
|
||||
txm.run(() -> {
|
||||
var obj = curTx.get(JData.class, entry.key()).orElse(null);
|
||||
switch (obj) {
|
||||
case JKleppmannTreePersistentData pd: {
|
||||
var tree = jKleppmannTreeManager.getTree(pd.key());
|
||||
for (var op : info) {
|
||||
tree.commitOpForHost(entry.peer(), op);
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case null:
|
||||
default:
|
||||
}
|
||||
case null:
|
||||
default:
|
||||
}
|
||||
});
|
||||
} while (doAgain.get());
|
||||
});
|
||||
};
|
||||
}).toList();
|
||||
|
||||
return Pair.of(info, commits);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user