mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
10 Commits
d27c51583d
...
18d775cf68
| Author | SHA1 | Date | |
|---|---|---|---|
| 18d775cf68 | |||
| 1de1ff672a | |||
| bacf7307f2 | |||
| 468aaa2b09 | |||
| b83860bef4 | |||
| 4b9269e6d6 | |||
| aa45248e1e | |||
| c690ddc910 | |||
| 0d3367c202 | |||
| 907881f327 |
@@ -0,0 +1,43 @@
|
||||
package com.usatiuk.dhfs;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
@ApplicationScoped
|
||||
public class ShutdownChecker {
|
||||
@ConfigProperty(name = "dhfs.objects.root")
|
||||
String dataRoot;
|
||||
private static final String dataFileName = "running";
|
||||
|
||||
boolean _cleanShutdown = true;
|
||||
boolean _initialized = false;
|
||||
|
||||
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
_cleanShutdown = false;
|
||||
Log.error("Unclean shutdown detected!");
|
||||
} else {
|
||||
Paths.get(dataRoot).resolve(dataFileName).toFile().createNewFile();
|
||||
}
|
||||
_initialized = true;
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(100000) ShutdownEvent event) throws IOException {
|
||||
Paths.get(dataRoot).resolve(dataFileName).toFile().delete();
|
||||
}
|
||||
|
||||
public boolean lastShutdownClean() {
|
||||
if (!_initialized) throw new IllegalStateException("ShutdownChecker not initialized");
|
||||
return _cleanShutdown;
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import com.google.common.collect.Streams;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
|
||||
@@ -15,7 +14,6 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectRefProcessor {
|
||||
@@ -171,12 +169,14 @@ public class JObjectRefProcessor {
|
||||
m.markDeleted();
|
||||
|
||||
Collection<String> extracted = null;
|
||||
if (got.getData() != null) extracted = got.getData().extractRefs();
|
||||
if (got.getData() != null)
|
||||
extracted = got.getData().extractRefs();
|
||||
Collection<String> saved = got.getMeta().getSavedRefs();
|
||||
|
||||
got.discardData();
|
||||
|
||||
if (got.getMeta().getSavedRefs() != null)
|
||||
for (var r : got.getMeta().getSavedRefs()) deleteRef(got, r);
|
||||
if (saved != null)
|
||||
for (var r : saved) deleteRef(got, r);
|
||||
if (extracted != null)
|
||||
for (var r : extracted) deleteRef(got, r);
|
||||
|
||||
|
||||
@@ -150,12 +150,14 @@ public class JObjectResolver {
|
||||
self.getMeta().markDeleted();
|
||||
|
||||
Collection<String> extracted = null;
|
||||
if (self.getData() != null) extracted = self.getData().extractRefs();
|
||||
if (self.getData() != null)
|
||||
extracted = self.getData().extractRefs();
|
||||
Collection<String> saved = self.getMeta().getSavedRefs();
|
||||
|
||||
self.discardData();
|
||||
|
||||
if (self.getMeta().getSavedRefs() != null)
|
||||
for (var r : self.getMeta().getSavedRefs()) quickDeleteRef(self, r);
|
||||
if (saved != null)
|
||||
for (var r : saved) quickDeleteRef(self, r);
|
||||
if (extracted != null)
|
||||
for (var r : extracted) quickDeleteRef(self, r);
|
||||
}
|
||||
|
||||
@@ -59,6 +59,8 @@ public class JObjectWriteback {
|
||||
float watermarkLowRatio;
|
||||
@ConfigProperty(name = "dhfs.objects.writeback.threads")
|
||||
int writebackThreads;
|
||||
@ConfigProperty(name = "dhfs.objects.writeback.delay")
|
||||
long promotionDelay;
|
||||
|
||||
private final AtomicLong _currentSize = new AtomicLong(0);
|
||||
private final AtomicBoolean _watermarkReached = new AtomicBoolean(false);
|
||||
@@ -116,14 +118,15 @@ public class JObjectWriteback {
|
||||
private void writeback() {
|
||||
while (!_shutdown.get()) {
|
||||
try {
|
||||
QueueEntry got
|
||||
= _watermarkReached.get()
|
||||
? _writeQueue.getNoDelay()
|
||||
: _writeQueue.get();
|
||||
QueueEntry got = _writeQueue.get();
|
||||
|
||||
try {
|
||||
_currentSize.addAndGet(-got._size);
|
||||
flushOne(got._obj);
|
||||
if (_currentSize.get() <= sizeLimit)
|
||||
synchronized (this) {
|
||||
this.notifyAll();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed writing object " + got._obj.getName() + ", will retry.", e);
|
||||
try {
|
||||
@@ -188,21 +191,40 @@ public class JObjectWriteback {
|
||||
if (!_watermarkReached.get()) {
|
||||
Log.trace("Watermark reached");
|
||||
_watermarkReached.set(true);
|
||||
_writeQueue.interrupt();
|
||||
_writeQueue.setDelay(0);
|
||||
}
|
||||
} else if (_currentSize.get() <= (watermarkLowRatio * sizeLimit)) {
|
||||
if (_watermarkReached.get())
|
||||
if (_watermarkReached.get()) {
|
||||
Log.trace("Watermark reset");
|
||||
_watermarkReached.set(false);
|
||||
_watermarkReached.set(false);
|
||||
_writeQueue.setDelay(promotionDelay);
|
||||
}
|
||||
}
|
||||
|
||||
if (_currentSize.get() > sizeLimit) {
|
||||
try {
|
||||
flushOneImmediate(object.getMeta(), object.getData());
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed writing object " + object.getName(), e);
|
||||
throw e;
|
||||
long started = System.currentTimeMillis();
|
||||
final long timeout = 15000L; // FIXME:
|
||||
boolean finished = false;
|
||||
while (!finished && System.currentTimeMillis() - started < timeout) {
|
||||
synchronized (this) {
|
||||
try {
|
||||
this.wait();
|
||||
finished = true;
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
if (_currentSize.get() > sizeLimit)
|
||||
finished = false;
|
||||
}
|
||||
if (!finished) {
|
||||
Log.error("Timed out waiting for writeback to drain");
|
||||
try {
|
||||
flushOneImmediate(object.getMeta(), object.getData());
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed writing object " + object.getName(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,8 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.UUID;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DeferredInvalidationQueueService {
|
||||
@Inject
|
||||
@@ -33,7 +35,10 @@ public class DeferredInvalidationQueueService {
|
||||
Paths.get(dataRoot).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
Log.info("Reading invalidation queue");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
|
||||
Log.warn("Reading invalidation queue from backup");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
|
||||
@@ -42,10 +47,22 @@ public class DeferredInvalidationQueueService {
|
||||
|
||||
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
|
||||
Log.info("Saving deferred invalidations");
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
writeData();
|
||||
Log.info("Saved deferred invalidations");
|
||||
}
|
||||
|
||||
|
||||
private void writeData() {
|
||||
try {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
|
||||
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
} catch (IOException iex) {
|
||||
Log.error("Error writing deferred invalidations data", iex);
|
||||
throw new RuntimeException(iex);
|
||||
}
|
||||
}
|
||||
|
||||
void returnForHost(UUID host) {
|
||||
synchronized (this) {
|
||||
var col = _persistentData.getDeferredInvalidations().get(host);
|
||||
|
||||
@@ -11,12 +11,10 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -46,7 +44,7 @@ public class InvalidationQueueService {
|
||||
private final HashSetDelayedBlockingQueue<Pair<UUID, String>> _queue;
|
||||
private final AtomicReference<ConcurrentHashSet<String>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
|
||||
private ExecutorService _executor;
|
||||
private boolean _shutdown = false;
|
||||
private volatile boolean _shutdown = false;
|
||||
|
||||
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
|
||||
_queue = new HashSetDelayedBlockingQueue<>(delay);
|
||||
@@ -54,7 +52,11 @@ public class InvalidationQueueService {
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
_executor = Executors.newFixedThreadPool(threads);
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("invalidation-%d")
|
||||
.build();
|
||||
|
||||
_executor = Executors.newFixedThreadPool(threads, factory);
|
||||
|
||||
for (int i = 0; i < threads; i++) {
|
||||
_executor.submit(this::sender);
|
||||
|
||||
@@ -20,6 +20,8 @@ public class PersistentRemoteHostsData implements Serializable {
|
||||
@Getter
|
||||
private final AtomicLong _selfCounter = new AtomicLong();
|
||||
@Getter
|
||||
private final AtomicLong _irregularShutdownCounter = new AtomicLong();
|
||||
@Getter
|
||||
@Setter
|
||||
private X509Certificate _selfCertificate = null;
|
||||
@Getter
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectResolver;
|
||||
@@ -31,6 +32,8 @@ import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PersistentRemoteHostsService {
|
||||
final String dataFileName = "hosts";
|
||||
@@ -48,6 +51,8 @@ public class PersistentRemoteHostsService {
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
RpcClientFactory rpcClientFactory;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
|
||||
|
||||
private UUID _selfUuid;
|
||||
@@ -58,6 +63,9 @@ public class PersistentRemoteHostsService {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
|
||||
Log.warn("Reading hosts from backup");
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
|
||||
|
||||
@@ -78,6 +86,10 @@ public class PersistentRemoteHostsService {
|
||||
var dir = jObjectManager.put(newpd, Optional.empty());
|
||||
}
|
||||
|
||||
if (!shutdownChecker.lastShutdownClean()) {
|
||||
_persistentData.getData().getIrregularShutdownCounter().addAndGet(1);
|
||||
}
|
||||
|
||||
jObjectResolver.registerWriteListener(PersistentPeerInfo.class, this::pushPeerUpdates);
|
||||
jObjectResolver.registerWriteListener(PeerDirectory.class, this::pushPeerUpdates);
|
||||
|
||||
@@ -89,14 +101,26 @@ public class PersistentRemoteHostsService {
|
||||
|
||||
Files.writeString(Paths.get(dataRoot, "self_uuid"), _selfUuid.toString());
|
||||
Log.info("Self uuid is: " + _selfUuid.toString());
|
||||
writeData();
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
|
||||
Log.info("Saving hosts");
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
writeData();
|
||||
Log.info("Shutdown");
|
||||
}
|
||||
|
||||
private void writeData() {
|
||||
try {
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
|
||||
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
|
||||
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
|
||||
} catch (IOException iex) {
|
||||
Log.error("Error writing persistent hosts data", iex);
|
||||
throw new RuntimeException(iex);
|
||||
}
|
||||
}
|
||||
|
||||
private JObject<PeerDirectory> getPeerDirectory() {
|
||||
var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
|
||||
got.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
@@ -153,7 +177,12 @@ public class PersistentRemoteHostsService {
|
||||
}
|
||||
|
||||
public String getUniqueId() {
|
||||
return _selfUuid.toString() + _persistentData.getData().getSelfCounter().addAndGet(1);
|
||||
StringBuilder sb = new StringBuilder(64);
|
||||
sb.append(_selfUuid);
|
||||
sb.append(_persistentData.getData().getIrregularShutdownCounter());
|
||||
sb.append("_");
|
||||
sb.append(_persistentData.getData().getSelfCounter().addAndGet(1));
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public PersistentPeerInfo getInfo(UUID name) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.ShutdownChecker;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerSyncApiClientDynamic;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
|
||||
@@ -34,6 +35,8 @@ public class RemoteHostManager {
|
||||
RpcClientFactory rpcClientFactory;
|
||||
@Inject
|
||||
PeerSyncApiClientDynamic peerSyncApiClient;
|
||||
@Inject
|
||||
ShutdownChecker shutdownChecker;
|
||||
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
|
||||
long pingTimeout;
|
||||
private ExecutorService _heartbeatExecutor;
|
||||
@@ -115,8 +118,11 @@ public class RemoteHostManager {
|
||||
|
||||
Log.info("Connected to " + host);
|
||||
|
||||
if (persistentRemoteHostsService.markInitialSyncDone(host))
|
||||
if (persistentRemoteHostsService.markInitialSyncDone(host) || !shutdownChecker.lastShutdownClean()) {
|
||||
if (!shutdownChecker.lastShutdownClean())
|
||||
Log.info("Resyncing with " + host + " as last shutdown wasn't clean");
|
||||
syncHandler.doInitialResync(host);
|
||||
}
|
||||
}
|
||||
|
||||
public void handleConnectionError(UUID host) {
|
||||
|
||||
@@ -103,7 +103,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
var ret = builder.build();
|
||||
|
||||
if (!ret.getDeletionCandidate())
|
||||
for (var rr : ret.getReferrersList())
|
||||
for (var rr : request.getOurReferrersList())
|
||||
autoSyncProcessor.add(rr);
|
||||
|
||||
return Uni.createFrom().item(ret);
|
||||
|
||||
@@ -11,6 +11,7 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@@ -34,7 +35,11 @@ public class AutoSyncProcessor {
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
_autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads);
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("autosync-%d")
|
||||
.build();
|
||||
|
||||
_autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads, factory);
|
||||
for (int i = 0; i < autosyncThreads; i++) {
|
||||
_autosyncExcecutor.submit(this::autosync);
|
||||
}
|
||||
|
||||
@@ -3,38 +3,23 @@ package com.usatiuk.utils;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
public class HashSetDelayedBlockingQueue<T> {
|
||||
private class SetElement {
|
||||
private final T _el;
|
||||
private final long _time;
|
||||
|
||||
private SetElement(T el, long time) {
|
||||
_el = el;
|
||||
_time = time;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
SetElement setElement = (SetElement) o;
|
||||
return Objects.equals(_el, setElement._el);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(_el);
|
||||
}
|
||||
private record SetElement<T>(T el, long time) {
|
||||
}
|
||||
|
||||
private final LinkedHashMap<SetElement, SetElement> _set = new LinkedHashMap<>();
|
||||
private final LinkedHashSet<Thread> _waiting = new LinkedHashSet<>();
|
||||
private final LinkedHashMap<T, SetElement<T>> _set = new LinkedHashMap<>();
|
||||
|
||||
@Getter
|
||||
private final long _delay;
|
||||
private long _delay;
|
||||
|
||||
private boolean _closed = false;
|
||||
|
||||
private final Object _sleepSynchronizer = new Object();
|
||||
|
||||
public HashSetDelayedBlockingQueue(long delay) {
|
||||
_delay = delay;
|
||||
}
|
||||
@@ -44,13 +29,15 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
public boolean add(T el) {
|
||||
synchronized (this) {
|
||||
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
|
||||
var sel = new SetElement(el, System.currentTimeMillis());
|
||||
if (_set.put(sel, sel) == null) {
|
||||
this.notify();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (_set.containsKey(el))
|
||||
return false;
|
||||
|
||||
_set.put(el, new SetElement<>(el, System.currentTimeMillis()));
|
||||
|
||||
this.notify();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Adds the object to the queue, if it exists re-adds it with a new delay
|
||||
@@ -58,69 +45,73 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
public T readd(T el) {
|
||||
synchronized (this) {
|
||||
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
|
||||
var sel = new SetElement(el, System.currentTimeMillis());
|
||||
SetElement contains = _set.putLast(sel, sel);
|
||||
|
||||
SetElement<T> old = _set.putLast(el, new SetElement<>(el, System.currentTimeMillis()));
|
||||
this.notify();
|
||||
if (contains != null)
|
||||
return contains._el;
|
||||
else return null;
|
||||
|
||||
if (old != null)
|
||||
return old.el();
|
||||
else
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Adds the object to the queue, if it exists re-adds it with a new delay
|
||||
// Returns true if the object wasn't in the queue
|
||||
// Removes the object
|
||||
public T remove(T el) {
|
||||
synchronized (this) {
|
||||
var rem = _set.remove(new SetElement(el, 0));
|
||||
var rem = _set.remove(el);
|
||||
if (rem == null) return null;
|
||||
return rem._el;
|
||||
return rem.el();
|
||||
}
|
||||
}
|
||||
|
||||
public T get(Long timeout) throws InterruptedException {
|
||||
long startedWaiting = System.currentTimeMillis();
|
||||
try {
|
||||
public T get(long timeout) throws InterruptedException {
|
||||
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
|
||||
|
||||
while (!Thread.interrupted()) {
|
||||
long sleep;
|
||||
synchronized (this) {
|
||||
_waiting.add(Thread.currentThread());
|
||||
}
|
||||
while (!Thread.interrupted()) {
|
||||
long sleep;
|
||||
synchronized (this) {
|
||||
while (_set.isEmpty()) {
|
||||
if (timeout == null) this.wait();
|
||||
else {
|
||||
this.wait(timeout);
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
|
||||
}
|
||||
if (timeout > 0)
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
|
||||
|
||||
while (_set.isEmpty()) {
|
||||
if (timeout > 0) {
|
||||
this.wait(Math.max(timeout - (System.currentTimeMillis() - startedWaiting), 1));
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
|
||||
} else {
|
||||
this.wait();
|
||||
}
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue()._time;
|
||||
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
|
||||
else return _set.pollFirstEntry().getValue()._el;
|
||||
}
|
||||
Thread.sleep(sleep);
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue().time();
|
||||
|
||||
if (first + _delay > curTime)
|
||||
sleep = (first + _delay) - curTime;
|
||||
else
|
||||
return _set.pollFirstEntry().getValue().el();
|
||||
}
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
Thread.interrupted();
|
||||
_waiting.remove(Thread.currentThread());
|
||||
|
||||
if (timeout > 0)
|
||||
sleep = Math.min(sleep, (startedWaiting + timeout) - System.currentTimeMillis());
|
||||
|
||||
if (sleep <= 0)
|
||||
continue;
|
||||
|
||||
synchronized (_sleepSynchronizer) {
|
||||
_sleepSynchronizer.wait(sleep);
|
||||
}
|
||||
}
|
||||
|
||||
throw new InterruptedException();
|
||||
}
|
||||
|
||||
public T get() throws InterruptedException {
|
||||
return get(null);
|
||||
}
|
||||
|
||||
public T getNoDelay() throws InterruptedException {
|
||||
synchronized (this) {
|
||||
while (_set.isEmpty()) this.wait();
|
||||
|
||||
return _set.pollFirstEntry().getValue()._el;
|
||||
}
|
||||
T ret;
|
||||
do {
|
||||
} while ((ret = get(-1)) == null);
|
||||
return ret;
|
||||
}
|
||||
|
||||
public boolean hasImmediate() {
|
||||
@@ -129,7 +120,7 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue()._time;
|
||||
var first = _set.firstEntry().getValue().time();
|
||||
return first + _delay <= curTime;
|
||||
}
|
||||
}
|
||||
@@ -141,9 +132,12 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue()._time;
|
||||
if (first + _delay > curTime) return null;
|
||||
else return _set.pollFirstEntry().getValue()._el;
|
||||
var first = _set.firstEntry().getValue().time();
|
||||
|
||||
if (first + _delay > curTime)
|
||||
return null;
|
||||
else
|
||||
return _set.pollFirstEntry().getValue().el();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,9 +148,9 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
while (!_set.isEmpty()) {
|
||||
SetElement el = _set.firstEntry().getValue();
|
||||
if (el._time + _delay > curTime) break;
|
||||
out.add(_set.pollFirstEntry().getValue()._el);
|
||||
SetElement<T> el = _set.firstEntry().getValue();
|
||||
if (el.time() + _delay > curTime) break;
|
||||
out.add(_set.pollFirstEntry().getValue().el());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,78 +160,83 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
public Collection<T> close() {
|
||||
synchronized (this) {
|
||||
_closed = true;
|
||||
var ret = _set.values().stream().map(o -> o._el).toList();
|
||||
var ret = _set.values().stream().map(SetElement::el).toList();
|
||||
_set.clear();
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait() throws InterruptedException {
|
||||
return getAllWait(Integer.MAX_VALUE, -1);
|
||||
Collection<T> out;
|
||||
do {
|
||||
} while ((out = getAllWait(Integer.MAX_VALUE, -1)).isEmpty());
|
||||
return out;
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait(int max) throws InterruptedException {
|
||||
return getAllWait(max, -1);
|
||||
Collection<T> out;
|
||||
do {
|
||||
} while ((out = getAllWait(max, -1)).isEmpty());
|
||||
return out;
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait(int max, long timeout) throws InterruptedException {
|
||||
ArrayList<T> out = new ArrayList<>();
|
||||
|
||||
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
|
||||
try {
|
||||
|
||||
while (!Thread.interrupted()) {
|
||||
if (timeout > 0)
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
|
||||
|
||||
long sleep = 0;
|
||||
|
||||
synchronized (this) {
|
||||
_waiting.add(Thread.currentThread());
|
||||
}
|
||||
while (!Thread.interrupted()) {
|
||||
if (timeout > 0)
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
|
||||
long sleep = 0;
|
||||
synchronized (this) {
|
||||
while (_set.isEmpty()) {
|
||||
if (timeout > 0) {
|
||||
this.wait(timeout);
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout))
|
||||
return out;
|
||||
} else {
|
||||
this.wait();
|
||||
}
|
||||
}
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue()._time;
|
||||
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
|
||||
else {
|
||||
while (!_set.isEmpty() && (out.size() < max)) {
|
||||
SetElement el = _set.firstEntry().getValue();
|
||||
if (el._time + _delay > curTime) break;
|
||||
out.add(_set.pollFirstEntry().getValue()._el);
|
||||
}
|
||||
while (_set.isEmpty()) {
|
||||
if (timeout > 0) {
|
||||
this.wait(Math.max(timeout - (System.currentTimeMillis() - startedWaiting), 1));
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout))
|
||||
return out;
|
||||
} else {
|
||||
this.wait();
|
||||
}
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
var cur = System.currentTimeMillis();
|
||||
if (cur > (startedWaiting + timeout)) return out;
|
||||
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
|
||||
}
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
if (sleep > 0)
|
||||
Thread.sleep(sleep);
|
||||
else
|
||||
return out;
|
||||
var first = _set.firstEntry().getValue().time();
|
||||
if (first + _delay > curTime)
|
||||
sleep = (first + _delay) - curTime;
|
||||
else {
|
||||
while (!_set.isEmpty() && (out.size() < max)) {
|
||||
SetElement<T> el = _set.firstEntry().getValue();
|
||||
if (el.time() + _delay > curTime) break;
|
||||
out.add(_set.pollFirstEntry().getValue().el());
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
Thread.interrupted();
|
||||
_waiting.remove(Thread.currentThread());
|
||||
|
||||
if (timeout > 0) {
|
||||
var cur = System.currentTimeMillis();
|
||||
if (cur > (startedWaiting + timeout)) return out;
|
||||
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
|
||||
}
|
||||
|
||||
if (sleep > 0) {
|
||||
synchronized (_sleepSynchronizer) {
|
||||
_sleepSynchronizer.wait(sleep);
|
||||
}
|
||||
} else
|
||||
return out;
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
public void interrupt() {
|
||||
synchronized (this) {
|
||||
for (var t : _waiting) t.interrupt();
|
||||
public void setDelay(long delay) {
|
||||
synchronized (_sleepSynchronizer) {
|
||||
_delay = delay;
|
||||
_sleepSynchronizer.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class HashSetDelayedBlockingQueueTest {
|
||||
|
||||
@@ -134,11 +135,11 @@ public class HashSetDelayedBlockingQueueTest {
|
||||
ex.submit(() -> {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
queue.add("hello1");
|
||||
queue.add("hello2");
|
||||
queue.readd("hello1");
|
||||
queue.readd("hello2");
|
||||
Thread.sleep(800);
|
||||
queue.add("hello1");
|
||||
queue.add("hello2");
|
||||
queue.readd("hello1");
|
||||
queue.readd("hello2");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -155,20 +156,58 @@ public class HashSetDelayedBlockingQueueTest {
|
||||
var queue = new HashSetDelayedBlockingQueue<>(100000);
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
var ex = Executors.newSingleThreadExecutor();
|
||||
|
||||
var future = ex.submit(() -> {
|
||||
AtomicBoolean ok = new AtomicBoolean(false);
|
||||
Thread t = new Thread(() -> {
|
||||
Assertions.assertThrows(InterruptedException.class, queue::get);
|
||||
Assertions.assertThrows(InterruptedException.class, queue::getAllWait);
|
||||
Assertions.assertTrue((System.currentTimeMillis() - curTime) < 2000);
|
||||
Thread.sleep(1000);
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
ok.set(true);
|
||||
});
|
||||
|
||||
t.start();
|
||||
|
||||
Thread.sleep(500);
|
||||
t.interrupt();
|
||||
Thread.sleep(500);
|
||||
t.interrupt();
|
||||
|
||||
Thread.sleep(1500);
|
||||
|
||||
Assertions.assertTrue(ok.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void setDelayTest() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
var queue = new HashSetDelayedBlockingQueue<String>(100000);
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
var ex = Executors.newSingleThreadExecutor();
|
||||
|
||||
var future = ex.submit(() -> {
|
||||
Assertions.assertEquals("hello1", queue.get());
|
||||
Assertions.assertTrue((System.currentTimeMillis() - curTime) < 2000);
|
||||
var startTime2 = System.currentTimeMillis();
|
||||
Assertions.assertEquals("hello2", queue.get());
|
||||
Assertions.assertTrue((System.currentTimeMillis() - startTime2) < 200);
|
||||
var startTime3 = System.currentTimeMillis();
|
||||
Assertions.assertEquals("hello3", queue.get());
|
||||
Assertions.assertTrue((System.currentTimeMillis() - startTime3) >= 1000);
|
||||
return null;
|
||||
});
|
||||
|
||||
Thread.sleep(500);
|
||||
queue.interrupt();
|
||||
queue.add("hello1");
|
||||
queue.add("hello2");
|
||||
Thread.sleep(500);
|
||||
queue.interrupt();
|
||||
queue.setDelay(0);
|
||||
Thread.sleep(500);
|
||||
queue.setDelay(1000);
|
||||
queue.add("hello3");
|
||||
|
||||
future.get(10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
#!/bin/sh
|
||||
|
||||
set -e || true
|
||||
set -u || true
|
||||
set -o pipefail || true
|
||||
set -x || true
|
||||
|
||||
exec java \
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
|
||||
-Ddhfs.objects.persistence.files.root=/dhfs_root/p \
|
||||
-Ddhfs.objects.root=/dhfs_root/d \
|
||||
-Ddhfs.fuse.root=/dhfs_root_fuse \
|
||||
-Dquarkus.http.host=0.0.0.0 \
|
||||
-Ddhfs.objects.ref_verification=false \
|
||||
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=$DHFS_LOGLEVEL \
|
||||
"$@" \
|
||||
-jar quarkus-run.jar
|
||||
|
||||
@@ -32,6 +32,7 @@ java \
|
||||
-Ddhfs.objects.root="$SCRIPT_DIR"/../data/configs \
|
||||
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
|
||||
-Dquarkus.http.host=0.0.0.0 \
|
||||
-Ddhfs.objects.ref_verification=false \
|
||||
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
|
||||
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
|
||||
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &
|
||||
|
||||
@@ -6,7 +6,7 @@ set -o pipefail
|
||||
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
|
||||
|
||||
# 💀
|
||||
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -en "s/.*\[\{\"id\":([0-9]*).*/\1/p")
|
||||
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -n "s/.*\[{\"id\":\([0-9]*\).*/\1/p")
|
||||
|
||||
echo Latest: $LATEST
|
||||
|
||||
@@ -24,14 +24,15 @@ echo Downloading...
|
||||
cd "$SCRIPT_DIR"
|
||||
|
||||
rm "Run wrapper.zip" || true
|
||||
rm "Run wrapper.tar.gz" || true
|
||||
rm "run-wrapper.tar.gz" || true
|
||||
rm -rf "dhfs" || true
|
||||
|
||||
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
|
||||
|
||||
unzip "Run wrapper.zip"
|
||||
rm "Run wrapper.zip"
|
||||
tar xvf "Run wrapper.tar.gz"
|
||||
tar xvf "run-wrapper.tar.gz" --strip-components=2
|
||||
rm "run-wrapper.tar.gz"
|
||||
|
||||
rm -rf "DHFS Package"
|
||||
rm -rf "Webui"
|
||||
|
||||
Reference in New Issue
Block a user