From e72698608164ac40e56bdb92f18f5e625ed73618 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 30 Jun 2024 13:23:05 +0200 Subject: [PATCH] less crappy writeback --- server/pom.xml | 5 + .../dhfs/storage/files/objects/ChunkData.java | 5 + .../dhfs/storage/files/objects/ChunkInfo.java | 5 + .../dhfs/storage/files/objects/Directory.java | 5 + .../dhfs/storage/files/objects/File.java | 5 + .../storage/objects/jrepository/JObject.java | 25 +- .../objects/jrepository/JObjectData.java | 4 + .../jrepository/JObjectManagerImpl.java | 1 - .../jrepository/JObjectRefProcessor.java | 1 - .../objects/jrepository/JObjectResolver.java | 8 +- .../jrepository/JObjectSizeEstimator.java | 11 + .../objects/jrepository/JObjectWriteback.java | 226 +++++++++++------- .../src/main/resources/application.properties | 4 +- 13 files changed, 207 insertions(+), 98 deletions(-) create mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java diff --git a/server/pom.xml b/server/pom.xml index fec33891..eb5e8fdb 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -113,6 +113,11 @@ commons-codec commons-codec + + org.apache.commons + commons-collections4 + 4.5.0-M2 + diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java index 518c2354..111b4a53 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java @@ -63,4 +63,9 @@ public class ChunkData extends JObjectData { public boolean assumeUnique() { return true; } + + @Override + public long estimateSize() { + return _bytes.size(); + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java index 09f81313..71b5affe 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java @@ -56,4 +56,9 @@ public class ChunkInfo extends JObjectData { public boolean assumeUnique() { return true; } + + @Override + public long estimateSize() { + return _hash.length() * 2L; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java index b2732b01..ae81c582 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java @@ -48,4 +48,9 @@ public class Directory extends FsNode { public List getChildrenList() { return _children.keySet().stream().toList(); } + + @Override + public long estimateSize() { + return _children.size() * 16L; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java index d11ae662..01269237 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java @@ -30,4 +30,9 @@ public class File extends FsNode { @Getter private final UUID _parent; + + @Override + public long estimateSize() { + return _chunks.size() * 16L; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java index 0cbae47b..1689bde2 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java @@ -5,6 +5,7 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata; import io.quarkus.logging.Log; import jakarta.annotation.Nullable; import org.apache.commons.lang3.NotImplementedException; +import org.jetbrains.annotations.NotNull; import java.io.Serializable; import java.util.Comparator; @@ -14,7 +15,25 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class JObject implements Serializable { +public class JObject implements Serializable, Comparable> { + @Override + public int compareTo(@NotNull JObject o) { + return getName().compareTo(o.getName()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JObject jObject = (JObject) o; + return Objects.equals(_metaPart.getName(), jObject._metaPart.getName()); + } + + @Override + public int hashCode() { + return Objects.hashCode(_metaPart.getName()); + } + public static class DeletedObjectAccessException extends RuntimeException { } @@ -153,6 +172,7 @@ public class JObject implements Serializable { var ref = _metaPart.getRefcount(); boolean wasSeen = _metaPart.isSeen(); boolean wasDeleted = _metaPart.isDeleted(); + var prevData = _dataPart.get(); VoidFn invalidateFn = () -> { _resolver.backupRefs(this); _dataPart.set(null); @@ -163,7 +183,8 @@ public class JObject implements Serializable { if (!Objects.equals(ver, _metaPart.getOurVersion()) || ref != _metaPart.getRefcount() || wasDeleted != _metaPart.isDeleted() - || wasSeen != _metaPart.isSeen()) + || wasSeen != _metaPart.isSeen() + || prevData != _dataPart.get()) notifyWriteMeta(); if (!Objects.equals(ver, _metaPart.getOurVersion())) notifyWriteData(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java index a43fd8c9..ba7618e2 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java @@ -14,4 +14,8 @@ public abstract class JObjectData implements Serializable { } public abstract Collection extractRefs(); + + public long estimateSize() { + return 0; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index dbb3a8c0..2e0c1d95 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -222,7 +222,6 @@ public class JObjectManagerImpl implements JObjectManager { refs = Streams.concat(refs, object.getData().extractRefs().stream()); object.discardData(); - jObjectWriteback.hintDeletion(m); refs.forEach(c -> get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> { mc.removeRef(object.getName()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java index 1d69446f..dc59ed88 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java @@ -91,7 +91,6 @@ public class JObjectRefProcessor { refs = Streams.concat(refs, got.get().getData().extractRefs().stream()); got.get().discardData(); - jObjectWriteback.hintDeletion(m); refs.forEach(c -> { jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java index d53676e9..7a09a5bf 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java @@ -104,7 +104,7 @@ public class JObjectResolver { if (local.isPresent()) return local.get(); var obj = remoteObjectServiceClient.getObject(jObject); - jObjectWriteback.markDirty(jObject.getName(), jObject); + jObjectWriteback.markDirty(jObject); invalidationQueueService.pushInvalidationToAll(jObject.getName(), !jObject.getMeta().isSeen()); return SerializationHelper.deserialize(obj); } @@ -113,7 +113,7 @@ public class JObjectResolver { jObject.assertRWLock(); try { Log.trace("Invalidating " + name); - jObjectWriteback.remove(name); + jObjectWriteback.remove(jObject); objectPersistentStore.deleteObject(name); } catch (StatusRuntimeException sx) { if (sx.getStatus() != Status.NOT_FOUND) @@ -125,12 +125,12 @@ public class JObjectResolver { public void notifyWriteMeta(JObject self) { self.assertRWLock(); - jObjectWriteback.markDirty(self.getName(), self); + jObjectWriteback.markDirty(self); } public void notifyWriteData(JObject self) { self.assertRWLock(); - jObjectWriteback.markDirty(self.getName(), self); + jObjectWriteback.markDirty(self); if (self.isResolved()) invalidationQueueService.pushInvalidationToAll(self.getName(), !self.getMeta().isSeen()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java new file mode 100644 index 00000000..7098e545 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java @@ -0,0 +1,11 @@ +package com.usatiuk.dhfs.storage.objects.jrepository; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class JObjectSizeEstimator { + public long estimateObjectSize(JObjectData d) { + if (d == null) return 200; // Assume metadata etc takes up something + else return d.estimateSize(); + } +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java index 7f4676d3..b40a0c8e 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java @@ -10,12 +10,15 @@ import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.apache.commons.collections4.OrderedBidiMap; +import org.apache.commons.collections4.bidimap.TreeBidiMap; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; -import java.util.ArrayList; -import java.util.Collection; +import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.concurrent.atomic.AtomicLong; @ApplicationScoped public class JObjectWriteback { @@ -26,15 +29,23 @@ public class JObjectWriteback { @Inject JObjectManager jObjectManager; + @Inject + JObjectSizeEstimator jObjectSizeEstimator; + @ConfigProperty(name = "dhfs.objects.writeback.delay") - Integer delay; + long promotionDelay; @ConfigProperty(name = "dhfs.objects.writeback.limit") - Integer limit; + long sizeLimit; - private final LinkedHashMap>> _objects = new LinkedHashMap<>(); + AtomicLong _currentSize = new AtomicLong(0); + + private final LinkedHashMap, Pair> _nursery = new LinkedHashMap<>(); + // FIXME: Kind of a hack + private final OrderedBidiMap, JObject> _writeQueue = new TreeBidiMap<>(); private Thread _writebackThread; + private Thread _promotionThread; boolean overload = false; @@ -43,9 +54,20 @@ public class JObjectWriteback { _writebackThread = new Thread(this::writeback); _writebackThread.setName("JObject writeback thread"); _writebackThread.start(); + _promotionThread = new Thread(this::promote); + _promotionThread.setName("Writeback promotion thread"); + _promotionThread.start(); } void shutdown(@Observes @Priority(10) ShutdownEvent event) { + _promotionThread.interrupt(); + while (_promotionThread.isAlive()) { + try { + _promotionThread.join(); + } catch (InterruptedException ignored) { + } + } + _writebackThread.interrupt(); while (_writebackThread.isAlive()) { try { @@ -54,48 +76,80 @@ public class JObjectWriteback { } } - Collection>> toWrite; - synchronized (_objects) { - toWrite = new ArrayList<>(_objects.values()); - } + HashSet> toWrite = new LinkedHashSet<>(); + toWrite.addAll(_nursery.keySet()); + toWrite.addAll(_writeQueue.values()); + + Log.info("Flushing objects"); for (var v : toWrite) { try { - flushOne(v.getRight()); + flushOne(v); } catch (Exception e) { - Log.error("Failed writing object " + v.getRight().getName(), e); + Log.error("Failed writing object " + v.getName(), e); } } } + private void promote() { + try { + while (!Thread.interrupted()) { + + var curTime = System.currentTimeMillis(); + + long wait = 0; + + synchronized (_nursery) { + while (_nursery.isEmpty()) + _nursery.wait(); + + if ((curTime - _nursery.firstEntry().getValue().getLeft()) <= promotionDelay) { + wait = promotionDelay - (curTime - _nursery.firstEntry().getValue().getLeft()); + } + } + + if (wait > 0) + Thread.sleep(wait); + + synchronized (_nursery) { + while (!_nursery.isEmpty() && (curTime - _nursery.firstEntry().getValue().getLeft()) >= promotionDelay) { + var got = _nursery.pollFirstEntry(); + synchronized (_writeQueue) { + _writeQueue.put(Pair.of(got.getValue().getRight(), got.getKey().getName()), got.getKey()); + _writeQueue.notifyAll(); + } + } + } + } + } catch (InterruptedException ignored) { + } + Log.info("Writeback promotion thread exiting"); + } + private void writeback() { try { - boolean wait = false; while (!Thread.interrupted()) { - if (wait) { - Thread.sleep(delay); - wait = false; - } JObject obj; - synchronized (_objects) { - while (_objects.isEmpty()) - _objects.wait(); + long removedSize; + synchronized (_writeQueue) { + while (_writeQueue.isEmpty()) + _writeQueue.wait(); - if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < delay) { - wait = true; - continue; - } - - var entry = _objects.pollFirstEntry(); - if (entry == null) break; - obj = entry.getValue().getRight(); + var fk = _writeQueue.lastKey(); + removedSize = fk.getKey(); + obj = _writeQueue.remove(fk); } try { + _currentSize.addAndGet(-removedSize); flushOne(obj); } catch (Exception e) { - Log.error("Failed writing object " + obj.getName(), e); - synchronized (_objects) { - _objects.put(obj.getName(), Pair.of(System.currentTimeMillis(), obj)); - } + Log.error("Failed writing object " + obj.getName() + ", will retry.", e); + obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> { + var size = jObjectSizeEstimator.estimateObjectSize(d); + synchronized (_writeQueue) { + _writeQueue.put(Pair.of(size, m.getName()), obj); + } + return null; + }); } } } catch (InterruptedException ignored) { @@ -132,69 +186,67 @@ public class JObjectWriteback { objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data)); } - public void remove(String name) { - synchronized (_objects) { - _objects.remove(name); - } - } - - private void tryClean() { - JObject found = null; - synchronized (_objects) { - if (_objects.size() >= limit) { - for (var obj : _objects.entrySet()) { - var jobj = obj.getValue().getValue(); - if (jobj.isDeleted()) { - if (jobj.tryRwLock()) { - if (!jobj.isDeleted()) { - jobj.rwUnlock(); - continue; - } - found = jobj; - _objects.remove(found.getName()); - break; - } - } - } - } - } - if (found != null) - try { - flushOneImmediate(found.getMeta(), null); - } finally { - found.rwUnlock(); - } - } - - public void hintDeletion(ObjectMetadata meta) { - synchronized (_objects) { - if (!meta.isWritten()) { - _objects.remove(meta.getName()); - } - } - } - - public void markDirty(String name, JObject object) { + public void remove(JObject object) { object.assertRWLock(); - if (object.isDeleted() && !object.getMeta().isWritten()) - return; + synchronized (_nursery) { + if (_nursery.containsKey(object)) { + var size = _nursery.get(object).getRight(); + _nursery.remove(object); + _currentSize.addAndGet(-size); + } + } + synchronized (_writeQueue) { + if (_writeQueue.containsValue(object)) { + var size = _writeQueue.inverseBidiMap().get(object).getLeft(); + _writeQueue.removeValue(object); + _currentSize.addAndGet(-size); + } + } + } - synchronized (_objects) { - if (_objects.containsKey(name)) - return; + public void markDirty(JObject object) { + object.assertRWLock(); + if (object.isDeleted() && !object.getMeta().isWritten()) { + remove(object); + return; } - tryClean(); + var size = jObjectSizeEstimator.estimateObjectSize(object.getData()); - synchronized (_objects) { - // FIXME: better logic - if (_objects.size() < limit) { + synchronized (_nursery) { + if (_nursery.containsKey(object)) { + long oldSize = _nursery.get(object).getRight(); + if (oldSize == size) + return; + long oldTime = _nursery.get(object).getLeft(); + _nursery.replace(object, Pair.of(oldTime, size)); + _currentSize.addAndGet(size - oldSize); + return; + } + } + + synchronized (_writeQueue) { + if (_writeQueue.containsValue(object)) { + long oldSize = _writeQueue.getKey(object).getKey(); + if (oldSize == size) + return; + _currentSize.addAndGet(size - oldSize); + _writeQueue.inverseBidiMap().replace(object, Pair.of(size, object.getName())); + return; + } + } + + var curTime = System.currentTimeMillis(); + + synchronized (_nursery) { + if (_currentSize.get() < sizeLimit) { if (overload) { overload = false; Log.trace("Writeback cache enabled"); } - _objects.put(name, Pair.of(System.currentTimeMillis(), object)); - _objects.notifyAll(); + _nursery.put(object, Pair.of(curTime, size)); + _currentSize.addAndGet(size); + _nursery.notifyAll(); return; } } @@ -207,9 +259,7 @@ public class JObjectWriteback { flushOneImmediate(object.getMeta(), object.getData()); } catch (Exception e) { Log.error("Failed writing object " + object.getName(), e); - synchronized (_objects) { - _objects.put(object.getName(), Pair.of(System.currentTimeMillis(), object)); - } + throw e; } } } diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index ff96ab11..4f507b8c 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -13,8 +13,8 @@ dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root dhfs.fuse.debug=false dhfs.fuse.enabled=true dhfs.storage.files.target_chunk_size=1048576 -dhfs.objects.writeback.delay=200 -dhfs.objects.writeback.limit=3000 +dhfs.objects.writeback.delay=100 +dhfs.objects.writeback.limit=1073741824 dhfs.objects.deletion.delay=0 dhfs.objects.ref_verification=false dhfs.files.use_hash_for_chunks=false \ No newline at end of file