mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: push resync after crash
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
package com.usatiuk.dhfs;
|
||||
|
||||
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectInitialSyncProcessor implements InitialSyncProcessor<RemoteObjectMeta> {
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
PeerInfoService peerInfoService;
|
||||
|
||||
@Override
|
||||
public void prepareForInitialSync(PeerId from, JObjectKey key) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCrash(JObjectKey key) {
|
||||
var data = curTx.get(RemoteObjectMeta.class, key).orElseThrow();
|
||||
var versionSum = data.versionSum();
|
||||
for (var p : peerInfoService.getPeersNoSelf()) {
|
||||
if (data.knownRemoteVersions().getOrDefault(p.id(), 0L) != versionSum) {
|
||||
invalidationQueueService.pushInvalidationToOne(p.id(), key);
|
||||
Log.infov("Pushing after crash invalidation to {0} for {1}", p.id(), key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,11 @@
|
||||
package com.usatiuk.dhfs.jkleppmanntree;
|
||||
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
|
||||
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -11,10 +13,18 @@ import jakarta.inject.Inject;
|
||||
public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<JKleppmannTreePersistentData> {
|
||||
@Inject
|
||||
JKleppmannTreeManager jKleppmannTreeManager;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
@Override
|
||||
public void prepareForInitialSync(PeerId from, JObjectKey key) {
|
||||
var tree = jKleppmannTreeManager.getTree(key);
|
||||
tree.recordBootstrap(from);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCrash(JObjectKey key) {
|
||||
invalidationQueueService.pushInvalidationToAll(key);
|
||||
Log.infov("Pushing after crash invalidation to all for {0}", key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
package com.usatiuk.dhfs.repository;
|
||||
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
|
||||
public interface InitialSyncProcessor<T extends JData> {
|
||||
void prepareForInitialSync(PeerId from, JObjectKey key);
|
||||
void handleCrash(JObjectKey key);
|
||||
}
|
||||
|
||||
@@ -3,14 +3,18 @@ package com.usatiuk.dhfs.repository;
|
||||
import com.usatiuk.dhfs.JDataRemote;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
import org.pcollections.HashTreePSet;
|
||||
@@ -40,6 +44,8 @@ public class SyncHandler {
|
||||
RemoteTransaction remoteTx;
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
|
||||
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers, Instance<InitialSyncProcessor<?>> initialSyncProcessors) {
|
||||
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
|
||||
@@ -113,6 +119,33 @@ public class SyncHandler {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
|
||||
if (shutdownChecker.lastShutdownClean())
|
||||
return;
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
|
||||
if (proc != null) {
|
||||
proc.handleCrash(obj);
|
||||
}
|
||||
Log.infov("Handled crash of {0}", obj);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void doInitialSync(PeerId peer) {
|
||||
List<JObjectKey> objs = new LinkedList<>();
|
||||
txm.run(() -> {
|
||||
|
||||
Reference in New Issue
Block a user