mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
more robust shutdown handling
This commit is contained in:
@@ -0,0 +1,43 @@
|
||||
package com.usatiuk.dhfs;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
@ApplicationScoped
|
||||
public class ShutdownChecker {
|
||||
@ConfigProperty(name = "dhfs.objects.root")
|
||||
String dataRoot;
|
||||
private static final String dataFileName = "running";
|
||||
|
||||
boolean _cleanShutdown = true;
|
||||
boolean _initialized = false;
|
||||
|
||||
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
_cleanShutdown = false;
|
||||
Log.error("Unclean shutdown detected!");
|
||||
} else {
|
||||
Paths.get(dataRoot).resolve(dataFileName).toFile().createNewFile();
|
||||
}
|
||||
_initialized = true;
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(100000) ShutdownEvent event) throws IOException {
|
||||
Paths.get(dataRoot).resolve(dataFileName).toFile().delete();
|
||||
}
|
||||
|
||||
public boolean lastShutdownClean() {
|
||||
if (!_initialized) throw new IllegalStateException("ShutdownChecker not initialized");
|
||||
return _cleanShutdown;
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,8 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.UUID;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DeferredInvalidationQueueService {
|
||||
@Inject
|
||||
@@ -33,7 +35,10 @@ public class DeferredInvalidationQueueService {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
Log.info("Reading invalidation queue");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
|
||||
Log.warn("Reading invalidation queue from backup");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
|
||||
@@ -42,10 +47,22 @@ public class DeferredInvalidationQueueService {
|
||||
|
||||
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
|
||||
Log.info("Saving deferred invalidations");
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
writeData();
|
||||
Log.info("Saved deferred invalidations");
|
||||
}
|
||||
|
||||
|
||||
private void writeData() {
|
||||
try {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
|
||||
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
} catch (IOException iex) {
|
||||
Log.error("Error writing deferred invalidations data", iex);
|
||||
throw new RuntimeException(iex);
|
||||
}
|
||||
}
|
||||
|
||||
void returnForHost(UUID host) {
|
||||
synchronized (this) {
|
||||
var col = _persistentData.getDeferredInvalidations().get(host);
|
||||
|
||||
@@ -20,6 +20,8 @@ public class PersistentRemoteHostsData implements Serializable {
|
||||
@Getter
|
||||
private final AtomicLong _selfCounter = new AtomicLong();
|
||||
@Getter
|
||||
private final AtomicLong _irregularShutdownCounter = new AtomicLong();
|
||||
@Getter
|
||||
@Setter
|
||||
private X509Certificate _selfCertificate = null;
|
||||
@Getter
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectResolver;
|
||||
@@ -31,6 +32,8 @@ import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PersistentRemoteHostsService {
|
||||
final String dataFileName = "hosts";
|
||||
@@ -48,6 +51,8 @@ public class PersistentRemoteHostsService {
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
RpcClientFactory rpcClientFactory;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
|
||||
|
||||
private UUID _selfUuid;
|
||||
@@ -58,6 +63,9 @@ public class PersistentRemoteHostsService {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
|
||||
Log.warn("Reading hosts from backup");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
|
||||
|
||||
@@ -78,6 +86,10 @@ public class PersistentRemoteHostsService {
|
||||
var dir = jObjectManager.put(newpd, Optional.empty());
|
||||
}
|
||||
|
||||
if (!shutdownChecker.lastShutdownClean()) {
|
||||
_persistentData.getData().getIrregularShutdownCounter().addAndGet(1);
|
||||
}
|
||||
|
||||
jObjectResolver.registerWriteListener(PersistentPeerInfo.class, this::pushPeerUpdates);
|
||||
jObjectResolver.registerWriteListener(PeerDirectory.class, this::pushPeerUpdates);
|
||||
|
||||
@@ -89,14 +101,26 @@ public class PersistentRemoteHostsService {
|
||||
|
||||
Files.writeString(Paths.get(dataRoot, "self_uuid"), _selfUuid.toString());
|
||||
Log.info("Self uuid is: " + _selfUuid.toString());
|
||||
writeData();
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
|
||||
Log.info("Saving hosts");
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
writeData();
|
||||
Log.info("Shutdown");
|
||||
}
|
||||
|
||||
private void writeData() {
|
||||
try {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
|
||||
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
} catch (IOException iex) {
|
||||
Log.error("Error writing persistent hosts data", iex);
|
||||
throw new RuntimeException(iex);
|
||||
}
|
||||
}
|
||||
|
||||
private JObject<PeerDirectory> getPeerDirectory() {
|
||||
var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
|
||||
got.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
@@ -153,7 +177,12 @@ public class PersistentRemoteHostsService {
|
||||
}
|
||||
|
||||
public String getUniqueId() {
|
||||
return _selfUuid.toString() + _persistentData.getData().getSelfCounter().addAndGet(1);
|
||||
StringBuilder sb = new StringBuilder(64);
|
||||
sb.append(_selfUuid);
|
||||
sb.append(_persistentData.getData().getIrregularShutdownCounter());
|
||||
sb.append("_");
|
||||
sb.append(_persistentData.getData().getSelfCounter().addAndGet(1));
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public PersistentPeerInfo getInfo(UUID name) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerSyncApiClientDynamic;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
|
||||
@@ -34,6 +35,8 @@ public class RemoteHostManager {
|
||||
RpcClientFactory rpcClientFactory;
|
||||
@Inject
|
||||
PeerSyncApiClientDynamic peerSyncApiClient;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
|
||||
long pingTimeout;
|
||||
private ExecutorService _heartbeatExecutor;
|
||||
@@ -115,8 +118,11 @@ public class RemoteHostManager {
|
||||
|
||||
Log.info("Connected to " + host);
|
||||
|
||||
if (persistentRemoteHostsService.markInitialSyncDone(host))
|
||||
if (persistentRemoteHostsService.markInitialSyncDone(host) || !shutdownChecker.lastShutdownClean()) {
|
||||
if (!shutdownChecker.lastShutdownClean())
|
||||
Log.info("Resyncing with " + host + " as last shutdown wasn't clean");
|
||||
syncHandler.doInitialResync(host);
|
||||
}
|
||||
}
|
||||
|
||||
public void handleConnectionError(UUID host) {
|
||||
|
||||
Reference in New Issue
Block a user