From 76541fd7c1af3cc1c3f09875ddcff04acdd393c6 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Fri, 21 Jun 2024 21:00:46 +0200 Subject: [PATCH] a little broken chunk merging with nursery --- .../dhfs/storage/files/objects/ChunkData.java | 5 + .../dhfs/storage/files/objects/ChunkInfo.java | 5 + .../files/service/DhfsFileServiceImpl.java | 166 ++++++++++++++---- .../storage/objects/jrepository/JObject.java | 2 +- .../objects/jrepository/JObjectData.java | 4 + .../jrepository/JObjectManagerImpl.java | 51 +++--- .../objects/jrepository/JObjectResolver.java | 2 +- .../objects/jrepository/JObjectWriteback.java | 3 +- .../distributed/InvalidationQueueService.java | 3 +- .../RemoteObjectServiceClient.java | 6 +- .../RemoteObjectServiceServer.java | 2 +- .../src/main/resources/application.properties | 2 +- 12 files changed, 186 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java index 4e3082e1..1597cd57 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkData.java @@ -40,4 +40,9 @@ public class ChunkData extends JObjectData { public static String getNameFromHash(String hash) { return "data_" + hash; } + + @Override + public boolean assumeUnique() { + return true; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java index ddbd3629..906e4a03 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/objects/ChunkInfo.java @@ -43,4 +43,9 @@ public class ChunkInfo extends JObjectData { public static String getNameFromHash(String hash) { return "info_" + hash; } + + @Override + public boolean assumeUnique() { + return true; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java index 8e12e347..6011c896 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java @@ -297,6 +297,28 @@ public class DhfsFileServiceImpl implements DhfsFileService { return Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset))); } + private Integer getChunkSize(String uuid) { + var chunkRead = jObjectManager.get(ChunkInfo.getNameFromHash(uuid), ChunkInfo.class); + + if (chunkRead.isEmpty()) { + Log.error("Chunk requested not found: " + uuid); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + + return chunkRead.get().runReadLocked((m, d) -> d.getSize()); + } + + private byte[] readChunk(String uuid) { + var chunkRead = jObjectManager.get(ChunkData.getNameFromHash(uuid), ChunkData.class); + + if (chunkRead.isEmpty()) { + Log.error("Chunk requested not found: " + uuid); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + + return chunkRead.get().runReadLocked((m, d) -> d.getBytes()); + } + @Override public Long write(String fileUuid, long offset, byte[] data) { var fileOpt = jObjectManager.get(fileUuid, File.class); @@ -307,70 +329,140 @@ public class DhfsFileServiceImpl implements DhfsFileService { var file = fileOpt.get(); // FIXME: - file.runWriteLocked((meta, fData, bump) -> { + var removedChunksOuter = file.runWriteLocked((meta, fData, bump) -> { var chunksAll = fData.getChunks(); var first = chunksAll.floorEntry(offset); var last = chunksAll.floorEntry((offset + data.length) - 1); + TreeSet removedChunks = new TreeSet<>(); + + long start = 0; + if (!chunksAll.isEmpty()) { var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true); + removedChunks.addAll(between.values()); + start = first.getKey(); between.clear(); } + NavigableMap beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap(); + NavigableMap afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap(); + + List pendingWrites = new LinkedList<>(); + int combinedSize = 0; + if (first != null && first.getKey() < offset) { - var chunkUuid = first.getValue(); - var chunkRead = jObjectManager.get(ChunkData.getNameFromHash(chunkUuid), ChunkData.class); - - if (chunkRead.isEmpty()) { - Log.error("Chunk requested not found: " + chunkUuid); - return -1L; - } - - var chunkBytes = chunkRead.get().runReadLocked((m, d) -> d.getBytes()); - ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey()))); - ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length); - jObjectManager.put(newChunkData); - jObjectManager.put(newChunkInfo); - - chunksAll.put(first.getKey(), newChunkData.getHash()); + var chunkBytes = readChunk(first.getValue()); + pendingWrites.addLast(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey()))); + combinedSize += pendingWrites.getLast().length; } + pendingWrites.addLast(data); + combinedSize += pendingWrites.getLast().length; - { - ChunkData newChunkData = new ChunkData(data); - ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length); - jObjectManager.put(newChunkData); - jObjectManager.put(newChunkInfo); - - chunksAll.put(offset, newChunkData.getHash()); - } if (last != null) { - var lchunkUuid = last.getValue(); - var lchunkRead = jObjectManager.get(ChunkData.getNameFromHash(lchunkUuid), ChunkData.class); - - if (lchunkRead.isEmpty()) { - Log.error("Chunk requested not found: " + lchunkUuid); - return -1L; - } - - var lchunkBytes = lchunkRead.get().runReadLocked((m, d) -> d.getBytes()); - + var lchunkBytes = readChunk(last.getValue()); if (last.getKey() + lchunkBytes.length > offset + data.length) { var startInFile = offset + data.length; var startInChunk = startInFile - last.getKey(); - ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length)); + pendingWrites.addLast(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length)); + combinedSize += pendingWrites.getLast().length; + } + } + + if (Math.abs(combinedSize - targetChunkSize) > targetChunkSize * 0.1) { + if (combinedSize < targetChunkSize) { + boolean leftDone = false; + boolean rightDone = false; + while (!leftDone && !rightDone) { + if (beforeFirst.isEmpty()) leftDone = true; + if (!beforeFirst.isEmpty() && !leftDone) { + var takeLeft = beforeFirst.lastEntry(); + + var cuuid = takeLeft.getValue(); + + if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) { + leftDone = true; + continue; + } + + beforeFirst.pollLastEntry(); + start = takeLeft.getKey(); + pendingWrites.addFirst(readChunk(cuuid)); + combinedSize += getChunkSize(cuuid); + chunksAll.remove(takeLeft.getKey()); + removedChunks.add(cuuid); + } + if (afterLast.isEmpty()) rightDone = true; + if (!afterLast.isEmpty() && !rightDone) { + var takeRight = afterLast.firstEntry(); + + var cuuid = takeRight.getValue(); + + if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) { + rightDone = true; + continue; + } + + afterLast.pollFirstEntry(); + pendingWrites.addLast(readChunk(cuuid)); + combinedSize += getChunkSize(cuuid); + chunksAll.remove(takeRight.getKey()); + removedChunks.add(cuuid); + } + } + } + } + + // FIXME:! + + byte[] realbytes = new byte[combinedSize]; + { + int cur = 0; + for (var b : pendingWrites) { + System.arraycopy(b, 0, realbytes, cur, b.length); + cur += b.length; + } + pendingWrites.clear(); + } + + { + int cur = 0; + while (cur < combinedSize) { + int end; + if ((combinedSize - cur) > (targetChunkSize * 1.5)) { + end = cur + targetChunkSize; + } else { + end = combinedSize; + } + + byte[] thisChunk = new byte[end - cur]; + System.arraycopy(realbytes, cur, thisChunk, 0, thisChunk.length); + + ChunkData newChunkData = new ChunkData(thisChunk); ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length); jObjectManager.put(newChunkData); jObjectManager.put(newChunkInfo); + chunksAll.put(start, newChunkInfo.getHash()); - chunksAll.put(startInFile, newChunkData.getHash()); + start += thisChunk.length; + cur = end; } } bump.apply(); fData.setMtime(System.currentTimeMillis()); - return null; + return removedChunks; }); + for (var v : removedChunksOuter) { + var ci = jObjectManager.get(ChunkInfo.getNameFromHash(v), ChunkInfo.class); + if (ci.isPresent()) + jObjectManager.unref(ci.get()); + var cd = jObjectManager.get(ChunkData.getNameFromHash(v), ChunkData.class); + if (cd.isPresent()) + jObjectManager.unref(cd.get()); + } + return (long) data.length; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java index 361efb25..99f57e5f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java @@ -72,7 +72,7 @@ public class JObject implements Serializable { } public boolean isOf(Class klass) { - return runReadLocked((m) -> (klass.isAssignableFrom(m.getType()))); + return (klass.isAssignableFrom(_metaPart.getType())); } public R runReadLocked(ObjectMetaFn fn) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java index ceff3cd0..de750089 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectData.java @@ -7,4 +7,8 @@ import java.io.Serializable; public abstract class JObjectData implements Serializable { public abstract String getName(); public abstract Class getConflictResolver(); + + public boolean assumeUnique() { + return false; + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index bc603a30..ee8d5ddf 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -12,7 +12,10 @@ import org.apache.commons.lang3.NotImplementedException; import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Optional; @ApplicationScoped public class JObjectManagerImpl implements JObjectManager { @@ -92,7 +95,7 @@ public class JObjectManagerImpl implements JObjectManager { @Override public Optional> get(String name, Class klass) { var got = get(name); - if (got.isEmpty()) return Optional.of((JObject) got.get()); + if (got.isEmpty()) return Optional.empty(); if (!got.get().isOf(klass)) throw new NotImplementedException("Class mismatch for " + name); return Optional.of((JObject) got.get()); } @@ -115,18 +118,15 @@ public class JObjectManagerImpl implements JObjectManager { synchronized (this) { var inMap = getFromMap(object.getName()); if (inMap != null) { - inMap.runReadLocked((m, d) -> { - if (!Objects.equals(d, object)) - throw new IllegalArgumentException("Trying to insert different object with same key"); - return null; - }); - _nurseryRefcounts.merge(object.getName(), 1L, Long::sum); + if (!object.assumeUnique()) + throw new IllegalArgumentException("Trying to insert different object with same key"); + addToNursery(object.getName()); return (JObject) inMap; } else { var created = new JObject(jObjectResolver, object.getName(), object.getConflictResolver().getName(), object); _map.put(object.getName(), new NamedSoftReference(created, _refQueue)); jObjectResolver.notifyWrite(created); - _nurseryRefcounts.merge(object.getName(), 1L, Long::sum); + addToNursery(created.getName()); return created; } } @@ -173,10 +173,7 @@ public class JObjectManagerImpl implements JObjectManager { synchronized (this) { var inMap = getFromMap(object.getName()); if (inMap != null) { - var ok = inMap.runReadLocked((m) -> { - return object.getClass().isAssignableFrom(m.getType()); - }); - if (ok) + if (inMap.isOf(object.getClass())) return (JObject) inMap; else throw new NotImplementedException("Type mismatch for " + name); @@ -189,6 +186,13 @@ public class JObjectManagerImpl implements JObjectManager { } } + private void addToNursery(String name) { + synchronized (this) { + if (!objectPersistentStore.existsObject("meta_" + name)) + _nurseryRefcounts.merge(name, 1L, Long::sum); + } + } + @Override public void onWriteback(String name) { synchronized (this) { @@ -198,19 +202,26 @@ public class JObjectManagerImpl implements JObjectManager { @Override public void unref(JObject object) { - synchronized (this) { - object.runWriteLockedMeta((m, a, b) -> { - String name = m.getName(); + object.runWriteLockedMeta((m, a, b) -> { + String name = m.getName(); + boolean removed = false; + synchronized (this) { if (!_nurseryRefcounts.containsKey(name)) return null; _nurseryRefcounts.merge(name, -1L, Long::sum); if (_nurseryRefcounts.get(name) <= 0) { _nurseryRefcounts.remove(name); - jObjectWriteback.remove(name); + removed = true; + } + } + // Race? + if (removed) { + jObjectWriteback.remove(name); + synchronized (this) { if (!objectPersistentStore.existsObject("meta_" + name)) _map.remove(name); } - return null; - }); - } + } + return null; + }); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java index 4a53e03e..c3aaa9da 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java @@ -40,7 +40,7 @@ public class JObjectResolver { public void removeLocal(JObject jObject, String name) { jObject.assertRWLock(); try { - Log.info("Deleting " + name); +// Log.info("Deleting " + name); jObjectWriteback.remove(name); objectPersistentStore.deleteObject(name); } catch (StatusRuntimeException sx) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java index 0a09daea..ad5367d9 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java @@ -3,6 +3,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.RunOnVirtualThread; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -41,7 +42,7 @@ public class JObjectWriteback { } @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - @RunOnVirtualThread + @Blocking public void flush() { while (true) { JObject obj; diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java index 74a86cac..5ea7b1ca 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; import io.quarkus.logging.Log; import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.RunOnVirtualThread; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -17,7 +18,7 @@ public class InvalidationQueueService { RemoteObjectServiceClient remoteObjectServiceClient; @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - @RunOnVirtualThread + @Blocking public void trySend() { var data = _data.runReadLocked(InvalidationQueueData::pullAll); for (var forHost : data.entrySet()) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index cec4b1e0..f2f42879 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -70,13 +70,15 @@ public class RemoteObjectServiceClient { } public void notifyUpdate(String host, String name) { - var obj = jObjectManager.get(name).orElseThrow(() -> new NotImplementedException("Race when invalidating")); + var obj = jObjectManager.get(name); + + if (obj.isEmpty()) return; remoteHostManager.withClient(host, client -> { var builder = IndexUpdatePush.newBuilder().setSelfname(selfname); client.indexUpdate(builder.setHeader( - obj.runReadLocked(ObjectMetadata::toRpcHeader) + obj.get().runReadLocked(ObjectMetadata::toRpcHeader) ).build()); return null; }); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index d52de36b..587f9892 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -71,7 +71,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); remoteHostManager.handleConnectionSuccess(request.getSelfname()); - Log.info("<-- indexUpdate: " + request.getHeader().getName()); +// Log.info("<-- indexUpdate: " + request.getHeader().getName()); return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); } diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index f5366702..947377a2 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -2,7 +2,7 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root -dhfs.storage.files.target_chunk_size=4194304 +dhfs.storage.files.target_chunk_size=524288 quarkus.quartz.shutdown-wait-time=31D grpc.client.greeting-service.max-inbound-message-size=9155241000 grpc.client.greeting-service.package-max-inbound-message-size=9155241000