8 Commits

Author SHA1 Message Date
855894b348 actually update and not put 2024-07-28 20:29:38 +02:00
4aa49e3f6d don't forget to actually update the size 2024-07-28 20:19:31 +02:00
be4a1ba165 separate updateSize in JObjectLRU 2024-07-28 19:42:22 +02:00
20904ed3ad make autosync fails "debug" and increase default delays 2024-07-28 19:34:07 +02:00
7297188ad1 avoid possible leak with the cache 2024-07-28 19:13:11 +02:00
22145d7674 fix Dir/File size calculation
it really is time to stop using strings for everything...
2024-07-28 19:11:46 +02:00
2b5fb1c850 simplify JObjectLRU
somehow the more clever solutions are exactly the same speed
so probably there are bigger fish to fry with git status speed
2024-07-28 18:26:02 +02:00
c8f9b4a339 multithreaded object deletion 2024-07-28 17:04:40 +02:00
10 changed files with 82 additions and 51 deletions

View File

@@ -32,9 +32,7 @@ public class Directory extends FsNode {
}
public Optional<UUID> getKid(String name) {
if (_children.containsKey(name))
return Optional.of(_children.get(name));
else return Optional.empty();
return Optional.ofNullable(_children.get(name));
}
public boolean removeKid(String name) {
@@ -64,6 +62,6 @@ public class Directory extends FsNode {
@Override
public long estimateSize() {
return _children.size() * 16L;
return _children.size() * 192L;
}
}

View File

@@ -46,6 +46,6 @@ public class File extends FsNode {
@Override
public long estimateSize() {
return _chunks.size() * 16L;
return _chunks.size() * 192L;
}
}

View File

@@ -8,7 +8,6 @@ import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -180,7 +179,8 @@ public class JObject<T extends JObjectData> {
throw new IllegalStateException("Object " + getName() + " has non-hydrated refs when written locally");
_metaPart.narrowClass(res.get().getClass());
_dataPart.compareAndSet(null, res.get());
if (_dataPart.compareAndSet(null, res.get()))
_resolver.onResolution(this);
} // _dataPart.get() == null
} finally {
rUnlock();

View File

@@ -8,11 +8,8 @@ import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class JObjectLRU {
@@ -26,9 +23,6 @@ public class JObjectLRU {
private long _curSize = 0;
private long _evict = 0;
private final AtomicReference<ConcurrentHashMap<JObject<?>, Long>> _accessQueue = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicLong _lastDrain = new AtomicLong(0);
private final LinkedHashMap<JObject<?>, Long> _cache = new LinkedHashMap<>();
private ExecutorService _statusExecutor = null;
@@ -39,12 +33,22 @@ public class JObjectLRU {
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
Thread.sleep(10000);
if (_curSize > 0)
Log.info("Cache status: size="
+ _curSize / 1024 / 1024 + "MB"
+ " evicted=" + _evict);
_evict = 0;
if (Log.isTraceEnabled()) {
long realSize = 0;
synchronized (_cache) {
for (JObject<?> object : _cache.keySet()) {
realSize += jObjectSizeEstimator.estimateObjectSize(object.getData());
}
Log.info("Cache status: real size="
+ realSize / 1024 / 1024 + "MB" + " entries=" + _cache.size());
}
}
}
} catch (InterruptedException ignored) {
}
@@ -59,27 +63,37 @@ public class JObjectLRU {
}
public void notifyAccess(JObject<?> obj) {
_accessQueue.get().put(obj, jObjectSizeEstimator.estimateObjectSize(obj.getData()));
// TODO: no hardcoding
if (_accessQueue.get().size() > 500 || System.currentTimeMillis() - _lastDrain.get() > 100) {
synchronized (_cache) {
_lastDrain.set(System.currentTimeMillis());
var newQueue = new ConcurrentHashMap<JObject<?>, Long>();
var oldQueue = _accessQueue.getAndSet(newQueue);
if (obj.getData() == null) return;
long size = jObjectSizeEstimator.estimateObjectSize(obj.getData());
synchronized (_cache) {
_curSize += size;
var old = _cache.putLast(obj, size);
if (old != null)
_curSize -= old;
for (var x : oldQueue.entrySet()) {
long oldSize = _cache.getOrDefault(x.getKey(), 0L);
long newSize = x.getValue();
_curSize -= oldSize;
_curSize += newSize;
_cache.putLast(x.getKey(), newSize);
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_curSize -= del.getValue();
_evict++;
}
}
}
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_curSize -= del.getValue();
_evict++;
}
}
public void updateSize(JObject<?> obj) {
long size = jObjectSizeEstimator.estimateObjectSize(obj.getData());
synchronized (_cache) {
var old = _cache.replace(obj, size);
if (old != null) {
_curSize += size;
_curSize -= old;
} else {
return;
}
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_curSize -= del.getValue();
_evict++;
}
}
}

View File

@@ -9,11 +9,13 @@ import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ApplicationScoped
public class JObjectRefProcessor {
@@ -30,10 +32,12 @@ public class JObjectRefProcessor {
AutoSyncProcessor autoSyncProcessor;
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
int moveProcessorThreads;
@ConfigProperty(name = "dhfs.objects.ref-processor.threads")
int refProcessorThreads;
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay")
long canDeleteRetryDelay;
ExecutorService _movableProcessorExecutorService;
private Thread _refProcessorThread;
private ExecutorService _movableProcessorExecutorService;
private ExecutorService _refProcessorExecutorService;
public JObjectRefProcessor(@ConfigProperty(name = "dhfs.objects.deletion.delay") long deletionDelay,
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay") long canDeleteRetryDelay) {
@@ -43,17 +47,26 @@ public class JObjectRefProcessor {
@Startup
void init() {
_movableProcessorExecutorService = Executors.newFixedThreadPool(moveProcessorThreads);
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("move-proc-%d")
.build();
_movableProcessorExecutorService = Executors.newFixedThreadPool(moveProcessorThreads, factory);
_refProcessorThread = new Thread(this::refProcessorThread);
_refProcessorThread.setName("JObject Refcounter thread");
_refProcessorThread.start();
BasicThreadFactory factoryRef = new BasicThreadFactory.Builder()
.namingPattern("ref-proc-%d")
.build();
_refProcessorExecutorService = Executors.newFixedThreadPool(refProcessorThreads, factoryRef);
for (int i = 0; i < refProcessorThreads; i++) {
_refProcessorExecutorService.submit(this::refProcessor);
}
}
@Shutdown
void shutdown() throws InterruptedException {
_refProcessorThread.interrupt();
_refProcessorThread.join();
_refProcessorExecutorService.shutdownNow();
if (!_refProcessorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
Log.error("Refcounting threads didn't exit in 30 seconds");
}
}
public void putDeletionCandidate(String name) {
@@ -140,7 +153,7 @@ public class JObjectRefProcessor {
}));
}
private void refProcessorThread() {
private void refProcessor() {
try {
while (!Thread.interrupted()) {
String next = null;

View File

@@ -178,6 +178,10 @@ public class JObjectResolver {
return protoSerializerService.deserialize(obj);
}
public void onResolution(JObject<?> self) {
jObjectLRU.updateSize(self);
}
public void removeLocal(JObject<?> jObject, String name) {
jObject.assertRWLock();
try {
@@ -196,6 +200,7 @@ public class JObjectResolver {
public <T extends JObjectData> void notifyWrite(JObject<T> self, boolean metaChanged,
boolean externalChanged, boolean hasDataChanged) {
self.assertRWLock();
jObjectLRU.updateSize(self);
if (metaChanged || hasDataChanged)
jObjectWriteback.markDirty(self);
if (metaChanged)

View File

@@ -1,11 +1,11 @@
package com.usatiuk.dhfs.objects.jrepository;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class JObjectSizeEstimator {
public long estimateObjectSize(JObjectData d) {
if (d == null) return 200; // Assume metadata etc takes up something
else return d.estimateSize();
if (d == null) return 1024; // Assume metadata etc takes up something
else return d.estimateSize() + 1024;
}
}

View File

@@ -20,7 +20,7 @@ import java.util.concurrent.Executors;
@ApplicationScoped
public class AutoSyncProcessor {
private final HashSetDelayedBlockingQueue<String> _pending = new HashSetDelayedBlockingQueue<>(0);
private final HashSetDelayedBlockingQueue<String> _retries = new HashSetDelayedBlockingQueue<>(1000); //FIXME:
private final HashSetDelayedBlockingQueue<String> _retries = new HashSetDelayedBlockingQueue<>(10000); //FIXME:
@Inject
JObjectResolver jObjectResolver;
@Inject
@@ -101,13 +101,13 @@ public class AutoSyncProcessor {
return obj.tryResolve(JObject.ResolutionStrategy.REMOTE);
});
if (!ok) {
Log.warn("Failed downloading object " + obj.getName() + ", will retry.");
Log.debug("Failed downloading object " + obj.getName() + ", will retry.");
_retries.add(obj.getName());
}
});
} catch (DeletedObjectAccessException ignored) {
} catch (Exception e) {
Log.error("Failed downloading object " + name + ", will retry.", e);
Log.debug("Failed downloading object " + name + ", will retry.", e);
_retries.add(name);
}
}

View File

@@ -41,6 +41,6 @@ public class PeerDirectory extends JObjectData {
@Override
public long estimateSize() {
return _peers.size() * 16L;
return _peers.size() * 32L;
}
}

View File

@@ -29,12 +29,13 @@ dhfs.objects.writeback.watermark-high=0.6
dhfs.objects.writeback.watermark-low=0.4
dhfs.objects.writeback.threads=4
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
dhfs.files.use_hash_for_chunks=false
dhfs.objects.autosync.threads=2
dhfs.objects.autosync.download-all=false
dhfs.objects.move-processor.threads=2
dhfs.objects.ref-processor.threads=4
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
quarkus.http.insecure-requests=enabled