small op push improvements, prepare for "forced" push

This commit is contained in:
2025-02-21 16:22:43 +01:00
parent 4f7da67ba5
commit 12d7f3a427
8 changed files with 52 additions and 91 deletions

View File

@@ -103,11 +103,6 @@ public class PeerManager {
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
boolean wasReachable = isReachable(host);
boolean shouldSync = persistentPeerDataService.markInitialSyncDone(host.id());
if (shouldSync)
syncHandler.doInitialSync(host.id());
_states.put(host.id(), address);
if (wasReachable) return;
@@ -178,7 +173,6 @@ public class PeerManager {
public void removeRemoteHost(PeerId peerId) {
transactionManager.run(() -> {
peerInfoService.removePeer(peerId);
persistentPeerDataService.resetInitialSyncDone(peerId);
});
}

View File

@@ -13,7 +13,6 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.HashTreePSet;
import java.io.IOException;
import java.security.KeyPair;
@@ -64,7 +63,7 @@ public class PersistentPeerDataService {
_selfKeyPair = CertificateTools.generateKeyPair();
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair, HashTreePSet.empty()));
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
@@ -94,14 +93,6 @@ public class PersistentPeerDataService {
return _selfUuid;
}
public long getUniqueId() {
return jObjectTxManager.run(() -> {
var curData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
curTx.put(curData.withSelfCounter(curData.selfCounter() + 1));
return curData.selfCounter();
});
}
// private void updateCerts() {
// try {
// peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
@@ -125,28 +116,5 @@ public class PersistentPeerDataService {
return _selfCertificate;
}
// Returns true if host's initial sync wasn't done before, and marks it as done
public boolean markInitialSyncDone(PeerId peerId) {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (exists) return false;
curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId)));
return true;
});
}
// Returns true if it was marked as done before, and resets it
public boolean resetInitialSyncDone(PeerId peerId) {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (!exists) return false;
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
return true;
});
}
}

View File

@@ -3,30 +3,18 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import org.pcollections.HashTreePSet;
import org.pcollections.PSet;
import java.io.Serializable;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
public record PersistentRemoteHostsData(PeerId selfUuid,
long selfCounter,
X509Certificate selfCertificate,
KeyPair selfKeyPair,
PSet<PeerId> initialSyncDone) implements JData, Serializable {
KeyPair selfKeyPair) implements JData, Serializable {
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");
@Override
public JObjectKey key() {
return KEY;
}
public PersistentRemoteHostsData withSelfCounter(long selfCounter) {
return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, HashTreePSet.empty());
}
public PersistentRemoteHostsData withInitialSyncDone(PSet<PeerId> initialSyncDone) {
return new PersistentRemoteHostsData(selfUuid, selfCounter, selfCertificate, selfKeyPair, initialSyncDone);
}
}

View File

