a little broken chunk merging with nursery

2024-06-21 21:00:46 +02:00
parent ae741841fc
commit 76541fd7c1
12 changed files with 186 additions and 65 deletions


@@ -40,4 +40,9 @@ public class ChunkData extends JObjectData {
public static String getNameFromHash(String hash) {
return "data_" + hash;
}
+ @Override
+ public boolean assumeUnique() {
+ return true;
+ }
}


@@ -43,4 +43,9 @@ public class ChunkInfo extends JObjectData {
public static String getNameFromHash(String hash) {
return "info_" + hash;
}
+ @Override
+ public boolean assumeUnique() {
+ return true;
+ }
}
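
Both chunk classes appear to be content-addressed: the object name is derived from the hash of the chunk's bytes ("data_" + hash, "info_" + hash), so two puts under the same name can only ever carry the same payload. That is what makes the new assumeUnique() override safe: a duplicate insert no longer needs a deep equality check (see the JObjectManagerImpl hunk below). A minimal standalone sketch of the naming idea, assuming SHA-256 as the hash (the actual hash function is not shown in this diff):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.HexFormat;

    // Illustrative only: chunk objects named by content hash. When the name is
    // a pure function of the bytes, two objects with the same name are
    // interchangeable, so duplicate inserts can skip equality checks.
    class ContentAddressedName {
        static String hashOf(byte[] bytes) throws NoSuchAlgorithmException {
            return HexFormat.of().formatHex(MessageDigest.getInstance("SHA-256").digest(bytes));
        }

        public static void main(String[] args) throws NoSuchAlgorithmException {
            byte[] chunk = "hello".getBytes();
            System.out.println("data_" + hashOf(chunk)); // mirrors ChunkData.getNameFromHash
            System.out.println("info_" + hashOf(chunk)); // mirrors ChunkInfo.getNameFromHash
        }
    }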


@@ -297,6 +297,28 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset)));
}
+ private Integer getChunkSize(String uuid) {
+ var chunkRead = jObjectManager.get(ChunkInfo.getNameFromHash(uuid), ChunkInfo.class);
+ if (chunkRead.isEmpty()) {
+ Log.error("Chunk requested not found: " + uuid);
+ throw new StatusRuntimeException(Status.NOT_FOUND);
+ }
+ return chunkRead.get().runReadLocked((m, d) -> d.getSize());
+ }
+ private byte[] readChunk(String uuid) {
+ var chunkRead = jObjectManager.get(ChunkData.getNameFromHash(uuid), ChunkData.class);
+ if (chunkRead.isEmpty()) {
+ Log.error("Chunk requested not found: " + uuid);
+ throw new StatusRuntimeException(Status.NOT_FOUND);
+ }
+ return chunkRead.get().runReadLocked((m, d) -> d.getBytes());
+ }
@Override
public Long write(String fileUuid, long offset, byte[] data) {
var fileOpt = jObjectManager.get(fileUuid, File.class);
@@ -307,70 +329,140 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = fileOpt.get();
// FIXME:
- file.runWriteLocked((meta, fData, bump) -> {
+ var removedChunksOuter = file.runWriteLocked((meta, fData, bump) -> {
var chunksAll = fData.getChunks();
var first = chunksAll.floorEntry(offset);
var last = chunksAll.floorEntry((offset + data.length) - 1);
+ TreeSet<String> removedChunks = new TreeSet<>();
+ long start = 0;
if (!chunksAll.isEmpty()) {
var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
+ removedChunks.addAll(between.values());
+ start = first.getKey();
between.clear();
}
+ NavigableMap<Long, String> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
+ NavigableMap<Long, String> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
+ List<byte[]> pendingWrites = new LinkedList<>();
+ int combinedSize = 0;
if (first != null && first.getKey() < offset) {
- var chunkUuid = first.getValue();
- var chunkRead = jObjectManager.get(ChunkData.getNameFromHash(chunkUuid), ChunkData.class);
- if (chunkRead.isEmpty()) {
- Log.error("Chunk requested not found: " + chunkUuid);
- return -1L;
- }
- var chunkBytes = chunkRead.get().runReadLocked((m, d) -> d.getBytes());
- ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
- ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length);
- jObjectManager.put(newChunkData);
- jObjectManager.put(newChunkInfo);
- chunksAll.put(first.getKey(), newChunkData.getHash());
+ var chunkBytes = readChunk(first.getValue());
+ pendingWrites.addLast(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
+ combinedSize += pendingWrites.getLast().length;
}
+ pendingWrites.addLast(data);
+ combinedSize += pendingWrites.getLast().length;
- {
- ChunkData newChunkData = new ChunkData(data);
- ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length);
- jObjectManager.put(newChunkData);
- jObjectManager.put(newChunkInfo);
- chunksAll.put(offset, newChunkData.getHash());
- }
if (last != null) {
- var lchunkUuid = last.getValue();
- var lchunkRead = jObjectManager.get(ChunkData.getNameFromHash(lchunkUuid), ChunkData.class);
- if (lchunkRead.isEmpty()) {
- Log.error("Chunk requested not found: " + lchunkUuid);
- return -1L;
- }
- var lchunkBytes = lchunkRead.get().runReadLocked((m, d) -> d.getBytes());
+ var lchunkBytes = readChunk(last.getValue());
if (last.getKey() + lchunkBytes.length > offset + data.length) {
var startInFile = offset + data.length;
var startInChunk = startInFile - last.getKey();
- ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
+ pendingWrites.addLast(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
+ combinedSize += pendingWrites.getLast().length;
}
}
+ if (Math.abs(combinedSize - targetChunkSize) > targetChunkSize * 0.1) {
+ if (combinedSize < targetChunkSize) {
+ boolean leftDone = false;
+ boolean rightDone = false;
+ while (!leftDone && !rightDone) {
+ if (beforeFirst.isEmpty()) leftDone = true;
+ if (!beforeFirst.isEmpty() && !leftDone) {
+ var takeLeft = beforeFirst.lastEntry();
+ var cuuid = takeLeft.getValue();
+ if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) {
+ leftDone = true;
+ continue;
+ }
+ beforeFirst.pollLastEntry();
+ start = takeLeft.getKey();
+ pendingWrites.addFirst(readChunk(cuuid));
+ combinedSize += getChunkSize(cuuid);
+ chunksAll.remove(takeLeft.getKey());
+ removedChunks.add(cuuid);
+ }
+ if (afterLast.isEmpty()) rightDone = true;
+ if (!afterLast.isEmpty() && !rightDone) {
+ var takeRight = afterLast.firstEntry();
+ var cuuid = takeRight.getValue();
+ if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * 1.2)) {
+ rightDone = true;
+ continue;
+ }
+ afterLast.pollFirstEntry();
+ pendingWrites.addLast(readChunk(cuuid));
+ combinedSize += getChunkSize(cuuid);
+ chunksAll.remove(takeRight.getKey());
+ removedChunks.add(cuuid);
+ }
+ }
+ }
+ }
+ // FIXME:!
+ byte[] realbytes = new byte[combinedSize];
+ {
+ int cur = 0;
+ for (var b : pendingWrites) {
+ System.arraycopy(b, 0, realbytes, cur, b.length);
+ cur += b.length;
+ }
+ pendingWrites.clear();
+ }
+ {
+ int cur = 0;
+ while (cur < combinedSize) {
+ int end;
+ if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
+ end = cur + targetChunkSize;
+ } else {
+ end = combinedSize;
+ }
+ byte[] thisChunk = new byte[end - cur];
+ System.arraycopy(realbytes, cur, thisChunk, 0, thisChunk.length);
+ ChunkData newChunkData = new ChunkData(thisChunk);
ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length);
jObjectManager.put(newChunkData);
jObjectManager.put(newChunkInfo);
+ chunksAll.put(start, newChunkInfo.getHash());
- chunksAll.put(startInFile, newChunkData.getHash());
+ start += thisChunk.length;
+ cur = end;
+ }
+ }
bump.apply();
fData.setMtime(System.currentTimeMillis());
- return null;
+ return removedChunks;
});
+ for (var v : removedChunksOuter) {
+ var ci = jObjectManager.get(ChunkInfo.getNameFromHash(v), ChunkInfo.class);
+ if (ci.isPresent())
+ jObjectManager.unref(ci.get());
+ var cd = jObjectManager.get(ChunkData.getNameFromHash(v), ChunkData.class);
+ if (cd.isPresent())
+ jObjectManager.unref(cd.get());
+ }
return (long) data.length;
}
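
The rewritten write() now has three phases. First it cuts the overlapped chunk range out of the file's offset-to-chunk map, keeping the untouched head of the first chunk and tail of the last chunk as pending byte runs around the new data. Then, if the combined run is more than 10% away from dhfs.storage.files.target_chunk_size and still below it, it greedily absorbs whole neighbouring chunks on both sides as long as the result stays under 1.2x the target. Finally it concatenates the runs and re-slices them into target-sized chunks, letting the last piece grow up to 1.5x the target; every chunk displaced along the way is collected into removedChunks and unreferenced after the write lock is released. Note that the alternating merge loop runs only while (!leftDone && !rightDone), so merging on one side stops as soon as the other side finishes; the "little broken" in the commit title may refer to this. A self-contained sketch of the same merge-and-reslice idea over a plain TreeMap, with chunk bodies stored inline, a toy target size, and the two directions written as separate loops (all names and simplifications here are assumptions, not repository code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.TreeMap;

    // Hypothetical standalone model of write()'s merge-and-reslice step: chunk
    // bodies live inline in a TreeMap<fileOffset, bytes> instead of a
    // hash-addressed store, and the two merge directions are separate loops.
    class ChunkMergeSketch {
        static final int TARGET = 8; // stand-in for dhfs.storage.files.target_chunk_size

        static void write(TreeMap<Long, byte[]> chunks, long offset, byte[] data) {
            var first = chunks.floorEntry(offset);
            var last = chunks.floorEntry(offset + data.length - 1);
            long start = offset;
            var pending = new ArrayList<byte[]>();
            // Phase 1: keep the untouched head/tail of the boundary chunks.
            if (first != null && first.getKey() < offset) {
                var head = first.getValue();
                pending.add(Arrays.copyOfRange(head, 0, Math.min(head.length, (int) (offset - first.getKey()))));
                start = first.getKey();
            }
            pending.add(data);
            if (last != null && last.getKey() + last.getValue().length > offset + data.length) {
                var tail = last.getValue();
                pending.add(Arrays.copyOfRange(tail, (int) (offset + data.length - last.getKey()), tail.length));
            }
            if (first != null) // last is non-null whenever first is
                chunks.subMap(first.getKey(), true, last.getKey(), true).clear();
            int size = 0;
            for (var b : pending) size += b.length;
            // Phase 2: absorb whole neighbours while undersized, capped at 1.2x.
            while (size < TARGET) {
                var left = chunks.lowerEntry(start);
                if (left == null || size + left.getValue().length > TARGET * 1.2) break;
                pending.add(0, left.getValue());
                size += left.getValue().length;
                start = left.getKey();
                chunks.remove(left.getKey());
            }
            while (size < TARGET) {
                var right = chunks.ceilingEntry(offset + data.length);
                if (right == null || size + right.getValue().length > TARGET * 1.2) break;
                pending.add(right.getValue());
                size += right.getValue().length;
                chunks.remove(right.getKey());
            }
            // Phase 3: concatenate and re-slice; the final piece may reach 1.5x.
            byte[] all = new byte[size];
            int cur = 0;
            for (var b : pending) {
                System.arraycopy(b, 0, all, cur, b.length);
                cur += b.length;
            }
            cur = 0;
            while (cur < size) {
                int end = (size - cur > TARGET * 1.5) ? cur + TARGET : size;
                chunks.put(start, Arrays.copyOfRange(all, cur, end));
                start += end - cur;
                cur = end;
            }
        }

        public static void main(String[] args) {
            var chunks = new TreeMap<Long, byte[]>();
            chunks.put(0L, "aa".getBytes());
            chunks.put(2L, "bb".getBytes());
            chunks.put(4L, "cc".getBytes());
            chunks.put(6L, "dd".getBytes());
            write(chunks, 4, "XY".getBytes()); // undersized write absorbs neighbours
            chunks.forEach((k, v) -> System.out.println(k + " -> " + new String(v)));
        }
    }

Run as-is, the 2-byte overwrite at offset 4 replaces "cc" and absorbs its three neighbours, leaving a single 8-byte chunk at offset 0.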


@@ -72,7 +72,7 @@ public class JObject<T extends JObjectData> implements Serializable {
}
public <X> boolean isOf(Class<X> klass) {
- return runReadLocked((m) -> (klass.isAssignableFrom(m.getType())));
+ return (klass.isAssignableFrom(_metaPart.getType()));
}
public <R> R runReadLocked(ObjectMetaFn<R> fn) {


@@ -7,4 +7,8 @@ import java.io.Serializable;
public abstract class JObjectData implements Serializable {
public abstract String getName();
public abstract Class<? extends ConflictResolver> getConflictResolver();
+ public boolean assumeUnique() {
+ return false;
+ }
}


@@ -12,7 +12,10 @@ import org.apache.commons.lang3.NotImplementedException;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.Optional;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -92,7 +95,7 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public <D extends JObjectData> Optional<JObject<? extends D>> get(String name, Class<D> klass) {
var got = get(name);
- if (got.isEmpty()) return Optional.of((JObject<? extends D>) got.get());
+ if (got.isEmpty()) return Optional.empty();
if (!got.get().isOf(klass)) throw new NotImplementedException("Class mismatch for " + name);
return Optional.of((JObject<? extends D>) got.get());
}
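
The old empty-path returned Optional.of(got.get()), which throws NoSuchElementException the moment a missing object is looked up; the fix propagates the empty Optional instead. A standalone illustration of the failure mode:

    import java.util.Optional;

    class OptionalEmptyBug {
        public static void main(String[] args) {
            Optional<String> got = Optional.empty();
            // The pre-fix branch: get() on an empty Optional throws
            // java.util.NoSuchElementException instead of signalling "not found".
            if (got.isEmpty()) System.out.println(Optional.of(got.get()));
        }
    }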
@@ -115,18 +118,15 @@ public class JObjectManagerImpl implements JObjectManager {
synchronized (this) {
var inMap = getFromMap(object.getName());
if (inMap != null) {
- inMap.runReadLocked((m, d) -> {
- if (!Objects.equals(d, object))
- throw new IllegalArgumentException("Trying to insert different object with same key");
- return null;
- });
- _nurseryRefcounts.merge(object.getName(), 1L, Long::sum);
+ if (!object.assumeUnique())
+ throw new IllegalArgumentException("Trying to insert different object with same key");
+ addToNursery(object.getName());
return (JObject<D>) inMap;
} else {
var created = new JObject<D>(jObjectResolver, object.getName(), object.getConflictResolver().getName(), object);
_map.put(object.getName(), new NamedSoftReference(created, _refQueue));
jObjectResolver.notifyWrite(created);
- _nurseryRefcounts.merge(object.getName(), 1L, Long::sum);
+ addToNursery(created.getName());
return created;
}
}
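
The nursery is a refcount map for objects that have not yet reached the persistent store: each put() of an unpersisted object bumps the count via addToNursery (added below), and unref() decrements it, discarding the object entirely if the count hits zero before writeback happens. A minimal sketch of the counting pattern, with illustrative names (persistence is reduced to a boolean flag):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative nursery-style refcounting: objects are only tracked while
    // unpersisted, and can be dropped wholesale if fully unreferenced first.
    class NurserySketch {
        private final Map<String, Long> refcounts = new HashMap<>();

        void put(String name, boolean persisted) {
            if (!persisted) refcounts.merge(name, 1L, Long::sum); // addToNursery
        }

        boolean unref(String name) { // true when the object can be dropped
            if (!refcounts.containsKey(name)) return false;
            refcounts.merge(name, -1L, Long::sum);
            if (refcounts.get(name) <= 0) {
                refcounts.remove(name);
                return true; // cancel pending writeback, forget the object
            }
            return false;
        }

        public static void main(String[] args) {
            var n = new NurserySketch();
            n.put("data_abc", false);
            n.put("data_abc", false); // duplicate chunk put: refcount is now 2
            System.out.println(n.unref("data_abc")); // false, still referenced
            System.out.println(n.unref("data_abc")); // true, safe to discard
        }
    }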
@@ -173,10 +173,7 @@ public class JObjectManagerImpl implements JObjectManager {
synchronized (this) {
var inMap = getFromMap(object.getName());
if (inMap != null) {
- var ok = inMap.runReadLocked((m) -> {
- return object.getClass().isAssignableFrom(m.getType());
- });
- if (ok)
+ if (inMap.isOf(object.getClass()))
return (JObject<D>) inMap;
else
throw new NotImplementedException("Type mismatch for " + name);
@@ -189,6 +186,13 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
+ private void addToNursery(String name) {
+ synchronized (this) {
+ if (!objectPersistentStore.existsObject("meta_" + name))
+ _nurseryRefcounts.merge(name, 1L, Long::sum);
+ }
+ }
@Override
public void onWriteback(String name) {
synchronized (this) {
@@ -198,19 +202,26 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public void unref(JObject<?> object) {
- synchronized (this) {
- object.runWriteLockedMeta((m, a, b) -> {
- String name = m.getName();
+ object.runWriteLockedMeta((m, a, b) -> {
+ String name = m.getName();
+ boolean removed = false;
+ synchronized (this) {
if (!_nurseryRefcounts.containsKey(name)) return null;
_nurseryRefcounts.merge(name, -1L, Long::sum);
if (_nurseryRefcounts.get(name) <= 0) {
_nurseryRefcounts.remove(name);
- jObjectWriteback.remove(name);
+ removed = true;
}
}
+ // Race?
+ if (removed) {
+ jObjectWriteback.remove(name);
+ synchronized (this) {
+ if (!objectPersistentStore.existsObject("meta_" + name))
_map.remove(name);
+ }
- return null;
- });
- }
}
+ return null;
+ });
}
}
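
The // Race? comment appears to flag the window between the two synchronized blocks: the nursery count is dropped under the lock, but the writeback entry is cancelled outside it, so a concurrent flush could persist the object in between. That is presumably why the in-memory entry is only removed after re-checking existsObject("meta_" + name) under the lock again.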


@@ -40,7 +40,7 @@ public class JObjectResolver {
public void removeLocal(JObject<?> jObject, String name) {
jObject.assertRWLock();
try {
Log.info("Deleting " + name);
// Log.info("Deleting " + name);
jObjectWriteback.remove(name);
objectPersistentStore.deleteObject(name);
} catch (StatusRuntimeException sx) {


@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.scheduler.Scheduled;
+ import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
@@ -41,7 +42,7 @@ public class JObjectWriteback {
}
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
- @RunOnVirtualThread
+ @Blocking
public void flush() {
while (true) {
JObject<?> obj;
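
Both periodic drains (this writeback flush and the invalidation-queue send below) move from @RunOnVirtualThread to @Blocking, so Quarkus runs them on a regular worker thread; a plausible motivation is that these loops block and synchronize, and synchronized sections pin virtual threads on current JVMs. A minimal sketch of the resulting pattern, with illustrative names:

    import io.quarkus.scheduler.Scheduled;
    import io.smallrye.common.annotation.Blocking;
    import jakarta.enterprise.context.ApplicationScoped;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustrative periodic drain: @Blocking keeps the long-running, lock-heavy
    // work on a worker thread, and SKIP prevents overlapping executions.
    @ApplicationScoped
    class DrainJobSketch {
        private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
        @Blocking
        void flush() {
            String item;
            while ((item = queue.poll()) != null) {
                // write the item out; may block on I/O
            }
        }
    }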


@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
+ import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -17,7 +18,7 @@ public class InvalidationQueueService {
RemoteObjectServiceClient remoteObjectServiceClient;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
- @RunOnVirtualThread
+ @Blocking
public void trySend() {
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
for (var forHost : data.entrySet()) {


@@ -70,13 +70,15 @@ public class RemoteObjectServiceClient {
}
public void notifyUpdate(String host, String name) {
- var obj = jObjectManager.get(name).orElseThrow(() -> new NotImplementedException("Race when invalidating"));
+ var obj = jObjectManager.get(name);
+ if (obj.isEmpty()) return;
remoteHostManager.withClient(host, client -> {
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
client.indexUpdate(builder.setHeader(
- obj.runReadLocked(ObjectMetadata::toRpcHeader)
+ obj.get().runReadLocked(ObjectMetadata::toRpcHeader)
).build());
return null;
});


@@ -71,7 +71,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
Log.info("<-- indexUpdate: " + request.getHeader().getName());
// Log.info("<-- indexUpdate: " + request.getHeader().getName());
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
}


@@ -2,7 +2,7 @@ quarkus.grpc.server.use-separate-server=false
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
- dhfs.storage.files.target_chunk_size=4194304
+ dhfs.storage.files.target_chunk_size=524288
quarkus.quartz.shutdown-wait-time=31D
grpc.client.greeting-service.max-inbound-message-size=9155241000
grpc.client.greeting-service.package-max-inbound-message-size=9155241000
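
With the target chunk size lowered from 4194304 bytes (4 MiB) to 524288 bytes (512 KiB), the write-path thresholds above work out to: re-chunking is considered once a combined run deviates from the target by more than 10% (52,428.8 bytes), neighbour merging stops before the run would exceed 629,145.6 bytes (1.2x), and a trailing remainder is split off only once it would exceed 786,432 bytes (1.5x).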