more options for writeback

2024-06-30 16:05:42 +02:00
parent d0fd0f729f
commit b4dca123a2
3 changed files with 96 additions and 45 deletions

DhfsFileServiceImpl.java

@@ -31,6 +31,18 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.storage.files.target_chunk_size")
Integer targetChunkSize;
@ConfigProperty(name = "dhfs.storage.files.write_merge_threshold")
float writeMergeThreshold;
@ConfigProperty(name = "dhfs.storage.files.write_merge_max_chunk_to_take")
float writeMergeMaxChunkToTake;
@ConfigProperty(name = "dhfs.storage.files.write_merge_limit")
float writeMergeLimit;
@ConfigProperty(name = "dhfs.storage.files.write_last_chunk_limit")
float writeLastChunkLimit;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
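
With the defaults this commit adds to the properties file below (target_chunk_size = 1048576, write_merge_threshold = 0.8, write_merge_limit = 1.2, write_merge_max_chunk_to_take = 1), the new fields translate into concrete byte cutoffs. A minimal sketch of that arithmetic, assuming those default values:

    public class MergeKnobsExample {
        public static void main(String[] args) {
            // Defaults from application.properties in this commit, assumed here for illustration:
            int targetChunkSize = 1048576;       // dhfs.storage.files.target_chunk_size, 1 MiB
            float writeMergeThreshold = 0.8f;    // writes below this fraction of target try to merge
            float writeMergeLimit = 1.2f;        // stop merging once the block would exceed this fraction
            float writeMergeMaxChunkToTake = 1f; // skip neighbor chunks at or above this fraction

            System.out.printf("merge below:  %d bytes%n", (long) (targetChunkSize * writeMergeThreshold));     // 838860
            System.out.printf("merge cap:    %d bytes%n", (long) (targetChunkSize * writeMergeLimit));         // 1258291
            System.out.printf("max neighbor: %d bytes%n", (long) (targetChunkSize * writeMergeMaxChunkToTake)); // 1048576
        }
    }

In other words, a buffered write smaller than about 819 KiB becomes a merge candidate, and merging stops once the combined block would pass about 1.2 MiB.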
@@ -480,16 +492,22 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         long start = 0;
 
-        if (!chunksAll.isEmpty()) {
+        NavigableMap<Long, String> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
+        NavigableMap<Long, String> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
+
+        if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
+            beforeFirst = chunksAll;
+            afterLast = Collections.emptyNavigableMap();
+            first = null;
+            last = null;
+            start = offset;
+        } else if (!chunksAll.isEmpty()) {
             var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
             removedChunks.addAll(between.values());
             start = first.getKey();
             between.clear();
         }
 
-        NavigableMap<Long, String> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
-        NavigableMap<Long, String> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
-
         ByteString pendingWrites = ByteString.empty();
         if (first != null && first.getKey() < offset) {
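
The bookkeeping above leans on NavigableMap views: headMap/tailMap give the chunks strictly before and after the write range, and subMap views are live, so clearing `between` also deletes those entries from chunksAll. A minimal standalone sketch of those semantics (toy values, not DHFS code):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Keys are chunk start offsets, values are chunk ids.
    public class ChunkMapViews {
        public static void main(String[] args) {
            NavigableMap<Long, String> chunksAll = new TreeMap<>();
            chunksAll.put(0L, "chunk-a");    // covers [0, 100)
            chunksAll.put(100L, "chunk-b");  // covers [100, 200)
            chunksAll.put(200L, "chunk-c");  // covers [200, 300)
            chunksAll.put(300L, "chunk-d");  // covers [300, 400)

            long first = 100L, last = 200L;  // chunks overlapped by a write

            // Chunks strictly before the first overlapped one (left-merge candidates):
            System.out.println(chunksAll.headMap(first, false)); // {0=chunk-a}
            // Chunks strictly after the last overlapped one (right-merge candidates):
            System.out.println(chunksAll.tailMap(last, false));  // {300=chunk-d}

            // subMap is a live view: clearing it also removes the entries from chunksAll.
            chunksAll.subMap(first, true, last, true).clear();
            System.out.println(chunksAll); // {0=chunk-a, 300=chunk-d}
        }
    }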
@@ -510,46 +528,54 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         int combinedSize = pendingWrites.size();
 
         if (targetChunkSize > 0) {
-            if (Math.abs(combinedSize - targetChunkSize) > targetChunkSize * 0.1) {
-                if (combinedSize < targetChunkSize) {
-                    boolean leftDone = false;
-                    boolean rightDone = false;
-                    while (!leftDone && !rightDone) {
-                        if (beforeFirst.isEmpty()) leftDone = true;
-                        if (!beforeFirst.isEmpty() && !leftDone) {
-                            var takeLeft = beforeFirst.lastEntry();
-                            var cuuid = takeLeft.getValue();
-                            if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) {
-                                leftDone = true;
-                                continue;
-                            }
-                            beforeFirst.pollLastEntry();
-                            start = takeLeft.getKey();
-                            pendingWrites = readChunk(cuuid).concat(pendingWrites);
-                            combinedSize += getChunkSize(cuuid);
-                            chunksAll.remove(takeLeft.getKey());
-                            removedChunks.add(cuuid);
-                        }
-                        if (afterLast.isEmpty()) rightDone = true;
-                        if (!afterLast.isEmpty() && !rightDone) {
-                            var takeRight = afterLast.firstEntry();
-                            var cuuid = takeRight.getValue();
-                            if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) {
-                                rightDone = true;
-                                continue;
-                            }
-                            afterLast.pollFirstEntry();
-                            pendingWrites = pendingWrites.concat(readChunk(cuuid));
-                            combinedSize += getChunkSize(cuuid);
-                            chunksAll.remove(takeRight.getKey());
-                            removedChunks.add(cuuid);
-                        }
-                    }
-                }
-            }
+            if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
+                boolean leftDone = false;
+                boolean rightDone = false;
+                while (!leftDone && !rightDone) {
+                    if (beforeFirst.isEmpty()) leftDone = true;
+                    if (!beforeFirst.isEmpty() && !leftDone) {
+                        var takeLeft = beforeFirst.lastEntry();
+                        var cuuid = takeLeft.getValue();
+                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
+                            leftDone = true;
+                            continue;
+                        }
+                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
+                            leftDone = true;
+                            continue;
+                        }
+                        beforeFirst.pollLastEntry();
+                        start = takeLeft.getKey();
+                        pendingWrites = readChunk(cuuid).concat(pendingWrites);
+                        combinedSize += getChunkSize(cuuid);
+                        chunksAll.remove(takeLeft.getKey());
+                        removedChunks.add(cuuid);
+                    }
+                    if (afterLast.isEmpty()) rightDone = true;
+                    if (!afterLast.isEmpty() && !rightDone) {
+                        var takeRight = afterLast.firstEntry();
+                        var cuuid = takeRight.getValue();
+                        if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
+                            rightDone = true;
+                            continue;
+                        }
+                        if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
+                            rightDone = true;
+                            continue;
+                        }
+                        afterLast.pollFirstEntry();
+                        pendingWrites = pendingWrites.concat(readChunk(cuuid));
+                        combinedSize += getChunkSize(cuuid);
+                        chunksAll.remove(takeRight.getKey());
+                        removedChunks.add(cuuid);
+                    }
+                }
+            }
         }
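
The loop alternates between the nearest left and right neighbors, and each guard (max_chunk_to_take, then write_merge_limit) bails out with `continue`, which ends the whole `while (!leftDone && !rightDone)` pass as soon as either side gives up. A simplified, self-contained model of that control flow (sizes only, invented numbers, not DHFS code):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    // The nearest left neighbor sits at the tail of `left` and the nearest right
    // neighbor at the head of `right`, mirroring beforeFirst.lastEntry() and
    // afterLast.firstEntry().
    public class MergeLoopSketch {
        static final int TARGET = 100;
        static final double THRESHOLD = 0.8, LIMIT = 1.2, MAX_TAKE = 1.0;

        public static void main(String[] args) {
            Deque<Integer> left = new ArrayDeque<>(List.of(120, 15)); // 15 is nearest
            Deque<Integer> right = new ArrayDeque<>(List.of(10, 90)); // 10 is nearest
            int combined = 40; // size of the write itself

            if (combined < TARGET * THRESHOLD) {
                boolean leftDone = false, rightDone = false;
                while (!leftDone && !rightDone) {
                    if (left.isEmpty()) leftDone = true;
                    if (!left.isEmpty() && !leftDone) {
                        int take = left.peekLast();
                        // Same guards as the diff: skip oversized neighbors, stop at the cap.
                        if (take >= TARGET * MAX_TAKE || combined + take > TARGET * LIMIT) {
                            leftDone = true;
                            continue; // ends the whole pass once this side gives up
                        }
                        combined += left.pollLast();
                    }
                    if (right.isEmpty()) rightDone = true;
                    if (!right.isEmpty() && !rightDone) {
                        int take = right.peekFirst();
                        if (take >= TARGET * MAX_TAKE || combined + take > TARGET * LIMIT) {
                            rightDone = true;
                            continue;
                        }
                        combined += right.pollFirst();
                    }
                }
            }
            System.out.println(combined); // 65: took 15 and 10, stopped at the 120-byte chunk
        }
    }

Here the 40-byte write absorbs the 15-byte and 10-byte neighbors, then halts when it meets a full-size chunk on the left, leaving the 90-byte chunk untouched.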
@@ -563,8 +589,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             if (targetChunkSize <= 0)
                 end = combinedSize;
             else {
-                if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
-                    end = cur + targetChunkSize;
+                if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
+                    end = Math.min(cur + targetChunkSize, combinedSize);
                 } else {
                     end = combinedSize;
                 }
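
Note that with the limit now configurable, a value below 1.0 would have let `cur + targetChunkSize` run past combinedSize, which the new Math.min clamps. A small sketch of how the limit shapes the final split, assuming the 1 MiB target and the 1.5 default (illustrative values, not DHFS code):

    // Cut target-size chunks until the remainder is small enough to keep whole.
    public class LastChunkSplit {
        public static void main(String[] args) {
            int targetChunkSize = 1048576;    // 1 MiB, as in application.properties
            float writeLastChunkLimit = 1.5f; // default added by this commit
            int combinedSize = 2516582;       // ~2.4 MiB of buffered data

            int cur = 0;
            while (cur < combinedSize) {
                int end;
                if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
                    end = Math.min(cur + targetChunkSize, combinedSize);
                } else {
                    end = combinedSize; // remainder is under 1.5x target: emit it whole
                }
                System.out.println("chunk [" + cur + ", " + end + ") size " + (end - cur));
                cur = end;
            }
            // Prints a 1048576-byte chunk followed by a single 1468006-byte tail,
            // instead of 1048576 + 1048576 + a tiny 419430-byte fragment.
        }
    }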

JObjectWriteback.java

@@ -40,6 +40,9 @@ public class JObjectWriteback {
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.writeback.nursery_limit")
int nurseryLimit;
@ConfigProperty(name = "dhfs.objects.writeback.threads")
int writebackThreads;
@@ -223,9 +226,14 @@ public class JObjectWriteback {
             if (oldSize == size)
                 return;
             long oldTime = _nursery.get(object).getLeft();
-            _nursery.replace(object, Pair.of(oldTime, size));
-            _currentSize.addAndGet(size - oldSize);
-            return;
+            if (nurseryLimit > 0 && size >= nurseryLimit) {
+                _nursery.remove(object);
+                _currentSize.addAndGet(-oldSize);
+            } else {
+                _nursery.replace(object, Pair.of(oldTime, size));
+                _currentSize.addAndGet(size - oldSize);
+                return;
+            }
         }
     }
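
The invariant here is that _currentSize always equals the sum of the sizes tracked in _nursery, so a resize adjusts by the delta and the new eviction path subtracts the old size before the object falls through to the write queue. A toy model of that accounting (a plain HashMap and a record instead of the real structures, not DHFS code):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    public class NurseryAccounting {
        record Entry(long time, long size) {}

        static final Map<String, Entry> nursery = new HashMap<>();
        static final AtomicLong currentSize = new AtomicLong();
        static final long nurseryLimit = 1000;

        static void resize(String name, long size) {
            Entry old = nursery.get(name);
            if (old == null || old.size() == size) return;
            if (nurseryLimit > 0 && size >= nurseryLimit) {
                nursery.remove(name);                 // grew too big: evict toward writeback
                currentSize.addAndGet(-old.size());
            } else {
                nursery.put(name, new Entry(old.time(), size));
                currentSize.addAndGet(size - old.size());
            }
        }

        public static void main(String[] args) {
            nursery.put("obj", new Entry(System.currentTimeMillis(), 100));
            currentSize.addAndGet(100);
            resize("obj", 400);  // delta +300
            System.out.println(currentSize.get()); // 400
            resize("obj", 2000); // crosses nurseryLimit: evicted, -400
            System.out.println(currentSize.get()); // 0
        }
    }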
@@ -242,6 +250,14 @@ public class JObjectWriteback {
         var curTime = System.currentTimeMillis();
 
+        if (nurseryLimit > 0 && size >= nurseryLimit) {
+            synchronized (_writeQueue) {
+                _writeQueue.put(Pair.of(size, object.getName()), object);
+                _writeQueue.notifyAll();
+                return;
+            }
+        }
+
         synchronized (_nursery) {
             if (_currentSize.get() < sizeLimit) {
                 if (overload) {
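
Objects at or above nursery_limit now skip the nursery entirely and land in _writeQueue under its monitor, with notifyAll() waking the writeback threads. A minimal producer/consumer sketch of that synchronized handoff (hypothetical names and a simplified key, not DHFS code):

    import java.util.SortedMap;
    import java.util.TreeMap;

    // Keying on size lets a consumer drain entries in size order.
    public class WriteQueueSketch {
        static final SortedMap<Long, String> writeQueue = new TreeMap<>();

        static void enqueue(long size, String name) {
            synchronized (writeQueue) {
                writeQueue.put(size, name);
                writeQueue.notifyAll(); // wake any waiting writeback thread
            }
        }

        public static void main(String[] args) throws InterruptedException {
            Thread writer = new Thread(() -> {
                synchronized (writeQueue) {
                    while (writeQueue.isEmpty()) {
                        try {
                            writeQueue.wait(); // monitor is released while waiting
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    System.out.println("flushing " + writeQueue.remove(writeQueue.lastKey()));
                }
            });
            writer.start();
            enqueue(4096, "big-object");
            writer.join(); // prints "flushing big-object"
        }
    }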

application.properties

@@ -13,9 +13,18 @@
 dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
 dhfs.fuse.debug=false
 dhfs.fuse.enabled=true
 dhfs.storage.files.target_chunk_size=1048576
-dhfs.objects.writeback.delay=100
+# Writes strictly smaller than this will try to merge with blocks nearby
+dhfs.storage.files.write_merge_threshold=0.8
+# If a merge would result in a block of greater size than this, stop merging
+dhfs.storage.files.write_merge_limit=1.2
+# Don't take blocks of this size and above when merging
+dhfs.storage.files.write_merge_max_chunk_to_take=1
+dhfs.storage.files.write_last_chunk_limit=1.5
+dhfs.objects.writeback.delay=50
 dhfs.objects.writeback.limit=1073741824
 dhfs.objects.writeback.threads=2
+# Only objects with estimated size smaller than this will be put into nursery
+dhfs.objects.writeback.nursery_limit=-1
 dhfs.objects.deletion.delay=0
 dhfs.objects.ref_verification=false
 dhfs.files.use_hash_for_chunks=false
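
For nursery_limit, the -1 default means the `nurseryLimit > 0` checks above never fire, so the bypass is disabled out of the box. An illustrative override (not part of this commit) that would route everything of 512 KiB and above straight to the write queue:

    # Hypothetical tuning example: bypass the nursery for objects >= 512 KiB
    dhfs.objects.writeback.nursery_limit=524288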