10 Commits

16 changed files with 349 additions and 174 deletions

View File

@@ -0,0 +1,43 @@
package com.usatiuk.dhfs;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Paths;
@ApplicationScoped
public class ShutdownChecker {
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
private static final String dataFileName = "running";
boolean _cleanShutdown = true;
boolean _initialized = false;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
_cleanShutdown = false;
Log.error("Unclean shutdown detected!");
} else {
Paths.get(dataRoot).resolve(dataFileName).toFile().createNewFile();
}
_initialized = true;
}
void shutdown(@Observes @Priority(100000) ShutdownEvent event) throws IOException {
Paths.get(dataRoot).resolve(dataFileName).toFile().delete();
}
public boolean lastShutdownClean() {
if (!_initialized) throw new IllegalStateException("ShutdownChecker not initialized");
return _cleanShutdown;
}
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
@@ -15,7 +14,6 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
@ApplicationScoped
public class JObjectRefProcessor {
@@ -171,12 +169,14 @@ public class JObjectRefProcessor {
m.markDeleted();
Collection<String> extracted = null;
if (got.getData() != null) extracted = got.getData().extractRefs();
if (got.getData() != null)
extracted = got.getData().extractRefs();
Collection<String> saved = got.getMeta().getSavedRefs();
got.discardData();
if (got.getMeta().getSavedRefs() != null)
for (var r : got.getMeta().getSavedRefs()) deleteRef(got, r);
if (saved != null)
for (var r : saved) deleteRef(got, r);
if (extracted != null)
for (var r : extracted) deleteRef(got, r);

View File

@@ -150,12 +150,14 @@ public class JObjectResolver {
self.getMeta().markDeleted();
Collection<String> extracted = null;
if (self.getData() != null) extracted = self.getData().extractRefs();
if (self.getData() != null)
extracted = self.getData().extractRefs();
Collection<String> saved = self.getMeta().getSavedRefs();
self.discardData();
if (self.getMeta().getSavedRefs() != null)
for (var r : self.getMeta().getSavedRefs()) quickDeleteRef(self, r);
if (saved != null)
for (var r : saved) quickDeleteRef(self, r);
if (extracted != null)
for (var r : extracted) quickDeleteRef(self, r);
}

View File

@@ -59,6 +59,8 @@ public class JObjectWriteback {
float watermarkLowRatio;
@ConfigProperty(name = "dhfs.objects.writeback.threads")
int writebackThreads;
@ConfigProperty(name = "dhfs.objects.writeback.delay")
long promotionDelay;
private final AtomicLong _currentSize = new AtomicLong(0);
private final AtomicBoolean _watermarkReached = new AtomicBoolean(false);
@@ -116,14 +118,15 @@ public class JObjectWriteback {
private void writeback() {
while (!_shutdown.get()) {
try {
QueueEntry got
= _watermarkReached.get()
? _writeQueue.getNoDelay()
: _writeQueue.get();
QueueEntry got = _writeQueue.get();
try {
_currentSize.addAndGet(-got._size);
flushOne(got._obj);
if (_currentSize.get() <= sizeLimit)
synchronized (this) {
this.notifyAll();
}
} catch (Exception e) {
Log.error("Failed writing object " + got._obj.getName() + ", will retry.", e);
try {
@@ -188,21 +191,40 @@ public class JObjectWriteback {
if (!_watermarkReached.get()) {
Log.trace("Watermark reached");
_watermarkReached.set(true);
_writeQueue.interrupt();
_writeQueue.setDelay(0);
}
} else if (_currentSize.get() <= (watermarkLowRatio * sizeLimit)) {
if (_watermarkReached.get())
if (_watermarkReached.get()) {
Log.trace("Watermark reset");
_watermarkReached.set(false);
_watermarkReached.set(false);
_writeQueue.setDelay(promotionDelay);
}
}
if (_currentSize.get() > sizeLimit) {
try {
flushOneImmediate(object.getMeta(), object.getData());
return;
} catch (Exception e) {
Log.error("Failed writing object " + object.getName(), e);
throw e;
long started = System.currentTimeMillis();
final long timeout = 15000L; // FIXME:
boolean finished = false;
while (!finished && System.currentTimeMillis() - started < timeout) {
synchronized (this) {
try {
this.wait();
finished = true;
} catch (InterruptedException ignored) {
}
}
if (_currentSize.get() > sizeLimit)
finished = false;
}
if (!finished) {
Log.error("Timed out waiting for writeback to drain");
try {
flushOneImmediate(object.getMeta(), object.getData());
return;
} catch (Exception e) {
Log.error("Failed writing object " + object.getName(), e);
throw e;
}
}
}

View File

@@ -16,6 +16,8 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@ApplicationScoped
public class DeferredInvalidationQueueService {
@Inject
@@ -33,7 +35,10 @@ public class DeferredInvalidationQueueService {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading hosts");
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
Log.warn("Reading invalidation queue from backup");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
@@ -42,10 +47,22 @@ public class DeferredInvalidationQueueService {
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving deferred invalidations");
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
writeData();
Log.info("Saved deferred invalidations");
}
private void writeData() {
try {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing deferred invalidations data", iex);
throw new RuntimeException(iex);
}
}
void returnForHost(UUID host) {
synchronized (this) {
var col = _persistentData.getDeferredInvalidations().get(host);

View File

@@ -11,12 +11,10 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.lang.ref.Reference;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -46,7 +44,7 @@ public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<Pair<UUID, String>> _queue;
private final AtomicReference<ConcurrentHashSet<String>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private ExecutorService _executor;
private boolean _shutdown = false;
private volatile boolean _shutdown = false;
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
_queue = new HashSetDelayedBlockingQueue<>(delay);
@@ -54,7 +52,11 @@ public class InvalidationQueueService {
@Startup
void init() {
_executor = Executors.newFixedThreadPool(threads);
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("invalidation-%d")
.build();
_executor = Executors.newFixedThreadPool(threads, factory);
for (int i = 0; i < threads; i++) {
_executor.submit(this::sender);

View File

@@ -20,6 +20,8 @@ public class PersistentRemoteHostsData implements Serializable {
@Getter
private final AtomicLong _selfCounter = new AtomicLong();
@Getter
private final AtomicLong _irregularShutdownCounter = new AtomicLong();
@Getter
@Setter
private X509Certificate _selfCertificate = null;
@Getter

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectResolver;
@@ -31,6 +32,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@ApplicationScoped
public class PersistentRemoteHostsService {
final String dataFileName = "hosts";
@@ -48,6 +51,8 @@ public class PersistentRemoteHostsService {
InvalidationQueueService invalidationQueueService;
@Inject
RpcClientFactory rpcClientFactory;
@Inject
ShutdownChecker shutdownChecker;
private PersistentRemoteHosts _persistentData = new PersistentRemoteHosts();
private UUID _selfUuid;
@@ -58,6 +63,9 @@ public class PersistentRemoteHostsService {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading hosts");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
Log.warn("Reading hosts from backup");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
_selfUuid = _persistentData.runReadLocked(PersistentRemoteHostsData::getSelfUuid);
@@ -78,6 +86,10 @@ public class PersistentRemoteHostsService {
var dir = jObjectManager.put(newpd, Optional.empty());
}
if (!shutdownChecker.lastShutdownClean()) {
_persistentData.getData().getIrregularShutdownCounter().addAndGet(1);
}
jObjectResolver.registerWriteListener(PersistentPeerInfo.class, this::pushPeerUpdates);
jObjectResolver.registerWriteListener(PeerDirectory.class, this::pushPeerUpdates);
@@ -89,14 +101,26 @@ public class PersistentRemoteHostsService {
Files.writeString(Paths.get(dataRoot, "self_uuid"), _selfUuid.toString());
Log.info("Self uuid is: " + _selfUuid.toString());
writeData();
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving hosts");
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
writeData();
Log.info("Shutdown");
}
private void writeData() {
try {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing persistent hosts data", iex);
throw new RuntimeException(iex);
}
}
private JObject<PeerDirectory> getPeerDirectory() {
var got = jObjectManager.get(PeerDirectory.PeerDirectoryObjName).orElseThrow(() -> new IllegalStateException("Peer directory not found"));
got.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
@@ -153,7 +177,12 @@ public class PersistentRemoteHostsService {
}
public String getUniqueId() {
return _selfUuid.toString() + _persistentData.getData().getSelfCounter().addAndGet(1);
StringBuilder sb = new StringBuilder(64);
sb.append(_selfUuid);
sb.append(_persistentData.getData().getIrregularShutdownCounter());
sb.append("_");
sb.append(_persistentData.getData().getSelfCounter().addAndGet(1));
return sb.toString();
}
public PersistentPeerInfo getInfo(UUID name) {

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.repository.peersync.PeerSyncApiClientDynamic;
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
import com.usatiuk.dhfs.objects.repository.webapi.AvailablePeerInfo;
@@ -34,6 +35,8 @@ public class RemoteHostManager {
RpcClientFactory rpcClientFactory;
@Inject
PeerSyncApiClientDynamic peerSyncApiClient;
@Inject
ShutdownChecker shutdownChecker;
@ConfigProperty(name = "dhfs.objects.sync.ping.timeout")
long pingTimeout;
private ExecutorService _heartbeatExecutor;
@@ -115,8 +118,11 @@ public class RemoteHostManager {
Log.info("Connected to " + host);
if (persistentRemoteHostsService.markInitialSyncDone(host))
if (persistentRemoteHostsService.markInitialSyncDone(host) || !shutdownChecker.lastShutdownClean()) {
if (!shutdownChecker.lastShutdownClean())
Log.info("Resyncing with " + host + " as last shutdown wasn't clean");
syncHandler.doInitialResync(host);
}
}
public void handleConnectionError(UUID host) {

View File

@@ -103,7 +103,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var ret = builder.build();
if (!ret.getDeletionCandidate())
for (var rr : ret.getReferrersList())
for (var rr : request.getOurReferrersList())
autoSyncProcessor.add(rr);
return Uni.createFrom().item(ret);

View File

@@ -11,6 +11,7 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
@@ -34,7 +35,11 @@ public class AutoSyncProcessor {
@Startup
void init() {
_autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads);
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("autosync-%d")
.build();
_autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads, factory);
for (int i = 0; i < autosyncThreads; i++) {
_autosyncExcecutor.submit(this::autosync);
}

View File

@@ -3,38 +3,23 @@ package com.usatiuk.utils;
import jakarta.annotation.Nullable;
import lombok.Getter;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
public class HashSetDelayedBlockingQueue<T> {
private class SetElement {
private final T _el;
private final long _time;
private SetElement(T el, long time) {
_el = el;
_time = time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SetElement setElement = (SetElement) o;
return Objects.equals(_el, setElement._el);
}
@Override
public int hashCode() {
return Objects.hashCode(_el);
}
private record SetElement<T>(T el, long time) {
}
private final LinkedHashMap<SetElement, SetElement> _set = new LinkedHashMap<>();
private final LinkedHashSet<Thread> _waiting = new LinkedHashSet<>();
private final LinkedHashMap<T, SetElement<T>> _set = new LinkedHashMap<>();
@Getter
private final long _delay;
private long _delay;
private boolean _closed = false;
private final Object _sleepSynchronizer = new Object();
public HashSetDelayedBlockingQueue(long delay) {
_delay = delay;
}
@@ -44,13 +29,15 @@ public class HashSetDelayedBlockingQueue<T> {
public boolean add(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
var sel = new SetElement(el, System.currentTimeMillis());
if (_set.put(sel, sel) == null) {
this.notify();
return true;
}
if (_set.containsKey(el))
return false;
_set.put(el, new SetElement<>(el, System.currentTimeMillis()));
this.notify();
return true;
}
return false;
}
// Adds the object to the queue, if it exists re-adds it with a new delay
@@ -58,69 +45,73 @@ public class HashSetDelayedBlockingQueue<T> {
public T readd(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
var sel = new SetElement(el, System.currentTimeMillis());
SetElement contains = _set.putLast(sel, sel);
SetElement<T> old = _set.putLast(el, new SetElement<>(el, System.currentTimeMillis()));
this.notify();
if (contains != null)
return contains._el;
else return null;
if (old != null)
return old.el();
else
return null;
}
}
// Adds the object to the queue, if it exists re-adds it with a new delay
// Returns true if the object wasn't in the queue
// Removes the object
public T remove(T el) {
synchronized (this) {
var rem = _set.remove(new SetElement(el, 0));
var rem = _set.remove(el);
if (rem == null) return null;
return rem._el;
return rem.el();
}
}
public T get(Long timeout) throws InterruptedException {
long startedWaiting = System.currentTimeMillis();
try {
public T get(long timeout) throws InterruptedException {
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
while (!Thread.interrupted()) {
long sleep;
synchronized (this) {
_waiting.add(Thread.currentThread());
}
while (!Thread.interrupted()) {
long sleep;
synchronized (this) {
while (_set.isEmpty()) {
if (timeout == null) this.wait();
else {
this.wait(timeout);
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
}
if (timeout > 0)
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
while (_set.isEmpty()) {
if (timeout > 0) {
this.wait(Math.max(timeout - (System.currentTimeMillis() - startedWaiting), 1));
if (System.currentTimeMillis() > (startedWaiting + timeout)) return null;
} else {
this.wait();
}
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else return _set.pollFirstEntry().getValue()._el;
}
Thread.sleep(sleep);
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue().time();
if (first + _delay > curTime)
sleep = (first + _delay) - curTime;
else
return _set.pollFirstEntry().getValue().el();
}
} finally {
synchronized (this) {
Thread.interrupted();
_waiting.remove(Thread.currentThread());
if (timeout > 0)
sleep = Math.min(sleep, (startedWaiting + timeout) - System.currentTimeMillis());
if (sleep <= 0)
continue;
synchronized (_sleepSynchronizer) {
_sleepSynchronizer.wait(sleep);
}
}
throw new InterruptedException();
}
public T get() throws InterruptedException {
return get(null);
}
public T getNoDelay() throws InterruptedException {
synchronized (this) {
while (_set.isEmpty()) this.wait();
return _set.pollFirstEntry().getValue()._el;
}
T ret;
do {
} while ((ret = get(-1)) == null);
return ret;
}
public boolean hasImmediate() {
@@ -129,7 +120,7 @@ public class HashSetDelayedBlockingQueue<T> {
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
var first = _set.firstEntry().getValue().time();
return first + _delay <= curTime;
}
}
@@ -141,9 +132,12 @@ public class HashSetDelayedBlockingQueue<T> {
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) return null;
else return _set.pollFirstEntry().getValue()._el;
var first = _set.firstEntry().getValue().time();
if (first + _delay > curTime)
return null;
else
return _set.pollFirstEntry().getValue().el();
}
}
@@ -154,9 +148,9 @@ public class HashSetDelayedBlockingQueue<T> {
var curTime = System.currentTimeMillis();
while (!_set.isEmpty()) {
SetElement el = _set.firstEntry().getValue();
if (el._time + _delay > curTime) break;
out.add(_set.pollFirstEntry().getValue()._el);
SetElement<T> el = _set.firstEntry().getValue();
if (el.time() + _delay > curTime) break;
out.add(_set.pollFirstEntry().getValue().el());
}
}
@@ -166,78 +160,83 @@ public class HashSetDelayedBlockingQueue<T> {
public Collection<T> close() {
synchronized (this) {
_closed = true;
var ret = _set.values().stream().map(o -> o._el).toList();
var ret = _set.values().stream().map(SetElement::el).toList();
_set.clear();
return ret;
}
}
public Collection<T> getAllWait() throws InterruptedException {
return getAllWait(Integer.MAX_VALUE, -1);
Collection<T> out;
do {
} while ((out = getAllWait(Integer.MAX_VALUE, -1)).isEmpty());
return out;
}
public Collection<T> getAllWait(int max) throws InterruptedException {
return getAllWait(max, -1);
Collection<T> out;
do {
} while ((out = getAllWait(max, -1)).isEmpty());
return out;
}
public Collection<T> getAllWait(int max, long timeout) throws InterruptedException {
ArrayList<T> out = new ArrayList<>();
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
try {
while (!Thread.interrupted()) {
if (timeout > 0)
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
long sleep = 0;
synchronized (this) {
_waiting.add(Thread.currentThread());
}
while (!Thread.interrupted()) {
if (timeout > 0)
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
long sleep = 0;
synchronized (this) {
while (_set.isEmpty()) {
if (timeout > 0) {
this.wait(timeout);
if (System.currentTimeMillis() > (startedWaiting + timeout))
return out;
} else {
this.wait();
}
}
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else {
while (!_set.isEmpty() && (out.size() < max)) {
SetElement el = _set.firstEntry().getValue();
if (el._time + _delay > curTime) break;
out.add(_set.pollFirstEntry().getValue()._el);
}
while (_set.isEmpty()) {
if (timeout > 0) {
this.wait(Math.max(timeout - (System.currentTimeMillis() - startedWaiting), 1));
if (System.currentTimeMillis() > (startedWaiting + timeout))
return out;
} else {
this.wait();
}
}
if (timeout > 0) {
var cur = System.currentTimeMillis();
if (cur > (startedWaiting + timeout)) return out;
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
}
var curTime = System.currentTimeMillis();
if (sleep > 0)
Thread.sleep(sleep);
else
return out;
var first = _set.firstEntry().getValue().time();
if (first + _delay > curTime)
sleep = (first + _delay) - curTime;
else {
while (!_set.isEmpty() && (out.size() < max)) {
SetElement<T> el = _set.firstEntry().getValue();
if (el.time() + _delay > curTime) break;
out.add(_set.pollFirstEntry().getValue().el());
}
}
}
} finally {
synchronized (this) {
Thread.interrupted();
_waiting.remove(Thread.currentThread());
if (timeout > 0) {
var cur = System.currentTimeMillis();
if (cur > (startedWaiting + timeout)) return out;
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
}
if (sleep > 0) {
synchronized (_sleepSynchronizer) {
_sleepSynchronizer.wait(sleep);
}
} else
return out;
}
return out;
}
public void interrupt() {
synchronized (this) {
for (var t : _waiting) t.interrupt();
public void setDelay(long delay) {
synchronized (_sleepSynchronizer) {
_delay = delay;
_sleepSynchronizer.notifyAll();
}
}
}

View File

@@ -8,6 +8,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class HashSetDelayedBlockingQueueTest {
@@ -134,11 +135,11 @@ public class HashSetDelayedBlockingQueueTest {
ex.submit(() -> {
try {
Thread.sleep(10);
queue.add("hello1");
queue.add("hello2");
queue.readd("hello1");
queue.readd("hello2");
Thread.sleep(800);
queue.add("hello1");
queue.add("hello2");
queue.readd("hello1");
queue.readd("hello2");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -155,20 +156,58 @@ public class HashSetDelayedBlockingQueueTest {
var queue = new HashSetDelayedBlockingQueue<>(100000);
var curTime = System.currentTimeMillis();
var ex = Executors.newSingleThreadExecutor();
var future = ex.submit(() -> {
AtomicBoolean ok = new AtomicBoolean(false);
Thread t = new Thread(() -> {
Assertions.assertThrows(InterruptedException.class, queue::get);
Assertions.assertThrows(InterruptedException.class, queue::getAllWait);
Assertions.assertTrue((System.currentTimeMillis() - curTime) < 2000);
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ok.set(true);
});
t.start();
Thread.sleep(500);
t.interrupt();
Thread.sleep(500);
t.interrupt();
Thread.sleep(1500);
Assertions.assertTrue(ok.get());
}
@Test
void setDelayTest() throws InterruptedException, ExecutionException, TimeoutException {
var queue = new HashSetDelayedBlockingQueue<String>(100000);
var curTime = System.currentTimeMillis();
var ex = Executors.newSingleThreadExecutor();
var future = ex.submit(() -> {
Assertions.assertEquals("hello1", queue.get());
Assertions.assertTrue((System.currentTimeMillis() - curTime) < 2000);
var startTime2 = System.currentTimeMillis();
Assertions.assertEquals("hello2", queue.get());
Assertions.assertTrue((System.currentTimeMillis() - startTime2) < 200);
var startTime3 = System.currentTimeMillis();
Assertions.assertEquals("hello3", queue.get());
Assertions.assertTrue((System.currentTimeMillis() - startTime3) >= 1000);
return null;
});
Thread.sleep(500);
queue.interrupt();
queue.add("hello1");
queue.add("hello2");
Thread.sleep(500);
queue.interrupt();
queue.setDelay(0);
Thread.sleep(500);
queue.setDelay(1000);
queue.add("hello3");
future.get(10, TimeUnit.SECONDS);
}

View File

@@ -1,10 +1,17 @@
#!/bin/sh
set -e || true
set -u || true
set -o pipefail || true
set -x || true
exec java \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
-Ddhfs.objects.persistence.files.root=/dhfs_root/p \
-Ddhfs.objects.root=/dhfs_root/d \
-Ddhfs.fuse.root=/dhfs_root_fuse \
-Dquarkus.http.host=0.0.0.0 \
-Ddhfs.objects.ref_verification=false \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=$DHFS_LOGLEVEL \
"$@" \
-jar quarkus-run.jar

View File

@@ -32,6 +32,7 @@ java \
-Ddhfs.objects.root="$SCRIPT_DIR"/../data/configs \
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
-Dquarkus.http.host=0.0.0.0 \
-Ddhfs.objects.ref_verification=false \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &

View File

@@ -6,7 +6,7 @@ set -o pipefail
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
# 💀
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -en "s/.*\[\{\"id\":([0-9]*).*/\1/p")
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -n "s/.*\[{\"id\":\([0-9]*\).*/\1/p")
echo Latest: $LATEST
@@ -24,14 +24,15 @@ echo Downloading...
cd "$SCRIPT_DIR"
rm "Run wrapper.zip" || true
rm "Run wrapper.tar.gz" || true
rm "run-wrapper.tar.gz" || true
rm -rf "dhfs" || true
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
unzip "Run wrapper.zip"
rm "Run wrapper.zip"
tar xvf "Run wrapper.tar.gz"
tar xvf "run-wrapper.tar.gz" --strip-components=2
rm "run-wrapper.tar.gz"
rm -rf "DHFS Package"
rm -rf "Webui"