mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
8 Commits
10ba128de3
...
855894b348
| Author | SHA1 | Date | |
|---|---|---|---|
| 855894b348 | |||
| 4aa49e3f6d | |||
| be4a1ba165 | |||
| 20904ed3ad | |||
| 7297188ad1 | |||
| 22145d7674 | |||
| 2b5fb1c850 | |||
| c8f9b4a339 |
@@ -32,9 +32,7 @@ public class Directory extends FsNode {
|
||||
}
|
||||
|
||||
public Optional<UUID> getKid(String name) {
|
||||
if (_children.containsKey(name))
|
||||
return Optional.of(_children.get(name));
|
||||
else return Optional.empty();
|
||||
return Optional.ofNullable(_children.get(name));
|
||||
}
|
||||
|
||||
public boolean removeKid(String name) {
|
||||
@@ -64,6 +62,6 @@ public class Directory extends FsNode {
|
||||
|
||||
@Override
|
||||
public long estimateSize() {
|
||||
return _children.size() * 16L;
|
||||
return _children.size() * 192L;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,6 @@ public class File extends FsNode {
|
||||
|
||||
@Override
|
||||
public long estimateSize() {
|
||||
return _chunks.size() * 16L;
|
||||
return _chunks.size() * 192L;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -180,7 +179,8 @@ public class JObject<T extends JObjectData> {
|
||||
throw new IllegalStateException("Object " + getName() + " has non-hydrated refs when written locally");
|
||||
|
||||
_metaPart.narrowClass(res.get().getClass());
|
||||
_dataPart.compareAndSet(null, res.get());
|
||||
if (_dataPart.compareAndSet(null, res.get()))
|
||||
_resolver.onResolution(this);
|
||||
} // _dataPart.get() == null
|
||||
} finally {
|
||||
rUnlock();
|
||||
|
||||
@@ -8,11 +8,8 @@ import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectLRU {
|
||||
@@ -26,9 +23,6 @@ public class JObjectLRU {
|
||||
private long _curSize = 0;
|
||||
private long _evict = 0;
|
||||
|
||||
private final AtomicReference<ConcurrentHashMap<JObject<?>, Long>> _accessQueue = new AtomicReference<>(new ConcurrentHashMap<>());
|
||||
private final AtomicLong _lastDrain = new AtomicLong(0);
|
||||
|
||||
private final LinkedHashMap<JObject<?>, Long> _cache = new LinkedHashMap<>();
|
||||
private ExecutorService _statusExecutor = null;
|
||||
|
||||
@@ -39,12 +33,22 @@ public class JObjectLRU {
|
||||
_statusExecutor.submit(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(10000);
|
||||
if (_curSize > 0)
|
||||
Log.info("Cache status: size="
|
||||
+ _curSize / 1024 / 1024 + "MB"
|
||||
+ " evicted=" + _evict);
|
||||
_evict = 0;
|
||||
if (Log.isTraceEnabled()) {
|
||||
long realSize = 0;
|
||||
synchronized (_cache) {
|
||||
for (JObject<?> object : _cache.keySet()) {
|
||||
realSize += jObjectSizeEstimator.estimateObjectSize(object.getData());
|
||||
}
|
||||
Log.info("Cache status: real size="
|
||||
+ realSize / 1024 / 1024 + "MB" + " entries=" + _cache.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
@@ -59,27 +63,37 @@ public class JObjectLRU {
|
||||
}
|
||||
|
||||
public void notifyAccess(JObject<?> obj) {
|
||||
_accessQueue.get().put(obj, jObjectSizeEstimator.estimateObjectSize(obj.getData()));
|
||||
// TODO: no hardcoding
|
||||
if (_accessQueue.get().size() > 500 || System.currentTimeMillis() - _lastDrain.get() > 100) {
|
||||
synchronized (_cache) {
|
||||
_lastDrain.set(System.currentTimeMillis());
|
||||
var newQueue = new ConcurrentHashMap<JObject<?>, Long>();
|
||||
var oldQueue = _accessQueue.getAndSet(newQueue);
|
||||
if (obj.getData() == null) return;
|
||||
long size = jObjectSizeEstimator.estimateObjectSize(obj.getData());
|
||||
synchronized (_cache) {
|
||||
_curSize += size;
|
||||
var old = _cache.putLast(obj, size);
|
||||
if (old != null)
|
||||
_curSize -= old;
|
||||
|
||||
for (var x : oldQueue.entrySet()) {
|
||||
long oldSize = _cache.getOrDefault(x.getKey(), 0L);
|
||||
long newSize = x.getValue();
|
||||
_curSize -= oldSize;
|
||||
_curSize += newSize;
|
||||
_cache.putLast(x.getKey(), newSize);
|
||||
while (_curSize >= sizeLimit) {
|
||||
var del = _cache.pollFirstEntry();
|
||||
_curSize -= del.getValue();
|
||||
_evict++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (_curSize >= sizeLimit) {
|
||||
var del = _cache.pollFirstEntry();
|
||||
_curSize -= del.getValue();
|
||||
_evict++;
|
||||
}
|
||||
}
|
||||
public void updateSize(JObject<?> obj) {
|
||||
long size = jObjectSizeEstimator.estimateObjectSize(obj.getData());
|
||||
synchronized (_cache) {
|
||||
var old = _cache.replace(obj, size);
|
||||
if (old != null) {
|
||||
_curSize += size;
|
||||
_curSize -= old;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
while (_curSize >= sizeLimit) {
|
||||
var del = _cache.pollFirstEntry();
|
||||
_curSize -= del.getValue();
|
||||
_evict++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,11 +9,13 @@ import io.quarkus.runtime.Shutdown;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectRefProcessor {
|
||||
@@ -30,10 +32,12 @@ public class JObjectRefProcessor {
|
||||
AutoSyncProcessor autoSyncProcessor;
|
||||
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
|
||||
int moveProcessorThreads;
|
||||
@ConfigProperty(name = "dhfs.objects.ref-processor.threads")
|
||||
int refProcessorThreads;
|
||||
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay")
|
||||
long canDeleteRetryDelay;
|
||||
ExecutorService _movableProcessorExecutorService;
|
||||
private Thread _refProcessorThread;
|
||||
private ExecutorService _movableProcessorExecutorService;
|
||||
private ExecutorService _refProcessorExecutorService;
|
||||
|
||||
public JObjectRefProcessor(@ConfigProperty(name = "dhfs.objects.deletion.delay") long deletionDelay,
|
||||
@ConfigProperty(name = "dhfs.objects.deletion.can-delete-retry-delay") long canDeleteRetryDelay) {
|
||||
@@ -43,17 +47,26 @@ public class JObjectRefProcessor {
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
_movableProcessorExecutorService = Executors.newFixedThreadPool(moveProcessorThreads);
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("move-proc-%d")
|
||||
.build();
|
||||
_movableProcessorExecutorService = Executors.newFixedThreadPool(moveProcessorThreads, factory);
|
||||
|
||||
_refProcessorThread = new Thread(this::refProcessorThread);
|
||||
_refProcessorThread.setName("JObject Refcounter thread");
|
||||
_refProcessorThread.start();
|
||||
BasicThreadFactory factoryRef = new BasicThreadFactory.Builder()
|
||||
.namingPattern("ref-proc-%d")
|
||||
.build();
|
||||
_refProcessorExecutorService = Executors.newFixedThreadPool(refProcessorThreads, factoryRef);
|
||||
for (int i = 0; i < refProcessorThreads; i++) {
|
||||
_refProcessorExecutorService.submit(this::refProcessor);
|
||||
}
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
void shutdown() throws InterruptedException {
|
||||
_refProcessorThread.interrupt();
|
||||
_refProcessorThread.join();
|
||||
_refProcessorExecutorService.shutdownNow();
|
||||
if (!_refProcessorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
Log.error("Refcounting threads didn't exit in 30 seconds");
|
||||
}
|
||||
}
|
||||
|
||||
public void putDeletionCandidate(String name) {
|
||||
@@ -140,7 +153,7 @@ public class JObjectRefProcessor {
|
||||
}));
|
||||
}
|
||||
|
||||
private void refProcessorThread() {
|
||||
private void refProcessor() {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
String next = null;
|
||||
|
||||
@@ -178,6 +178,10 @@ public class JObjectResolver {
|
||||
return protoSerializerService.deserialize(obj);
|
||||
}
|
||||
|
||||
public void onResolution(JObject<?> self) {
|
||||
jObjectLRU.updateSize(self);
|
||||
}
|
||||
|
||||
public void removeLocal(JObject<?> jObject, String name) {
|
||||
jObject.assertRWLock();
|
||||
try {
|
||||
@@ -196,6 +200,7 @@ public class JObjectResolver {
|
||||
public <T extends JObjectData> void notifyWrite(JObject<T> self, boolean metaChanged,
|
||||
boolean externalChanged, boolean hasDataChanged) {
|
||||
self.assertRWLock();
|
||||
jObjectLRU.updateSize(self);
|
||||
if (metaChanged || hasDataChanged)
|
||||
jObjectWriteback.markDirty(self);
|
||||
if (metaChanged)
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JObjectSizeEstimator {
|
||||
public long estimateObjectSize(JObjectData d) {
|
||||
if (d == null) return 200; // Assume metadata etc takes up something
|
||||
else return d.estimateSize();
|
||||
if (d == null) return 1024; // Assume metadata etc takes up something
|
||||
else return d.estimateSize() + 1024;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import java.util.concurrent.Executors;
|
||||
@ApplicationScoped
|
||||
public class AutoSyncProcessor {
|
||||
private final HashSetDelayedBlockingQueue<String> _pending = new HashSetDelayedBlockingQueue<>(0);
|
||||
private final HashSetDelayedBlockingQueue<String> _retries = new HashSetDelayedBlockingQueue<>(1000); //FIXME:
|
||||
private final HashSetDelayedBlockingQueue<String> _retries = new HashSetDelayedBlockingQueue<>(10000); //FIXME:
|
||||
@Inject
|
||||
JObjectResolver jObjectResolver;
|
||||
@Inject
|
||||
@@ -101,13 +101,13 @@ public class AutoSyncProcessor {
|
||||
return obj.tryResolve(JObject.ResolutionStrategy.REMOTE);
|
||||
});
|
||||
if (!ok) {
|
||||
Log.warn("Failed downloading object " + obj.getName() + ", will retry.");
|
||||
Log.debug("Failed downloading object " + obj.getName() + ", will retry.");
|
||||
_retries.add(obj.getName());
|
||||
}
|
||||
});
|
||||
} catch (DeletedObjectAccessException ignored) {
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed downloading object " + name + ", will retry.", e);
|
||||
Log.debug("Failed downloading object " + name + ", will retry.", e);
|
||||
_retries.add(name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,6 @@ public class PeerDirectory extends JObjectData {
|
||||
|
||||
@Override
|
||||
public long estimateSize() {
|
||||
return _peers.size() * 16L;
|
||||
return _peers.size() * 32L;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,12 +29,13 @@ dhfs.objects.writeback.watermark-high=0.6
|
||||
dhfs.objects.writeback.watermark-low=0.4
|
||||
dhfs.objects.writeback.threads=4
|
||||
dhfs.objects.deletion.delay=1000
|
||||
dhfs.objects.deletion.can-delete-retry-delay=1000
|
||||
dhfs.objects.deletion.can-delete-retry-delay=10000
|
||||
dhfs.objects.ref_verification=true
|
||||
dhfs.files.use_hash_for_chunks=false
|
||||
dhfs.objects.autosync.threads=2
|
||||
dhfs.objects.autosync.download-all=false
|
||||
dhfs.objects.move-processor.threads=2
|
||||
dhfs.objects.ref-processor.threads=4
|
||||
quarkus.log.category."com.usatiuk.dhfs".min-level=TRACE
|
||||
quarkus.log.category."com.usatiuk.dhfs".level=TRACE
|
||||
quarkus.http.insecure-requests=enabled
|
||||
|
||||
Reference in New Issue
Block a user