@@ -13,5 +13,5 @@ public class DeferredInvalidationQueueData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
public final MultiValuedMap<PeerId, JObjectKey> deferredInvalidations = new HashSetValuedHashMap<>();
public final MultiValuedMap<PeerId, InvalidationQueueEntry> deferredInvalidations = new HashSetValuedHashMap<>();
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
@@ -19,7 +18,6 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
@ApplicationScoped
public class DeferredInvalidationQueueService {
@@ -69,17 +67,17 @@ public class DeferredInvalidationQueueService {
synchronized (this) {
var col = _persistentData.deferredInvalidations.get(host);
for (var s : col) {
Log.trace("Un-deferred invalidation to " + host + " of " + s);
invalidationQueueService.pushDeferredInvalidations(host, s);
Log.tracev("Returning deferred invalidation: {0}", s);
invalidationQueueService.pushDeferredInvalidations(s);
}
col.clear();
}
}
void defer(PeerId host, JObjectKey object) {
void defer(InvalidationQueueEntry entry) {
synchronized (this) {
Log.trace("Deferred invalidation to " + host + " of " + object);
_persistentData.deferredInvalidations.put(host, object);
Log.tracev("Deferred invalidation: {0}", entry);
_persistentData.deferredInvalidations.put(entry.peer(), entry);
}
}
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
public record InvalidationQueueEntry(PeerId peer, JObjectKey key, boolean forced) {
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerManager;
@@ -15,7 +14,6 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
@@ -25,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<Pair<PeerId, JObjectKey>> _queue;
private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue;
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
@Inject
PeerManager remoteHostManager;
@@ -65,7 +63,7 @@ public class InvalidationQueueService {
var data = _queue.close();
Log.info("Will defer " + data.size() + " invalidations on shutdown");
for (var e : data)
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
deferredInvalidationQueueService.defer(e);
}
private void sender() {
@@ -89,9 +87,9 @@ public class InvalidationQueueService {
var hostInfo = remoteHostManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(Pair.of(h, o));
_queue.add(new InvalidationQueueEntry(h, o, false));
for (var u : hostInfo.unavailable())
deferredInvalidationQueueService.defer(u, o);
deferredInvalidationQueueService.defer(new InvalidationQueueEntry(u, o, false));
}
}
}
@@ -102,19 +100,19 @@ public class InvalidationQueueService {
long success = 0;
for (var e : data) {
if (peerInfoService.getPeerInfo(e.getLeft()).isEmpty()) continue;
if (peerInfoService.getPeerInfo(e.peer()).isEmpty()) continue;
if (!remoteHostManager.isReachable(e.getLeft())) {
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
if (!remoteHostManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
try {
opPusher.doPush(e.getLeft(), e.getRight());
opPusher.doPush(e);
success++;
} catch (Exception ex) {
Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex);
pushInvalidationToOne(e.getLeft(), e.getRight());
Log.warnv("Failed to send invalidation to {0}, will retry: {1}", e, ex);
pushInvalidationToOne(e);
}
if (_shutdown) {
Log.info("Invalidation sender exiting");
@@ -150,18 +148,23 @@ public class InvalidationQueueService {
}
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
if (remoteHostManager.isReachable(host))
_queue.add(Pair.of(host, obj));
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
_queue.add(entry);
else
deferredInvalidationQueueService.defer(host, obj);
deferredInvalidationQueueService.defer(entry);
}
public void pushInvalidationToOne(PeerId host, JData obj) {
pushInvalidationToOne(host, obj.key());
public void pushInvalidationToOne(PeerId host, JObjectKey obj, boolean forced) {
var entry = new InvalidationQueueEntry(host, obj, forced);
pushInvalidationToOne(entry);
}
protected void pushDeferredInvalidations(PeerId host, JObjectKey name) {
_queue.add(Pair.of(host, name));
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
pushInvalidationToOne(host, obj, false);
}
void pushDeferredInvalidations(InvalidationQueueEntry entry) {
_queue.add(entry);
}
}

View File

@@ -1,6 +1,9 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
@@ -23,23 +26,23 @@ public class OpPusher {
@Inject
InvalidationQueueService invalidationQueueService;
public void doPush(PeerId op, JObjectKey key) {
public void doPush(InvalidationQueueEntry entry) {
Op info = txm.run(() -> {
var obj = curTx.get(JData.class, key).orElse(null);
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
return new IndexUpdateOp(key, remote.changelog());
return new IndexUpdateOp(entry.key(), remote.changelog());
}
case JKleppmannTreePersistentData pd -> {
var maybeQueue = pd.queues().get(op);
if(maybeQueue == null || maybeQueue.isEmpty()) {
var maybeQueue = pd.queues().get(entry.peer());
if (maybeQueue == null || maybeQueue.isEmpty()) {
return null;
}
var ret = new JKleppmannTreeOpWrapper(key, pd.queues().get(op).firstEntry().getValue());
var newPd = pd.withQueues(pd.queues().plus(op, pd.queues().get(op).minus(ret.op().timestamp())));
var ret = new JKleppmannTreeOpWrapper(entry.key(), pd.queues().get(entry.peer()).firstEntry().getValue());
var newPd = pd.withQueues(pd.queues().plus(entry.peer(), pd.queues().get(entry.peer()).minus(ret.op().timestamp())));
curTx.put(newPd);
if (!newPd.queues().get(op).isEmpty())
invalidationQueueService.pushInvalidationToOne(op, pd.key());
if (!newPd.queues().get(entry.peer()).isEmpty())
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
return ret;
}
case null,
@@ -51,6 +54,6 @@ public class OpPusher {
if (info == null) {
return;
}
remoteObjectServiceClient.pushOps(op, List.of(info));
remoteObjectServiceClient.pushOps(entry.peer(), List.of(info));
}
}