Mirror of https://github.com/usatiuk/dhfs.git
less crappy writeback
@@ -113,6 +113,11 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.5.0-M2</version>
+        </dependency>
     </dependencies>

     <build>
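(The commons-collections4 4.5.0-M2 milestone is what provides the OrderedBidiMap/TreeBidiMap used by the reworked write queue further down.)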
@@ -63,4 +63,9 @@ public class ChunkData extends JObjectData {
     public boolean assumeUnique() {
         return true;
     }
+
+    @Override
+    public long estimateSize() {
+        return _bytes.size();
+    }
 }
@@ -56,4 +56,9 @@ public class ChunkInfo extends JObjectData {
     public boolean assumeUnique() {
         return true;
     }
+
+    @Override
+    public long estimateSize() {
+        return _hash.length() * 2L;
+    }
 }
@@ -48,4 +48,9 @@ public class Directory extends FsNode {
     public List<String> getChildrenList() {
         return _children.keySet().stream().toList();
     }
+
+    @Override
+    public long estimateSize() {
+        return _children.size() * 16L;
+    }
 }
@@ -30,4 +30,9 @@ public class File extends FsNode {

     @Getter
     private final UUID _parent;
+
+    @Override
+    public long estimateSize() {
+        return _chunks.size() * 16L;
+    }
 }
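The estimates above are deliberately coarse: ChunkData charges its actual payload size, ChunkInfo charges two bytes per hash character (Java strings are UTF-16), and Directory/File charge a flat 16 bytes per child or chunk entry for the reference plus bookkeeping. A small illustration of the arithmetic (values invented, not from the commit):

    public class EstimateDemo {
        public static void main(String[] args) {
            String hash = "0123abcd";            // stand-in for ChunkInfo._hash
            long hashBytes = hash.length() * 2L; // 16: two bytes per UTF-16 char
            long dirBytes = 1_000 * 16L;         // a Directory with 1000 children
            System.out.println(hashBytes + " " + dirBytes); // 16 16000
        }
    }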
@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
 import io.quarkus.logging.Log;
 import jakarta.annotation.Nullable;
 import org.apache.commons.lang3.NotImplementedException;
+import org.jetbrains.annotations.NotNull;

 import java.io.Serializable;
 import java.util.Comparator;
@@ -14,7 +15,25 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

-public class JObject<T extends JObjectData> implements Serializable {
+public class JObject<T extends JObjectData> implements Serializable, Comparable<JObject<?>> {
+    @Override
+    public int compareTo(@NotNull JObject<?> o) {
+        return getName().compareTo(o.getName());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        JObject<?> jObject = (JObject<?>) o;
+        return Objects.equals(_metaPart.getName(), jObject._metaPart.getName());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(_metaPart.getName());
+    }
+
     public static class DeletedObjectAccessException extends RuntimeException {
     }
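Making JObject comparable and hashable by its name is what lets it sit in ordered, keyed collections: the TreeBidiMap write queue introduced below requires both keys and values to be Comparable, and name-based equals/hashCode means two handles for the same stored object collapse to one entry. A minimal self-contained sketch of name-based identity (NamedObj is a hypothetical stand-in for JObject):

    import java.util.Objects;
    import java.util.TreeSet;

    class NamedObj implements Comparable<NamedObj> {
        private final String _name;
        NamedObj(String name) { _name = name; }
        @Override public int compareTo(NamedObj o) { return _name.compareTo(o._name); }
        @Override public boolean equals(Object o) {
            return o instanceof NamedObj n && Objects.equals(_name, n._name);
        }
        @Override public int hashCode() { return Objects.hashCode(_name); }
    }

    public class NameIdentityDemo {
        public static void main(String[] args) {
            var set = new TreeSet<NamedObj>();
            set.add(new NamedObj("b"));
            set.add(new NamedObj("a"));
            set.add(new NamedObj("a")); // same name -> same identity
            System.out.println(set.size()); // 2: duplicates by name collapse
        }
    }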
@@ -153,6 +172,7 @@ public class JObject<T extends JObjectData> implements Serializable {
         var ref = _metaPart.getRefcount();
         boolean wasSeen = _metaPart.isSeen();
         boolean wasDeleted = _metaPart.isDeleted();
+        var prevData = _dataPart.get();
         VoidFn invalidateFn = () -> {
             _resolver.backupRefs(this);
             _dataPart.set(null);
@@ -163,7 +183,8 @@ public class JObject<T extends JObjectData> implements Serializable {
         if (!Objects.equals(ver, _metaPart.getOurVersion())
                 || ref != _metaPart.getRefcount()
                 || wasDeleted != _metaPart.isDeleted()
-                || wasSeen != _metaPart.isSeen())
+                || wasSeen != _metaPart.isSeen()
+                || prevData != _dataPart.get())
             notifyWriteMeta();
         if (!Objects.equals(ver, _metaPart.getOurVersion()))
             notifyWriteData();
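The pattern here is snapshot-and-compare: the pre-transaction state (version, refcount, seen/deleted flags, and now the data reference) is recorded, and writeback is notified only if something actually changed; the new prevData check catches the case where the data part was dropped without a version bump. A tiny sketch of the idea (names invented):

    import java.util.concurrent.atomic.AtomicReference;

    public class SnapshotCompareDemo {
        private static final AtomicReference<Object> _dataPart =
                new AtomicReference<>(new Object());

        public static void main(String[] args) {
            var prevData = _dataPart.get();  // snapshot before the transaction
            _dataPart.set(null);             // e.g. an invalidation drops the data
            if (prevData != _dataPart.get()) // reference inequality -> state changed
                System.out.println("notifyWriteMeta(): schedule a writeback");
        }
    }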
@@ -14,4 +14,8 @@ public abstract class JObjectData implements Serializable {
     }

     public abstract Collection<String> extractRefs();
+
+    public long estimateSize() {
+        return 0;
+    }
 }
@@ -222,7 +222,6 @@ public class JObjectManagerImpl implements JObjectManager {
                 refs = Streams.concat(refs, object.getData().extractRefs().stream());

             object.discardData();
-            jObjectWriteback.hintDeletion(m);

             refs.forEach(c -> get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
                 mc.removeRef(object.getName());
@@ -91,7 +91,6 @@ public class JObjectRefProcessor {
                     refs = Streams.concat(refs, got.get().getData().extractRefs().stream());

                 got.get().discardData();
-                jObjectWriteback.hintDeletion(m);

                 refs.forEach(c -> {
                     jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
@@ -104,7 +104,7 @@ public class JObjectResolver {
         if (local.isPresent()) return local.get();

         var obj = remoteObjectServiceClient.getObject(jObject);
-        jObjectWriteback.markDirty(jObject.getName(), jObject);
+        jObjectWriteback.markDirty(jObject);
         invalidationQueueService.pushInvalidationToAll(jObject.getName(), !jObject.getMeta().isSeen());
         return SerializationHelper.deserialize(obj);
     }
@@ -113,7 +113,7 @@ public class JObjectResolver {
         jObject.assertRWLock();
         try {
             Log.trace("Invalidating " + name);
-            jObjectWriteback.remove(name);
+            jObjectWriteback.remove(jObject);
             objectPersistentStore.deleteObject(name);
         } catch (StatusRuntimeException sx) {
             if (sx.getStatus() != Status.NOT_FOUND)
@@ -125,12 +125,12 @@ public class JObjectResolver {

     public void notifyWriteMeta(JObject<?> self) {
         self.assertRWLock();
-        jObjectWriteback.markDirty(self.getName(), self);
+        jObjectWriteback.markDirty(self);
     }

     public void notifyWriteData(JObject<?> self) {
         self.assertRWLock();
-        jObjectWriteback.markDirty(self.getName(), self);
+        jObjectWriteback.markDirty(self);
         if (self.isResolved())
             invalidationQueueService.pushInvalidationToAll(self.getName(), !self.getMeta().isSeen());
     }
@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.storage.objects.jrepository;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JObjectSizeEstimator {
+    public long estimateObjectSize(JObjectData d) {
+        if (d == null) return 200; // Assume metadata etc takes up something
+        else return d.estimateSize();
+    }
+}
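This new bean is the single point where cache-accounting policy lives: an unresolved object (null data part) is charged a nominal 200 bytes for its metadata entry, and everything else defers to the estimateSize() overrides added above. A self-contained sketch of the same policy, with hypothetical stand-in types:

    abstract class Data { long estimateSize() { return 0; } }

    class Estimator {
        // Mirrors JObjectSizeEstimator.estimateObjectSize: a null data part
        // still occupies an in-memory metadata entry, so charge ~200 bytes.
        long estimateObjectSize(Data d) {
            return d == null ? 200 : d.estimateSize();
        }
    }

    public class EstimatorDemo {
        public static void main(String[] args) {
            var e = new Estimator();
            System.out.println(e.estimateObjectSize(null)); // 200
            System.out.println(e.estimateObjectSize(new Data() {
                @Override long estimateSize() { return 4096; } // resolved object
            })); // 4096
        }
    }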
@@ -10,12 +10,15 @@ import jakarta.annotation.Priority;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
+import org.apache.commons.collections4.OrderedBidiMap;
+import org.apache.commons.collections4.bidimap.TreeBidiMap;
 import org.apache.commons.lang3.tuple.Pair;
 import org.eclipse.microprofile.config.inject.ConfigProperty;

 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.concurrent.atomic.AtomicLong;
@@ -26,15 +29,23 @@ public class JObjectWriteback {
     @Inject
     JObjectManager jObjectManager;

+    @Inject
+    JObjectSizeEstimator jObjectSizeEstimator;
+
     @ConfigProperty(name = "dhfs.objects.writeback.delay")
-    Integer delay;
+    long promotionDelay;

     @ConfigProperty(name = "dhfs.objects.writeback.limit")
-    Integer limit;
+    long sizeLimit;

-    private final LinkedHashMap<String, Pair<Long, JObject<?>>> _objects = new LinkedHashMap<>();
+    AtomicLong _currentSize = new AtomicLong(0);
+
+    private final LinkedHashMap<JObject<?>, Pair<Long, Long>> _nursery = new LinkedHashMap<>();
+    // FIXME: Kind of a hack
+    private final OrderedBidiMap<Pair<Long, String>, JObject<?>> _writeQueue = new TreeBidiMap<>();

     private Thread _writebackThread;
+    private Thread _promotionThread;

     boolean overload = false;
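The single time-ordered _objects map becomes a two-tier structure: a nursery (insertion-ordered LinkedHashMap of object -> (time dirtied, estimated size)) and a write queue keyed by Pair.of(estimatedSize, name). commons-lang3 Pair compares left element first, so the TreeBidiMap iterates smallest-first and lastKey() yields the biggest dirty object, while the name component keeps keys unique and the inverse view gives cheap removal by object. A minimal sketch of that ordering trick, assuming the commons-collections4 4.5.0-M2 API pulled in above:

    import org.apache.commons.collections4.bidimap.TreeBidiMap;
    import org.apache.commons.lang3.tuple.Pair;

    public class WriteQueueDemo {
        public static void main(String[] args) {
            // TreeBidiMap needs Comparable keys AND values -- hence the
            // compareTo/equals/hashCode added to JObject earlier.
            var queue = new TreeBidiMap<Pair<Long, String>, String>();
            queue.put(Pair.of(100L, "small"), "small");
            queue.put(Pair.of(4096L, "big"), "big");
            queue.put(Pair.of(4096L, "big2"), "big2"); // same size: name breaks the tie

            // Pair compares left-to-right, so lastKey() is the largest object.
            System.out.println(queue.lastKey());                   // (4096,big2)
            System.out.println(queue.inverseBidiMap().get("big")); // (4096,big)
            queue.removeValue("big");                              // removal by value
            System.out.println(queue.size());                      // 2
        }
    }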
@@ -43,9 +54,20 @@ public class JObjectWriteback {
         _writebackThread = new Thread(this::writeback);
         _writebackThread.setName("JObject writeback thread");
         _writebackThread.start();
+        _promotionThread = new Thread(this::promote);
+        _promotionThread.setName("Writeback promotion thread");
+        _promotionThread.start();
     }

     void shutdown(@Observes @Priority(10) ShutdownEvent event) {
+        _promotionThread.interrupt();
+        while (_promotionThread.isAlive()) {
+            try {
+                _promotionThread.join();
+            } catch (InterruptedException ignored) {
+            }
+        }
+
         _writebackThread.interrupt();
         while (_writebackThread.isAlive()) {
             try {
@@ -54,48 +76,80 @@ public class JObjectWriteback {
             }
         }

-        Collection<Pair<Long, JObject<?>>> toWrite;
-        synchronized (_objects) {
-            toWrite = new ArrayList<>(_objects.values());
-        }
+        HashSet<JObject<?>> toWrite = new LinkedHashSet<>();
+        toWrite.addAll(_nursery.keySet());
+        toWrite.addAll(_writeQueue.values());

         Log.info("Flushing objects");
         for (var v : toWrite) {
             try {
-                flushOne(v.getRight());
+                flushOne(v);
             } catch (Exception e) {
-                Log.error("Failed writing object " + v.getRight().getName(), e);
+                Log.error("Failed writing object " + v.getName(), e);
             }
         }
     }

+    private void promote() {
+        try {
+            while (!Thread.interrupted()) {
+                var curTime = System.currentTimeMillis();
+
+                long wait = 0;
+
+                synchronized (_nursery) {
+                    while (_nursery.isEmpty())
+                        _nursery.wait();
+
+                    if ((curTime - _nursery.firstEntry().getValue().getLeft()) <= promotionDelay) {
+                        wait = promotionDelay - (curTime - _nursery.firstEntry().getValue().getLeft());
+                    }
+                }
+
+                if (wait > 0)
+                    Thread.sleep(wait);
+
+                synchronized (_nursery) {
+                    while (!_nursery.isEmpty() && (curTime - _nursery.firstEntry().getValue().getLeft()) >= promotionDelay) {
+                        var got = _nursery.pollFirstEntry();
+                        synchronized (_writeQueue) {
+                            _writeQueue.put(Pair.of(got.getValue().getRight(), got.getKey().getName()), got.getKey());
+                            _writeQueue.notifyAll();
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException ignored) {
+        }
+        Log.info("Writeback promotion thread exiting");
+    }
+
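promote() leans on LinkedHashMap preserving insertion order: the nursery's firstEntry() is always the oldest dirty object, so the thread sleeps until that entry has aged promotionDelay milliseconds and then moves every sufficiently old entry into the write queue. A stripped-down sketch of the aging pattern (names invented; firstEntry/pollFirstEntry on LinkedHashMap require Java 21's SequencedMap):

    import java.util.LinkedHashMap;

    public class NurseryDemo {
        public static void main(String[] args) throws InterruptedException {
            long promotionDelay = 100; // ms an entry must age before promotion
            var nursery = new LinkedHashMap<String, Long>(); // name -> time dirtied

            nursery.put("a", System.currentTimeMillis());
            Thread.sleep(150);
            nursery.put("b", System.currentTimeMillis());

            // Insertion order == age order, so the head is always the oldest.
            long curTime = System.currentTimeMillis();
            while (!nursery.isEmpty()
                    && (curTime - nursery.firstEntry().getValue()) >= promotionDelay) {
                var got = nursery.pollFirstEntry();
                System.out.println("promote " + got.getKey()); // only "a" is old enough
            }
        }
    }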
     private void writeback() {
         try {
-            boolean wait = false;
             while (!Thread.interrupted()) {
-                if (wait) {
-                    Thread.sleep(delay);
-                    wait = false;
-                }
                 JObject<?> obj;
-                synchronized (_objects) {
-                    while (_objects.isEmpty())
-                        _objects.wait();
-
-                    if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < delay) {
-                        wait = true;
-                        continue;
-                    }
-
-                    var entry = _objects.pollFirstEntry();
-                    if (entry == null) break;
-                    obj = entry.getValue().getRight();
+                long removedSize;
+                synchronized (_writeQueue) {
+                    while (_writeQueue.isEmpty())
+                        _writeQueue.wait();
+
+                    var fk = _writeQueue.lastKey();
+                    removedSize = fk.getKey();
+                    obj = _writeQueue.remove(fk);
                 }
                 try {
+                    _currentSize.addAndGet(-removedSize);
                     flushOne(obj);
                 } catch (Exception e) {
-                    Log.error("Failed writing object " + obj.getName(), e);
-                    synchronized (_objects) {
-                        _objects.put(obj.getName(), Pair.of(System.currentTimeMillis(), obj));
-                    }
+                    Log.error("Failed writing object " + obj.getName() + ", will retry.", e);
+                    obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
+                        var size = jObjectSizeEstimator.estimateObjectSize(d);
+                        synchronized (_writeQueue) {
+                            _writeQueue.put(Pair.of(size, m.getName()), obj);
+                        }
+                        return null;
+                    });
                 }
             }
         } catch (InterruptedException ignored) {
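Note the eviction policy change: instead of flushing the oldest entry after a fixed delay, writeback() now pops lastKey(), i.e. the largest dirty object, which frees the most cache budget per write. On failure the object is not dropped; it is re-estimated under a read lock and re-queued for a later pass. A stripped-down sketch of that retry shape (TreeMap standing in for the real TreeBidiMap, names invented):

    import java.util.TreeMap;

    public class RetryDemo {
        public static void main(String[] args) {
            var writeQueue = new TreeMap<Long, String>(); // size -> name (simplified)
            String obj = "file1";
            long size = 512;
            try {
                throw new RuntimeException("simulated store failure");
            } catch (Exception e) {
                // "Failed writing object ..., will retry." -- re-queue, don't drop
                writeQueue.put(size, obj);
            }
            System.out.println(writeQueue); // {512=file1}: object survives for a retry
        }
    }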
@@ -132,69 +186,67 @@ public class JObjectWriteback {
         objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
     }

-    public void remove(String name) {
-        synchronized (_objects) {
-            _objects.remove(name);
-        }
-    }
-
-    private void tryClean() {
-        JObject<?> found = null;
-        synchronized (_objects) {
-            if (_objects.size() >= limit) {
-                for (var obj : _objects.entrySet()) {
-                    var jobj = obj.getValue().getValue();
-                    if (jobj.isDeleted()) {
-                        if (jobj.tryRwLock()) {
-                            if (!jobj.isDeleted()) {
-                                jobj.rwUnlock();
-                                continue;
-                            }
-                            found = jobj;
-                            _objects.remove(found.getName());
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-        if (found != null)
-            try {
-                flushOneImmediate(found.getMeta(), null);
-            } finally {
-                found.rwUnlock();
-            }
-    }
-
-    public void hintDeletion(ObjectMetadata meta) {
-        synchronized (_objects) {
-            if (!meta.isWritten()) {
-                _objects.remove(meta.getName());
-            }
-        }
-    }
-
-    public void markDirty(String name, JObject<?> object) {
+    public void remove(JObject<?> object) {
         object.assertRWLock();
         if (object.isDeleted() && !object.getMeta().isWritten())
             return;
+        synchronized (_nursery) {
+            if (_nursery.containsKey(object)) {
+                var size = _nursery.get(object).getRight();
+                _nursery.remove(object);
+                _currentSize.addAndGet(-size);
+            }
+        }
+        synchronized (_writeQueue) {
+            if (_writeQueue.containsValue(object)) {
+                var size = _writeQueue.inverseBidiMap().get(object).getLeft();
+                _writeQueue.removeValue(object);
+                _currentSize.addAndGet(-size);
+            }
+        }
+    }

-        synchronized (_objects) {
-            if (_objects.containsKey(name))
-                return;
-        }
+    public void markDirty(JObject<?> object) {
+        object.assertRWLock();
+        if (object.isDeleted() && !object.getMeta().isWritten()) {
+            remove(object);
+            return;
+        }

-        tryClean();
+        var size = jObjectSizeEstimator.estimateObjectSize(object.getData());

-        synchronized (_objects) {
-            // FIXME: better logic
-            if (_objects.size() < limit) {
+        synchronized (_nursery) {
+            if (_nursery.containsKey(object)) {
+                long oldSize = _nursery.get(object).getRight();
+                if (oldSize == size)
+                    return;
+                long oldTime = _nursery.get(object).getLeft();
+                _nursery.replace(object, Pair.of(oldTime, size));
+                _currentSize.addAndGet(size - oldSize);
+                return;
+            }
+        }
+
+        synchronized (_writeQueue) {
+            if (_writeQueue.containsValue(object)) {
+                long oldSize = _writeQueue.getKey(object).getKey();
+                if (oldSize == size)
+                    return;
+                _currentSize.addAndGet(size - oldSize);
+                _writeQueue.inverseBidiMap().replace(object, Pair.of(size, object.getName()));
+                return;
+            }
+        }
+
+        var curTime = System.currentTimeMillis();
+
+        synchronized (_nursery) {
+            if (_currentSize.get() < sizeLimit) {
                 if (overload) {
                     overload = false;
                     Log.trace("Writeback cache enabled");
                 }
-                _objects.put(name, Pair.of(System.currentTimeMillis(), object));
-                _objects.notifyAll();
+                _nursery.put(object, Pair.of(curTime, size));
+                _currentSize.addAndGet(size);
+                _nursery.notifyAll();
                 return;
             }
         }
@@ -207,9 +259,7 @@ public class JObjectWriteback {
             flushOneImmediate(object.getMeta(), object.getData());
         } catch (Exception e) {
             Log.error("Failed writing object " + object.getName(), e);
-            synchronized (_objects) {
-                _objects.put(object.getName(), Pair.of(System.currentTimeMillis(), object));
-            }
+            throw e;
         }
     }
 }
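With tryClean() gone, overflow handling gets simpler: once the cache is over its size budget, markDirty falls through to flushOneImmediate, and a failed synchronous write is now rethrown to the caller instead of being silently put back into the cache. A rough sketch of that write-through-on-overload policy (all names invented; the real method takes metadata and data):

    import java.util.concurrent.atomic.AtomicLong;

    public class OverloadDemo {
        static final AtomicLong currentSize = new AtomicLong();
        static final long sizeLimit = 1024;

        static void markDirty(String name, long size) {
            if (currentSize.get() + size < sizeLimit) {
                currentSize.addAndGet(size);
                System.out.println("cached " + name);
            } else {
                flushOneImmediate(name); // write-through; exceptions propagate
            }
        }

        static void flushOneImmediate(String name) {
            System.out.println("sync write " + name);
        }

        public static void main(String[] args) {
            markDirty("a", 512);
            markDirty("b", 900); // over the budget -> synchronous write
        }
    }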
@@ -13,8 +13,8 @@ dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
 dhfs.fuse.debug=false
 dhfs.fuse.enabled=true
 dhfs.storage.files.target_chunk_size=1048576
-dhfs.objects.writeback.delay=200
-dhfs.objects.writeback.limit=3000
+dhfs.objects.writeback.delay=100
+dhfs.objects.writeback.limit=1073741824
 dhfs.objects.deletion.delay=0
 dhfs.objects.ref_verification=false
 dhfs.files.use_hash_for_chunks=false