diff --git a/server/pom.xml b/server/pom.xml
index fec33891..eb5e8fdb 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -113,6 +113,11 @@
commons-codec
commons-codec
+
+ org.apache.commons
+ commons-collections4
+ 4.5.0-M2
+
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java
index 518c2354..111b4a53 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java
@@ -63,4 +63,9 @@ public class ChunkData extends JObjectData {
public boolean assumeUnique() {
return true;
}
+
+ @Override
+ public long estimateSize() {
+ return _bytes.size();
+ }
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java
index 09f81313..71b5affe 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java
@@ -56,4 +56,9 @@ public class ChunkInfo extends JObjectData {
public boolean assumeUnique() {
return true;
}
+
+ @Override
+ public long estimateSize() {
+ return _hash.length() * 2L;
+ }
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java
index b2732b01..ae81c582 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/Directory.java
@@ -48,4 +48,9 @@ public class Directory extends FsNode {
public List getChildrenList() {
return _children.keySet().stream().toList();
}
+
+ @Override
+ public long estimateSize() {
+ return _children.size() * 16L;
+ }
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java
index d11ae662..01269237 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/File.java
@@ -30,4 +30,9 @@ public class File extends FsNode {
@Getter
private final UUID _parent;
+
+ @Override
+ public long estimateSize() {
+ return _chunks.size() * 16L;
+ }
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java
index 0cbae47b..1689bde2 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java
@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
+import org.jetbrains.annotations.NotNull;
import java.io.Serializable;
import java.util.Comparator;
@@ -14,7 +15,25 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class JObject implements Serializable {
+public class JObject implements Serializable, Comparable> {
+ @Override
+ public int compareTo(@NotNull JObject> o) {
+ return getName().compareTo(o.getName());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ JObject> jObject = (JObject>) o;
+ return Objects.equals(_metaPart.getName(), jObject._metaPart.getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(_metaPart.getName());
+ }
+
public static class DeletedObjectAccessException extends RuntimeException {
}
@@ -153,6 +172,7 @@ public class JObject implements Serializable {
var ref = _metaPart.getRefcount();
boolean wasSeen = _metaPart.isSeen();
boolean wasDeleted = _metaPart.isDeleted();
+ var prevData = _dataPart.get();
VoidFn invalidateFn = () -> {
_resolver.backupRefs(this);
_dataPart.set(null);
@@ -163,7 +183,8 @@ public class JObject implements Serializable {
if (!Objects.equals(ver, _metaPart.getOurVersion())
|| ref != _metaPart.getRefcount()
|| wasDeleted != _metaPart.isDeleted()
- || wasSeen != _metaPart.isSeen())
+ || wasSeen != _metaPart.isSeen()
+ || prevData != _dataPart.get())
notifyWriteMeta();
if (!Objects.equals(ver, _metaPart.getOurVersion()))
notifyWriteData();
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java
index a43fd8c9..ba7618e2 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java
@@ -14,4 +14,8 @@ public abstract class JObjectData implements Serializable {
}
public abstract Collection extractRefs();
+
+ public long estimateSize() {
+ return 0;
+ }
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java
index dbb3a8c0..2e0c1d95 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java
@@ -222,7 +222,6 @@ public class JObjectManagerImpl implements JObjectManager {
refs = Streams.concat(refs, object.getData().extractRefs().stream());
object.discardData();
- jObjectWriteback.hintDeletion(m);
refs.forEach(c -> get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(object.getName());
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java
index 1d69446f..dc59ed88 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRefProcessor.java
@@ -91,7 +91,6 @@ public class JObjectRefProcessor {
refs = Streams.concat(refs, got.get().getData().extractRefs().stream());
got.get().discardData();
- jObjectWriteback.hintDeletion(m);
refs.forEach(c -> {
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java
index d53676e9..7a09a5bf 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java
@@ -104,7 +104,7 @@ public class JObjectResolver {
if (local.isPresent()) return local.get();
var obj = remoteObjectServiceClient.getObject(jObject);
- jObjectWriteback.markDirty(jObject.getName(), jObject);
+ jObjectWriteback.markDirty(jObject);
invalidationQueueService.pushInvalidationToAll(jObject.getName(), !jObject.getMeta().isSeen());
return SerializationHelper.deserialize(obj);
}
@@ -113,7 +113,7 @@ public class JObjectResolver {
jObject.assertRWLock();
try {
Log.trace("Invalidating " + name);
- jObjectWriteback.remove(name);
+ jObjectWriteback.remove(jObject);
objectPersistentStore.deleteObject(name);
} catch (StatusRuntimeException sx) {
if (sx.getStatus() != Status.NOT_FOUND)
@@ -125,12 +125,12 @@ public class JObjectResolver {
public void notifyWriteMeta(JObject> self) {
self.assertRWLock();
- jObjectWriteback.markDirty(self.getName(), self);
+ jObjectWriteback.markDirty(self);
}
public void notifyWriteData(JObject> self) {
self.assertRWLock();
- jObjectWriteback.markDirty(self.getName(), self);
+ jObjectWriteback.markDirty(self);
if (self.isResolved())
invalidationQueueService.pushInvalidationToAll(self.getName(), !self.getMeta().isSeen());
}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java
new file mode 100644
index 00000000..7098e545
--- /dev/null
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectSizeEstimator.java
@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.storage.objects.jrepository;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class JObjectSizeEstimator {
+ public long estimateObjectSize(JObjectData d) {
+ if (d == null) return 200; // Assume metadata etc takes up something
+ else return d.estimateSize();
+ }
+}
diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java
index 7f4676d3..b40a0c8e 100644
--- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java
+++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java
@@ -10,12 +10,15 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
+import org.apache.commons.collections4.OrderedBidiMap;
+import org.apache.commons.collections4.bidimap.TreeBidiMap;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.concurrent.atomic.AtomicLong;
@ApplicationScoped
public class JObjectWriteback {
@@ -26,15 +29,23 @@ public class JObjectWriteback {
@Inject
JObjectManager jObjectManager;
+ @Inject
+ JObjectSizeEstimator jObjectSizeEstimator;
+
@ConfigProperty(name = "dhfs.objects.writeback.delay")
- Integer delay;
+ long promotionDelay;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
- Integer limit;
+ long sizeLimit;
- private final LinkedHashMap>> _objects = new LinkedHashMap<>();
+ AtomicLong _currentSize = new AtomicLong(0);
+
+ private final LinkedHashMap, Pair> _nursery = new LinkedHashMap<>();
+ // FIXME: Kind of a hack
+ private final OrderedBidiMap, JObject>> _writeQueue = new TreeBidiMap<>();
private Thread _writebackThread;
+ private Thread _promotionThread;
boolean overload = false;
@@ -43,9 +54,20 @@ public class JObjectWriteback {
_writebackThread = new Thread(this::writeback);
_writebackThread.setName("JObject writeback thread");
_writebackThread.start();
+ _promotionThread = new Thread(this::promote);
+ _promotionThread.setName("Writeback promotion thread");
+ _promotionThread.start();
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
+ _promotionThread.interrupt();
+ while (_promotionThread.isAlive()) {
+ try {
+ _promotionThread.join();
+ } catch (InterruptedException ignored) {
+ }
+ }
+
_writebackThread.interrupt();
while (_writebackThread.isAlive()) {
try {
@@ -54,48 +76,80 @@ public class JObjectWriteback {
}
}
- Collection>> toWrite;
- synchronized (_objects) {
- toWrite = new ArrayList<>(_objects.values());
- }
+ HashSet> toWrite = new LinkedHashSet<>();
+ toWrite.addAll(_nursery.keySet());
+ toWrite.addAll(_writeQueue.values());
+
+ Log.info("Flushing objects");
for (var v : toWrite) {
try {
- flushOne(v.getRight());
+ flushOne(v);
} catch (Exception e) {
- Log.error("Failed writing object " + v.getRight().getName(), e);
+ Log.error("Failed writing object " + v.getName(), e);
}
}
}
+ private void promote() {
+ try {
+ while (!Thread.interrupted()) {
+
+ var curTime = System.currentTimeMillis();
+
+ long wait = 0;
+
+ synchronized (_nursery) {
+ while (_nursery.isEmpty())
+ _nursery.wait();
+
+ if ((curTime - _nursery.firstEntry().getValue().getLeft()) <= promotionDelay) {
+ wait = promotionDelay - (curTime - _nursery.firstEntry().getValue().getLeft());
+ }
+ }
+
+ if (wait > 0)
+ Thread.sleep(wait);
+
+ synchronized (_nursery) {
+ while (!_nursery.isEmpty() && (curTime - _nursery.firstEntry().getValue().getLeft()) >= promotionDelay) {
+ var got = _nursery.pollFirstEntry();
+ synchronized (_writeQueue) {
+ _writeQueue.put(Pair.of(got.getValue().getRight(), got.getKey().getName()), got.getKey());
+ _writeQueue.notifyAll();
+ }
+ }
+ }
+ }
+ } catch (InterruptedException ignored) {
+ }
+ Log.info("Writeback promotion thread exiting");
+ }
+
private void writeback() {
try {
- boolean wait = false;
while (!Thread.interrupted()) {
- if (wait) {
- Thread.sleep(delay);
- wait = false;
- }
JObject> obj;
- synchronized (_objects) {
- while (_objects.isEmpty())
- _objects.wait();
+ long removedSize;
+ synchronized (_writeQueue) {
+ while (_writeQueue.isEmpty())
+ _writeQueue.wait();
- if ((System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft()) < delay) {
- wait = true;
- continue;
- }
-
- var entry = _objects.pollFirstEntry();
- if (entry == null) break;
- obj = entry.getValue().getRight();
+ var fk = _writeQueue.lastKey();
+ removedSize = fk.getKey();
+ obj = _writeQueue.remove(fk);
}
try {
+ _currentSize.addAndGet(-removedSize);
flushOne(obj);
} catch (Exception e) {
- Log.error("Failed writing object " + obj.getName(), e);
- synchronized (_objects) {
- _objects.put(obj.getName(), Pair.of(System.currentTimeMillis(), obj));
- }
+ Log.error("Failed writing object " + obj.getName() + ", will retry.", e);
+ obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
+ var size = jObjectSizeEstimator.estimateObjectSize(d);
+ synchronized (_writeQueue) {
+ _writeQueue.put(Pair.of(size, m.getName()), obj);
+ }
+ return null;
+ });
}
}
} catch (InterruptedException ignored) {
@@ -132,69 +186,67 @@ public class JObjectWriteback {
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
}
- public void remove(String name) {
- synchronized (_objects) {
- _objects.remove(name);
- }
- }
-
- private void tryClean() {
- JObject> found = null;
- synchronized (_objects) {
- if (_objects.size() >= limit) {
- for (var obj : _objects.entrySet()) {
- var jobj = obj.getValue().getValue();
- if (jobj.isDeleted()) {
- if (jobj.tryRwLock()) {
- if (!jobj.isDeleted()) {
- jobj.rwUnlock();
- continue;
- }
- found = jobj;
- _objects.remove(found.getName());
- break;
- }
- }
- }
- }
- }
- if (found != null)
- try {
- flushOneImmediate(found.getMeta(), null);
- } finally {
- found.rwUnlock();
- }
- }
-
- public void hintDeletion(ObjectMetadata meta) {
- synchronized (_objects) {
- if (!meta.isWritten()) {
- _objects.remove(meta.getName());
- }
- }
- }
-
- public void markDirty(String name, JObject> object) {
+ public void remove(JObject> object) {
object.assertRWLock();
- if (object.isDeleted() && !object.getMeta().isWritten())
- return;
+ synchronized (_nursery) {
+ if (_nursery.containsKey(object)) {
+ var size = _nursery.get(object).getRight();
+ _nursery.remove(object);
+ _currentSize.addAndGet(-size);
+ }
+ }
+ synchronized (_writeQueue) {
+ if (_writeQueue.containsValue(object)) {
+ var size = _writeQueue.inverseBidiMap().get(object).getLeft();
+ _writeQueue.removeValue(object);
+ _currentSize.addAndGet(-size);
+ }
+ }
+ }
- synchronized (_objects) {
- if (_objects.containsKey(name))
- return;
+ public void markDirty(JObject> object) {
+ object.assertRWLock();
+ if (object.isDeleted() && !object.getMeta().isWritten()) {
+ remove(object);
+ return;
}
- tryClean();
+ var size = jObjectSizeEstimator.estimateObjectSize(object.getData());
- synchronized (_objects) {
- // FIXME: better logic
- if (_objects.size() < limit) {
+ synchronized (_nursery) {
+ if (_nursery.containsKey(object)) {
+ long oldSize = _nursery.get(object).getRight();
+ if (oldSize == size)
+ return;
+ long oldTime = _nursery.get(object).getLeft();
+ _nursery.replace(object, Pair.of(oldTime, size));
+ _currentSize.addAndGet(size - oldSize);
+ return;
+ }
+ }
+
+ synchronized (_writeQueue) {
+ if (_writeQueue.containsValue(object)) {
+ long oldSize = _writeQueue.getKey(object).getKey();
+ if (oldSize == size)
+ return;
+ _currentSize.addAndGet(size - oldSize);
+ _writeQueue.inverseBidiMap().replace(object, Pair.of(size, object.getName()));
+ return;
+ }
+ }
+
+ var curTime = System.currentTimeMillis();
+
+ synchronized (_nursery) {
+ if (_currentSize.get() < sizeLimit) {
if (overload) {
overload = false;
Log.trace("Writeback cache enabled");
}
- _objects.put(name, Pair.of(System.currentTimeMillis(), object));
- _objects.notifyAll();
+ _nursery.put(object, Pair.of(curTime, size));
+ _currentSize.addAndGet(size);
+ _nursery.notifyAll();
return;
}
}
@@ -207,9 +259,7 @@ public class JObjectWriteback {
flushOneImmediate(object.getMeta(), object.getData());
} catch (Exception e) {
Log.error("Failed writing object " + object.getName(), e);
- synchronized (_objects) {
- _objects.put(object.getName(), Pair.of(System.currentTimeMillis(), object));
- }
+ throw e;
}
}
}
diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties
index ff96ab11..4f507b8c 100644
--- a/server/src/main/resources/application.properties
+++ b/server/src/main/resources/application.properties
@@ -13,8 +13,8 @@ dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.storage.files.target_chunk_size=1048576
-dhfs.objects.writeback.delay=200
-dhfs.objects.writeback.limit=3000
+dhfs.objects.writeback.delay=100
+dhfs.objects.writeback.limit=1073741824
dhfs.objects.deletion.delay=0
dhfs.objects.ref_verification=false
dhfs.files.use_hash_for_chunks=false
\ No newline at end of file