non-completely-crazy writeback

This commit is contained in:
2024-07-11 17:25:02 +02:00
parent 4e26e28b19
commit 57c57ac9ea
6 changed files with 301 additions and 213 deletions

View File

@@ -36,7 +36,7 @@ public class JObjectResolver {
@Inject
JObjectWriteback jObjectWriteback;
@Inject
JObjectManager jobjectManager;
JObjectManager jObjectManager;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
@@ -81,7 +81,7 @@ public class JObjectResolver {
Log.debug(sb.toString());
for (var r : self.getMeta().getSavedRefs()) {
if (!extracted.contains(r))
jobjectManager.get(r).ifPresent(ro -> ro.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
jObjectManager.get(r).ifPresent(ro -> ro.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.removeRef(self.getName());
return null;
}));
@@ -89,7 +89,7 @@ public class JObjectResolver {
for (var r : extracted) {
if (!self.getMeta().getSavedRefs().contains(r)) {
Log.trace("Hydrating ref " + r + " for " + self.getName());
jobjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
jObjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
}
}
self.getMeta().setSavedRefs(null);
@@ -113,16 +113,41 @@ public class JObjectResolver {
refs.forEach(r -> {
Log.trace("Hydrating ref after undelete " + r + " for " + self.getName());
jobjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
jObjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
});
}
if (self.isDeletionCandidate() && !self.isDeleted()) {
jObjectRefProcessor.putDeletionCandidate(self.getName());
if (!self.getMeta().isSeen()) tryQuickDelete(self);
else jObjectRefProcessor.putDeletionCandidate(self.getName());
}
}
private void tryQuickDelete(JObject<?> self) {
self.assertRWLock();
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
Log.trace("Quick delete of: " + self.getName());
self.getMeta().delete();
Stream<String> refs = Stream.empty();
if (self.getMeta().getSavedRefs() != null)
refs = self.getMeta().getSavedRefs().stream();
if (self.getData() != null)
refs = Streams.concat(refs, self.getData().extractRefs().stream());
self.discardData();
refs.forEach(c -> {
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(self.getName());
return null;
}));
});
}
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
// jObject.assertRWLock();
// FIXME: No way to assert read lock?
@@ -191,13 +216,13 @@ public class JObjectResolver {
var newRefs = self.getData().extractRefs();
if (oldRefs != null) for (var o : oldRefs)
if (!newRefs.contains(o)) {
jobjectManager.get(o).ifPresent(obj -> {
jObjectManager.get(o).ifPresent(obj -> {
if (obj.hasRef(self.getName()))
throw new IllegalStateException("Object " + o + " is referenced from " + self.getName() + " but shouldn't be");
});
}
for (var r : newRefs) {
var obj = jobjectManager.get(r).orElseThrow(() -> new IllegalStateException("Object " + r + " not found but should be referenced from " + self.getName()));
var obj = jObjectManager.get(r).orElseThrow(() -> new IllegalStateException("Object " + r + " not found but should be referenced from " + self.getName()));
if (obj.isDeleted())
throw new IllegalStateException("Object " + r + " deleted but referenced from " + self.getName());
if (!obj.hasRef(self.getName()))

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
@@ -9,147 +10,131 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.collections4.OrderedBidiMap;
import org.apache.commons.collections4.bidimap.TreeBidiMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@Singleton
public class JObjectWriteback {
private class QueueEntry {
private final JObject<?> _obj;
private long _size;
private final LinkedHashMap<JObject<?>, Pair<Long, Long>> _nursery = new LinkedHashMap<>();
// FIXME: Kind of a hack
private final OrderedBidiMap<Pair<Long, String>, JObject<?>> _writeQueue = new TreeBidiMap<>();
private QueueEntry(JObject<?> obj, long size) {
_obj = obj;
_size = size;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueueEntry that = (QueueEntry) o;
return Objects.equals(_obj, that._obj);
}
@Override
public int hashCode() {
return Objects.hashCode(_obj);
}
}
private final HashSetDelayedBlockingQueue<QueueEntry> _writeQueue;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectManager jObjectManager;
@Inject
JObjectSizeEstimator jObjectSizeEstimator;
@ConfigProperty(name = "dhfs.objects.writeback.delay")
long promotionDelay;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.writeback.nursery_limit")
int nurseryLimit;
@ConfigProperty(name = "dhfs.objects.writeback.watermark-high")
float watermarkHighRatio;
@ConfigProperty(name = "dhfs.objects.writeback.watermark-low")
float watermarkLowRatio;
@ConfigProperty(name = "dhfs.objects.writeback.threads")
int writebackThreads;
AtomicLong _currentSize = new AtomicLong(0);
boolean overload = false;
private Thread _promotionThread;
private final AtomicLong _currentSize = new AtomicLong(0);
private final AtomicBoolean _watermarkReached = new AtomicBoolean(false);
private final AtomicBoolean _shutdown = new AtomicBoolean(false);
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
public JObjectWriteback(@ConfigProperty(name = "dhfs.objects.writeback.delay") long promotionDelay) {
_writeQueue = new HashSetDelayedBlockingQueue<>(promotionDelay);
}
@Startup
void init() {
_writebackExecutor = Executors.newFixedThreadPool(writebackThreads);
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("writeback-%d")
.build();
_writebackExecutor = Executors.newFixedThreadPool(writebackThreads, factory);
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
Log.info("Writeback status: size="
+ _currentSize.get() / 1024 / 1024 + "MB"
+ " watermark=" + (_watermarkReached.get() ? "reached" : "not reached"));
}
} catch (InterruptedException ignored) {
}
});
for (int i = 0; i < writebackThreads; i++) {
_writebackExecutor.submit(this::writeback);
}
_promotionThread = new Thread(this::promote);
_promotionThread.setName("Writeback promotion thread");
_promotionThread.start();
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
_promotionThread.interrupt();
while (_promotionThread.isAlive()) {
try {
_promotionThread.join();
} catch (InterruptedException ignored) {
}
}
_shutdown.set(true);
_writebackExecutor.shutdownNow();
_statusExecutor.shutdownNow();
HashSet<JObject<?>> toWrite = new LinkedHashSet<>();
toWrite.addAll(_nursery.keySet());
toWrite.addAll(_writeQueue.values());
var toWrite = _writeQueue.close();
Log.info("Flushing objects");
for (var v : toWrite) {
try {
flushOne(v);
flushOne(v._obj);
} catch (Exception e) {
Log.error("Failed writing object " + v.getName(), e);
Log.error("Failed writing object " + v._obj.getName(), e);
}
}
}
private void promote() {
try {
while (!Thread.interrupted()) {
var curTime = System.currentTimeMillis();
long wait = 0;
synchronized (_nursery) {
while (_nursery.isEmpty())
_nursery.wait();
curTime = System.currentTimeMillis();
if ((curTime - _nursery.firstEntry().getValue().getLeft()) <= promotionDelay) {
wait = promotionDelay - (curTime - _nursery.firstEntry().getValue().getLeft());
}
}
if (wait > 0)
Thread.sleep(wait);
synchronized (_nursery) {
while (!_nursery.isEmpty() && (curTime - _nursery.firstEntry().getValue().getLeft()) >= promotionDelay) {
var got = _nursery.pollFirstEntry();
synchronized (_writeQueue) {
_writeQueue.put(Pair.of(got.getValue().getRight(), got.getKey().getName()), got.getKey());
_writeQueue.notify();
}
}
}
}
} catch (InterruptedException ignored) {
}
Log.info("Writeback promotion thread exiting");
}
private void writeback() {
try {
while (!Thread.interrupted()) {
JObject<?> obj;
long removedSize;
synchronized (_writeQueue) {
while (_writeQueue.isEmpty())
_writeQueue.wait();
while (!_shutdown.get()) {
try {
QueueEntry got
= _watermarkReached.get()
? _writeQueue.getNoDelay()
: _writeQueue.get();
var fk = _writeQueue.lastKey();
removedSize = fk.getKey();
obj = _writeQueue.remove(fk);
}
try {
_currentSize.addAndGet(-removedSize);
flushOne(obj);
_currentSize.addAndGet(-got._size);
flushOne(got._obj);
} catch (Exception e) {
Log.error("Failed writing object " + obj.getName() + ", will retry.", e);
Log.error("Failed writing object " + got._obj.getName() + ", will retry.", e);
try {
obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
got._obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
var size = jObjectSizeEstimator.estimateObjectSize(d);
synchronized (_writeQueue) {
_writeQueue.put(Pair.of(size, m.getName()), obj);
}
_currentSize.addAndGet(size);
_writeQueue.add(new QueueEntry(got._obj, size));
return null;
});
} catch (DeletedObjectAccessException ignored) {
}
}
} catch (InterruptedException ignored) {
}
} catch (InterruptedException ignored) {
}
Log.info("Writeback thread exiting");
}
@@ -168,6 +153,7 @@ public class JObjectWriteback {
}
private <T extends JObjectData> void flushOneImmediate(ObjectMetadata m, T data) {
m.markWritten();
if (m.isDeleted()) {
if (!m.isDeletionCandidate())
throw new IllegalStateException("Object deleted but not deletable! " + m.getName());
@@ -177,7 +163,6 @@ public class JObjectWriteback {
objectPersistentStore.deleteObject(m.getName());
return;
}
m.markWritten();
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
if (data != null)
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
@@ -185,20 +170,9 @@ public class JObjectWriteback {
public void remove(JObject<?> object) {
object.assertRWLock();
synchronized (_nursery) {
if (_nursery.containsKey(object)) {
var size = _nursery.get(object).getRight();
_nursery.remove(object);
_currentSize.addAndGet(-size);
}
}
synchronized (_writeQueue) {
if (_writeQueue.containsValue(object)) {
var size = _writeQueue.inverseBidiMap().get(object).getLeft();
_writeQueue.removeValue(object);
_currentSize.addAndGet(-size);
}
}
var got = _writeQueue.remove(new QueueEntry(object, 0));
if (got == null) return;
_currentSize.addAndGet(-got._size);
}
public void markDirty(JObject<?> object) {
@@ -208,69 +182,35 @@ public class JObjectWriteback {
return;
}
if (_currentSize.get() > (watermarkHighRatio * sizeLimit)) {
if (!_watermarkReached.get()) {
Log.trace("Watermark reached");
_watermarkReached.set(true);
_writeQueue.interrupt();
}
} else if (_currentSize.get() <= (watermarkLowRatio * sizeLimit)) {
if (_watermarkReached.get())
Log.trace("Watermark reset");
_watermarkReached.set(false);
}
if (_currentSize.get() > sizeLimit) {
try {
flushOneImmediate(object.getMeta(), object.getData());
return;
} catch (Exception e) {
Log.error("Failed writing object " + object.getName(), e);
throw e;
}
}
var size = jObjectSizeEstimator.estimateObjectSize(object.getData());
synchronized (_nursery) {
if (_nursery.containsKey(object)) {
long oldSize = _nursery.get(object).getRight();
if (oldSize == size)
return;
long oldTime = _nursery.get(object).getLeft();
if (nurseryLimit > 0 && size >= nurseryLimit) {
_nursery.remove(object);
_currentSize.addAndGet(-oldSize);
} else {
_nursery.replace(object, Pair.of(oldTime, size));
_currentSize.addAndGet(size - oldSize);
return;
}
}
}
var old = _writeQueue.readd(new QueueEntry(object, size));
synchronized (_writeQueue) {
if (_writeQueue.containsValue(object)) {
long oldSize = _writeQueue.getKey(object).getKey();
if (oldSize == size)
return;
_currentSize.addAndGet(size - oldSize);
_writeQueue.inverseBidiMap().replace(object, Pair.of(size, object.getName()));
return;
}
}
var curTime = System.currentTimeMillis();
if (nurseryLimit > 0 && size >= nurseryLimit) {
synchronized (_writeQueue) {
_currentSize.addAndGet(size);
_writeQueue.put(Pair.of(size, object.getName()), object);
_writeQueue.notify();
return;
}
}
synchronized (_nursery) {
if (_currentSize.get() < sizeLimit) {
if (overload) {
overload = false;
Log.trace("Writeback cache enabled");
}
_nursery.put(object, Pair.of(curTime, size));
_currentSize.addAndGet(size);
_nursery.notify();
return;
}
}
try {
if (!overload) {
overload = true;
Log.trace("Writeback cache disabled");
}
flushOneImmediate(object.getMeta(), object.getData());
} catch (Exception e) {
Log.error("Failed writing object " + object.getName(), e);
throw e;
}
if (old != null)
_currentSize.addAndGet(size - old._size);
else
_currentSize.addAndGet(size);
}
}

View File

@@ -1,9 +1,6 @@
package com.usatiuk.utils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.*;
public class HashSetDelayedBlockingQueue<T> {
private class SetElement {
@@ -29,16 +26,22 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
private final LinkedHashSet<SetElement> _set = new LinkedHashSet<>();
private final LinkedHashMap<SetElement, SetElement> _set = new LinkedHashMap<>();
private final LinkedHashSet<Thread> _waiting = new LinkedHashSet<>();
private final long _delay;
private boolean _closed = false;
public HashSetDelayedBlockingQueue(long delay) {
_delay = delay;
}
// If there's object with key in the queue, don't do anything
// Returns whether it was added or not
public boolean add(T el) {
synchronized (this) {
if (_set.add(new SetElement(el, System.currentTimeMillis()))) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
var sel = new SetElement(el, System.currentTimeMillis());
if (_set.put(sel, sel) == null) {
this.notify();
return true;
}
@@ -46,23 +49,65 @@ public class HashSetDelayedBlockingQueue<T> {
return false;
}
// Adds the object to the queue, if it exists re-adds it with a new delay
// Returns the old object, or null
public T readd(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
var sel = new SetElement(el, System.currentTimeMillis());
SetElement contains = _set.remove(sel);
_set.put(sel, sel);
this.notify();
if (contains != null)
return contains._el;
else return null;
}
}
// Adds the object to the queue, if it exists re-adds it with a new delay
// Returns true if the object wasn't in the queue
public T remove(T el) {
synchronized (this) {
var rem = _set.remove(new SetElement(el, 0));
if (rem == null) return null;
return rem._el;
}
}
public T get() throws InterruptedException {
while (!Thread.interrupted()) {
long sleep;
try {
synchronized (this) {
while (_set.isEmpty()) this.wait();
var curTime = System.currentTimeMillis();
var first = _set.getFirst()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else return _set.removeFirst()._el;
_waiting.add(Thread.currentThread());
}
while (!Thread.interrupted()) {
long sleep;
synchronized (this) {
while (_set.isEmpty()) this.wait();
var curTime = System.currentTimeMillis();
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else return _set.pollFirstEntry().getValue()._el;
}
Thread.sleep(sleep);
}
} finally {
synchronized (this) {
Thread.interrupted();
_waiting.remove(Thread.currentThread());
}
Thread.sleep(sleep);
}
throw new InterruptedException();
}
public T getNoDelay() throws InterruptedException {
synchronized (this) {
while (_set.isEmpty()) this.wait();
return _set.pollFirstEntry().getValue()._el;
}
}
public Collection<T> getAll() {
ArrayList<T> out = new ArrayList<>();
@@ -71,41 +116,65 @@ public class HashSetDelayedBlockingQueue<T> {
var curTime = System.currentTimeMillis();
while (!_set.isEmpty()) {
SetElement el = _set.getFirst();
SetElement el = _set.firstEntry().getValue();
if (el._time + _delay > curTime) break;
out.add(_set.removeFirst()._el);
out.add(_set.pollFirstEntry().getValue()._el);
}
}
return out;
}
public Collection<T> close() {
synchronized (this) {
_closed = true;
var ret = _set.values().stream().map(o -> o._el).toList();
_set.clear();
return ret;
}
}
public Collection<T> getAllWait() throws InterruptedException {
ArrayList<T> out = new ArrayList<>();
while (!Thread.interrupted()) {
long sleep = 0;
try {
synchronized (this) {
while (_set.isEmpty()) this.wait();
_waiting.add(Thread.currentThread());
}
while (!Thread.interrupted()) {
long sleep = 0;
synchronized (this) {
while (_set.isEmpty()) this.wait();
var curTime = System.currentTimeMillis();
var curTime = System.currentTimeMillis();
var first = _set.getFirst()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else {
while (!_set.isEmpty()) {
SetElement el = _set.getFirst();
if (el._time + _delay > curTime) break;
out.add(_set.removeFirst()._el);
var first = _set.firstEntry().getValue()._time;
if (first + _delay > curTime) sleep = (first + _delay) - curTime;
else {
while (!_set.isEmpty()) {
SetElement el = _set.firstEntry().getValue();
if (el._time + _delay > curTime) break;
out.add(_set.pollFirstEntry().getValue()._el);
}
}
}
if (sleep > 0)
Thread.sleep(sleep);
else
return out;
}
} finally {
synchronized (this) {
Thread.interrupted();
_waiting.remove(Thread.currentThread());
}
if (sleep > 0)
Thread.sleep(sleep);
else return out;
}
throw new InterruptedException();
}
public void interrupt() {
synchronized (this) {
for (var t : _waiting) t.interrupt();
}
}
}

View File

@@ -22,9 +22,9 @@ dhfs.files.write_merge_max_chunk_to_take=1
dhfs.files.write_last_chunk_limit=1.5
dhfs.objects.writeback.delay=50
dhfs.objects.writeback.limit=1073741824
dhfs.objects.writeback.threads=2
# Only objects with estimated size smaller than this will be put into nursery
dhfs.objects.writeback.nursery_limit=-1
dhfs.objects.writeback.watermark-high=0.6
dhfs.objects.writeback.watermark-low=0.4
dhfs.objects.writeback.threads=4
dhfs.objects.deletion.delay=0
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false

View File

@@ -187,7 +187,7 @@ public class DhfsFileServiceSimpleTestImpl {
}
@Test
void moveTest2() {
void moveTest2() throws InterruptedException {
var ret = fileService.create("/moveTest", 777);
Assertions.assertTrue(ret.isPresent());
var uuid = ret.get();
@@ -213,9 +213,11 @@ public class DhfsFileServiceSimpleTestImpl {
var newfile = fileService.open("/movedTest").get();
Thread.sleep(1000);
chunkObj.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
Assertions.assertFalse(m.getReferrers().contains(uuid));
Assertions.assertTrue(m.getReferrers().contains(newfile));
Assertions.assertFalse(m.getReferrers().contains(uuid));
return null;
});
}

View File

@@ -4,7 +4,10 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class HashSetDelayedBlockingQueueTest {
@@ -43,4 +46,53 @@ public class HashSetDelayedBlockingQueueTest {
Assertions.assertTrue((gotTime - curTime) >= 1010);
}
@Test
void readdTest() throws InterruptedException {
var queue = new HashSetDelayedBlockingQueue<>(1000);
var curTime = System.currentTimeMillis();
var ex = Executors.newSingleThreadExecutor();
ex.submit(() -> {
try {
Thread.sleep(10);
queue.add("hello1");
queue.add("hello2");
Thread.sleep(800);
queue.add("hello1");
queue.add("hello2");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
var thing = queue.getAllWait(); // Theoretically you can get one...
if (thing.size() == 1) thing.add(queue.getAllWait());
var gotTime = System.currentTimeMillis();
Assertions.assertIterableEquals(List.of("hello1", "hello2"), thing);
Assertions.assertTrue((gotTime - curTime) >= 1810);
}
@Test
void interruptTest() throws InterruptedException, ExecutionException, TimeoutException {
var queue = new HashSetDelayedBlockingQueue<>(100000);
var curTime = System.currentTimeMillis();
var ex = Executors.newSingleThreadExecutor();
var future = ex.submit(() -> {
Assertions.assertThrows(InterruptedException.class, queue::get);
Assertions.assertThrows(InterruptedException.class, queue::getAllWait);
Assertions.assertTrue((System.currentTimeMillis() - curTime) < 2000);
Thread.sleep(1000);
return null;
});
Thread.sleep(500);
queue.interrupt();
Thread.sleep(500);
queue.interrupt();
future.get(10, TimeUnit.SECONDS);
}
}