Server: less bad chunk merging

2025-04-05 17:50:43 +02:00
parent 8e4ea67e53
commit 075867daaa
3 changed files with 50 additions and 175 deletions

View File

@@ -7,10 +7,6 @@ import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
@@ -19,10 +15,13 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -48,21 +47,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
TransactionManager jObjectTxManager;
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
int targetChunkAlignment;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@ConfigProperty(name = "dhfs.files.write_merge_threshold")
float writeMergeThreshold;
@ConfigProperty(name = "dhfs.files.write_merge_max_chunk_to_take")
float writeMergeMaxChunkToTake;
@ConfigProperty(name = "dhfs.files.write_merge_limit")
float writeMergeLimit;
@ConfigProperty(name = "dhfs.files.write_last_chunk_limit")
float writeLastChunkLimit;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@@ -356,22 +346,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return readChunk(uuid).size();
}
private void cleanupChunks(File f, Collection<JObjectKey> uuids) {
// FIXME:
// var inFile = useHashForChunks ? new HashSet<>(f.getChunks().values()) : Collections.emptySet();
// for (var cuuid : uuids) {
// try {
// if (inFile.contains(cuuid)) continue;
// jObjectManager.get(cuuid)
// .ifPresent(jObject -> jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION,
// (m, d, b, v) -> {
// m.removeRef(f.getName());
// return null;
// }));
// } catch (Exception e) {
// Log.error("Error when cleaning chunk " + cuuid, e);
// }
// }
private long alignDown(long num, long n) {
return num & -(1L << n);
}
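The added alignDown helper rounds a value down to the nearest multiple of 2^n using a bit mask. A short worked illustration (values chosen for the example only, not taken from the diff):

    // -(1L << n) has all bits set above bit n and zeros below, so the AND clears the low n bits.
    // alignDown(70000, 16) == 70000 & 0xFFFFFFFFFFFF0000L == 65536, the largest multiple of 2^16 <= 70000.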
@Override
@@ -380,7 +356,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
// FIXME:
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
if (file == null) {
Log.error("File not found when trying to write: " + fileUuid);
@@ -397,144 +372,58 @@ public class DhfsFileServiceImpl implements DhfsFileService {
file = remoteTx.getData(File.class, fileUuid).orElse(null);
}
Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
Log.tracev("Getting last");
try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(offset + data.size()))) {
last = it.hasNext() ? it.next() : null;
Log.tracev("Last: {0}", last);
}
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
long start = 0;
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
long writeEnd = offset + data.size();
long start = realOffset;
ByteString pendingPrefix = ByteString.empty();
ByteString pendingSuffix = ByteString.empty();
try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
first = it.hasNext() ? it.next() : null;
Log.tracev("First: {0}", first);
boolean empty = last == null;
if (first != null && getChunkSize(first.getValue().ref()) + first.getKey().key() <= offset) {
first = null;
last = null;
start = offset;
} else if (!empty) {
assert first != null;
removedChunks.put(first.getKey().key(), first.getValue().ref());
while (it.hasNext() && it.peekNextKey().compareTo(last.getKey()) <= 0) {
var next = it.next();
Log.tracev("Next: {0}", next);
removedChunks.put(next.getKey().key(), next.getValue().ref());
try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(realOffset))) {
while (it.hasNext()) {
var curEntry = it.next();
long curChunkStart = curEntry.getKey().key();
var curChunkId = curEntry.getValue().ref();
long curChunkEnd = curChunkStart + getChunkSize(curChunkId);
if (curChunkEnd <= realOffset) break;
removedChunks.put(curEntry.getKey().key(), curChunkId);
if (curChunkStart < offset) {
if (curChunkStart < start)
start = curChunkStart;
var readChunk = readChunk(curChunkId);
pendingPrefix = pendingPrefix.concat(readChunk.substring(0, Math.min(readChunk.size(), (int) (offset - curChunkStart))));
}
removedChunks.put(last.getKey().key(), last.getValue().ref());
start = first.getKey().key();
if (curChunkEnd > writeEnd) {
var readChunk = readChunk(curChunkId);
pendingSuffix = pendingSuffix.concat(readChunk.substring((int) (writeEnd - curChunkStart), readChunk.size()));
}
if (curChunkEnd >= writeEnd) break;
}
}
// NavigableMap<Long, JObjectKey> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
// NavigableMap<Long, JObjectKey> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
// if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
// beforeFirst = chunksAll;
// afterLast = Collections.emptyNavigableMap();
// first = null;
// last = null;
// start = offset;
// } else if (!chunksAll.isEmpty()) {
// var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
// removedChunks.putAll(between);
// start = first.getKey();
// }
ByteString pendingWrites = ByteString.empty();
if (first != null && first.getKey().key() < offset) {
var chunkBytes = readChunk(first.getValue().ref());
pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey().key())));
}
pendingWrites = pendingWrites.concat(data);
if (last != null) {
var lchunkBytes = readChunk(last.getValue().ref());
if (last.getKey().key() + lchunkBytes.size() > offset + data.size()) {
var startInFile = offset + data.size();
var startInChunk = startInFile - last.getKey().key();
pendingWrites = pendingWrites.concat(lchunkBytes.substring((int) startInChunk, lchunkBytes.size()));
}
}
ByteString pendingWrites = pendingPrefix.concat(data).concat(pendingSuffix);
int combinedSize = pendingWrites.size();
if (targetChunkSize > 0) {
// if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
// boolean leftDone = false;
// boolean rightDone = false;
// while (!leftDone && !rightDone) {
// if (beforeFirst.isEmpty()) leftDone = true;
// if (!beforeFirst.isEmpty() || !leftDone) {
// var takeLeft = beforeFirst.lastEntry();
//
// var cuuid = takeLeft.getValue();
//
// if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
// leftDone = true;
// continue;
// }
//
// if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
// leftDone = true;
// continue;
// }
//
// // FIXME: (and test this)
// beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
// start = takeLeft.getKey();
// pendingWrites = readChunk(cuuid).concat(pendingWrites);
// combinedSize += getChunkSize(cuuid);
// removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
// }
// if (afterLast.isEmpty()) rightDone = true;
// if (!afterLast.isEmpty() && !rightDone) {
// var takeRight = afterLast.firstEntry();
//
// var cuuid = takeRight.getValue();
//
// if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
// rightDone = true;
// continue;
// }
//
// if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
// rightDone = true;
// continue;
// }
//
// // FIXME: (and test this)
// afterLast = afterLast.tailMap(takeRight.getKey(), false);
// pendingWrites = pendingWrites.concat(readChunk(cuuid));
// combinedSize += getChunkSize(cuuid);
// removedChunks.put(takeRight.getKey(), takeRight.getValue());
// }
// }
// }
}
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
{
int targetChunkSize = 1 << targetChunkAlignment;
int cur = 0;
while (cur < combinedSize) {
int end;
if (targetChunkSize <= 0)
if (targetChunkAlignment < 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
end = Math.min(cur + targetChunkSize, combinedSize);
} else {
end = combinedSize;
}
}
else
end = Math.min(cur + targetChunkSize, combinedSize);
var thisChunk = pendingWrites.substring(cur, end);
@@ -557,7 +446,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
return (long) data.size();
});
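Taken together, the rewritten write path drops the old threshold-driven merge heuristics (write_merge_threshold, write_merge_limit, write_merge_max_chunk_to_take, write_last_chunk_limit) in favour of an alignment-based scheme: align the write offset down to a 2^targetChunkAlignment boundary, remove every existing chunk that overlaps the write while saving its overhanging bytes as a pending prefix and suffix, then concatenate prefix + data + suffix and re-split the result on the same alignment. Below is a minimal standalone sketch of that idea, with a plain in-memory NavigableMap standing in for the JMap, byte[] standing in for ChunkData, and all class and method names hypothetical; it assumes chunks tile the file contiguously, as DHFS writes them.

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    class AlignedChunkWriteSketch {
        static long alignDown(long num, long n) {
            return num & -(1L << n); // clear the low n bits: round down to a multiple of 2^n
        }

        static void write(NavigableMap<Long, byte[]> chunks, long offset, byte[] data, int alignment) {
            long start = alignment >= 0 ? alignDown(offset, alignment) : offset;
            long writeEnd = offset + data.length;
            var prefix = new ByteArrayOutputStream(); // bytes of removed chunks left of the write
            byte[] suffix = new byte[0];              // bytes of the last removed chunk right of it

            // Remove every chunk overlapping [start, writeEnd), keeping the overhanging bytes.
            Long from = chunks.floorKey(start);
            for (var it = chunks.tailMap(from != null ? from : start, true).entrySet().iterator(); it.hasNext(); ) {
                var e = it.next();
                long chunkStart = e.getKey();
                byte[] chunk = e.getValue();
                long chunkEnd = chunkStart + chunk.length;
                if (chunkEnd <= start) continue;   // floor chunk ends before the write: leave it alone
                if (chunkStart >= writeEnd) break; // past the write: stop
                if (chunkStart < offset) {
                    start = Math.min(start, chunkStart);
                    prefix.write(chunk, 0, (int) Math.min(chunk.length, offset - chunkStart));
                }
                if (chunkEnd > writeEnd)
                    suffix = Arrays.copyOfRange(chunk, (int) (writeEnd - chunkStart), chunk.length);
                it.remove();
                if (chunkEnd >= writeEnd) break;
            }

            // Rebuild the affected range as prefix + data + suffix, re-split on 2^alignment boundaries.
            var combined = new ByteArrayOutputStream();
            combined.write(prefix.toByteArray(), 0, prefix.size());
            combined.write(data, 0, data.length);
            combined.write(suffix, 0, suffix.length);
            byte[] merged = combined.toByteArray();

            int chunkSize = alignment >= 0 ? 1 << alignment : merged.length;
            for (int cur = 0; cur < merged.length; cur += chunkSize) {
                int end = Math.min(cur + chunkSize, merged.length);
                chunks.put(start + cur, Arrays.copyOfRange(merged, cur, end));
            }
        }

        public static void main(String[] args) {
            NavigableMap<Long, byte[]> chunks = new TreeMap<>();
            write(chunks, 0, new byte[1 << 20], 19);     // 1 MiB write -> two 512 KiB chunks
            write(chunks, 100, new byte[]{1, 2, 3}, 19); // rewrites only the first 512 KiB chunk
            System.out.println(chunks.keySet());         // prints [0, 524288]
        }
    }

Because every chunk boundary lands on a multiple of 2^targetChunkAlignment, repeated overlapping writes tend to converge to the same chunk layout instead of depending on merge heuristics and tunables.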
@@ -576,17 +464,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (length == 0) {
try (var it = jMapHelper.getIterator(file, IteratorStart.GE, JMapLongKey.of(0))) {
while (it.hasNext()) {
var next = it.next();
jMapHelper.delete(file, next.getKey());
}
}
// var oldChunks = file.chunks();
//
// file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
jMapHelper.deleteAll(file);
remoteTx.putData(file);
// cleanupChunks(file, oldChunks.values());
return true;
}
@@ -689,7 +568,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
return true;
});
}

View File

@@ -15,13 +15,7 @@ dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=2097152
# Writes strictly smaller than this will try to merge with blocks nearby
dhfs.files.write_merge_threshold=0.8
# If a merge would result in a block of greater size than this, stop merging
dhfs.files.write_merge_limit=1.2
# Don't take blocks of this size and above when merging
dhfs.files.write_merge_max_chunk_to_take=1
dhfs.files.write_last_chunk_limit=1.5
dhfs.files.target_chunk_alignment=19
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
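With the merge thresholds removed, chunking in this config is driven by the single target_chunk_alignment knob. A quick sketch of what the value above translates to, assuming the `1 << targetChunkAlignment` relationship shown in the new write path:

    int targetChunkAlignment = 19;              // value set in application.properties above
    int chunkSize = 1 << targetChunkAlignment;  // 524288 bytes = 512 KiB per chunk
    // targetChunkAlignment = -1 disables alignment: a write is kept as a single chunk
    // (the "-1" test profile below exercises exactly that case)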

View File

@@ -1,12 +1,12 @@
package com.usatiuk.dhfs.files;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
@@ -27,6 +27,7 @@ class Profiles {
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.files.target_chunk_size", "-1");
ret.put("dhfs.files.target_chunk_alignment", "-1");
}
}
@@ -35,6 +36,7 @@ class Profiles {
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.files.target_chunk_size", "3");
ret.put("dhfs.files.target_chunk_alignment", "2");
}
}
}
@@ -150,6 +152,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
}