From b3927e7a0fdc5d64081907ea402899a39d34f31e Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Fri, 14 Jun 2024 21:32:11 +0200 Subject: [PATCH] simple read cache --- .../files/service/DhfsFileServiceImpl.java | 68 ++++++----- .../usatiuk/dhfs/storage/fuse/DhfsFuse.java | 38 +++--- .../objects/jrepository/JObjectManager.java | 12 ++ .../jrepository/JObjectManagerImpl.java | 109 ++++++++++++++++++ 4 files changed, 182 insertions(+), 45 deletions(-) create mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java create mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java index 83c0d927..cc204d39 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java @@ -4,7 +4,7 @@ import com.usatiuk.dhfs.storage.files.objects.Chunk; import com.usatiuk.dhfs.storage.files.objects.DirEntry; import com.usatiuk.dhfs.storage.files.objects.Directory; import com.usatiuk.dhfs.storage.files.objects.File; -import com.usatiuk.dhfs.storage.objects.jrepository.JObjectRepository; +import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager; import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository; import io.quarkus.logging.Log; import io.quarkus.runtime.Shutdown; @@ -29,7 +29,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Inject Vertx vertx; @Inject - JObjectRepository jObjectRepository; + JObjectManager jObjectManager; @Inject ObjectRepository objectRepository; @@ -39,7 +39,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { Log.info("Initializing file service"); if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) { objectRepository.createNamespace(namespace).await().indefinitely(); - jObjectRepository.writeJObject(namespace, new Directory(new UUID(0, 0)).setMode(0755)).await().indefinitely(); + jObjectManager.put(namespace, new Directory(new UUID(0, 0)).setMode(0755)).await().indefinitely(); } getRoot().await().indefinitely(); } @@ -61,7 +61,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (found == null) return Uni.createFrom().item(Optional.empty()); - var ref = jObjectRepository.readJObjectChecked(namespace, found.toString(), DirEntry.class) + var ref = jObjectManager.get(namespace, found.toString(), DirEntry.class) .await().indefinitely(); if (!ref.isPresent()) { @@ -109,9 +109,9 @@ public class DhfsFileServiceImpl implements DhfsFileService { File f = new File(fuuid); f.setMode(mode); - jObjectRepository.writeJObject(namespace, f).await().indefinitely(); + jObjectManager.put(namespace, f).await().indefinitely(); dir.getChildren().put(Path.of(name).getFileName().toString(), fuuid); - jObjectRepository.writeJObject(namespace, dir).await().indefinitely(); + jObjectManager.put(namespace, dir).await().indefinitely(); return Uni.createFrom().item(Optional.of(f)); } @@ -129,9 +129,9 @@ public class DhfsFileServiceImpl implements DhfsFileService { Directory d = new Directory(duuid); d.setMode(mode); - jObjectRepository.writeJObject(namespace, d).await().indefinitely(); + jObjectManager.put(namespace, d).await().indefinitely(); dir.getChildren().put(Path.of(name).getFileName().toString(), duuid); - jObjectRepository.writeJObject(namespace, dir).await().indefinitely(); + jObjectManager.put(namespace, dir).await().indefinitely(); return Uni.createFrom().item(Optional.of(d)); } @@ -145,7 +145,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false); var removed = dir.getChildren().remove(Path.of(name).getFileName().toString()); - if (removed != null) jObjectRepository.writeJObject(namespace, dir).await().indefinitely(); + if (removed != null) jObjectManager.put(namespace, dir).await().indefinitely(); return Uni.createFrom().item(removed != null); } @@ -173,9 +173,8 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false); - dir.getChildren().put(Path.of(to).getFileName().toString(), dent.get().getUuid()); - jObjectRepository.writeJObject(namespace, dir).await().indefinitely(); + jObjectManager.put(namespace, dir).await().indefinitely(); return Uni.createFrom().item(true); } @@ -187,7 +186,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { dent.get().setMode(mode); - jObjectRepository.writeJObject(namespace, dent.get()).await().indefinitely(); + jObjectManager.put(namespace, dent.get()).await().indefinitely(); return Uni.createFrom().item(true); } @@ -204,7 +203,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public Uni> read(String fileUuid, long offset, int length) { - var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely(); + var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely(); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); return Uni.createFrom().item(Optional.empty()); @@ -227,7 +226,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { long toReadInChunk = (offset + length) - curPos; var chunkUuid = chunk.getValue(); - var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely(); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); @@ -242,20 +241,23 @@ public class DhfsFileServiceImpl implements DhfsFileService { buf.put(chunkBytes, (int) offInChunk, (int) toReadReally); - if (readableLen >= toReadInChunk) + if (readableLen > toReadInChunk) break; else curPos += readableLen; + if (!chunks.hasNext()) break; + chunk = chunks.next(); } - return Uni.createFrom().item(Optional.of(buf.array())); + // FIXME: + return Uni.createFrom().item(Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset)))); } @Override public Uni write(String fileUuid, long offset, byte[] data) { - var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely(); + var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely(); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); return Uni.createFrom().item(-1L); @@ -274,7 +276,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (first != null && first.getKey() < offset) { var chunkUuid = first.getValue(); - var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely(); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); @@ -283,20 +285,20 @@ public class DhfsFileServiceImpl implements DhfsFileService { var chunkBytes = chunkRead.get().getBytes(); Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey()))); - jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely(); + jObjectManager.tryPut(namespace, newChunk).await().indefinitely(); newChunks.put(first.getKey(), newChunk.getHash()); } { Chunk newChunk = new Chunk(data); - jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely(); + jObjectManager.tryPut(namespace, newChunk).await().indefinitely(); newChunks.put(offset, newChunk.getHash()); } if (last != null) { var lchunkUuid = last.getValue(); - var lchunkRead = jObjectRepository.readJObjectChecked(namespace, lchunkUuid, Chunk.class).await().indefinitely(); + var lchunkRead = jObjectManager.get(namespace, lchunkUuid, Chunk.class).await().indefinitely(); if (lchunkRead.isEmpty()) { Log.error("Chunk requested not found: " + lchunkUuid); @@ -308,20 +310,22 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (last.getKey() + lchunkBytes.length > offset + data.length) { int start = (int) ((offset + data.length) - last.getKey()); Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, start, lchunkBytes.length - start)); - jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely(); + jObjectManager.tryPut(namespace, newChunk).await().indefinitely(); newChunks.put(first.getKey(), newChunk.getHash()); } } - jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(newChunks)).await().indefinitely(); + file.setChunks(newChunks); + + jObjectManager.put(namespace, file).await().indefinitely(); return Uni.createFrom().item((long) data.length); } @Override public Uni truncate(String fileUuid, long length) { - var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely(); + var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely(); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); return Uni.createFrom().item(false); @@ -329,8 +333,8 @@ public class DhfsFileServiceImpl implements DhfsFileService { var file = fileOpt.get(); if (length == 0) { - jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>())) - .await().indefinitely(); + file.setChunks(new TreeMap<>()); + jObjectManager.put(namespace, file).await().indefinitely(); return Uni.createFrom().item(true); } @@ -342,7 +346,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (lastChunk != null) { var chunkUuid = lastChunk.getValue(); - var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely(); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); @@ -354,13 +358,15 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (lastChunk.getKey() + chunkBytes.length > 0) { int start = (int) (length - lastChunk.getKey()); Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start))); - jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely(); + jObjectManager.tryPut(namespace, newChunk).await().indefinitely(); newChunks.put(lastChunk.getKey(), newChunk.getHash()); } } - jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>(newChunks))).await().indefinitely(); + file.setChunks(new TreeMap<>(newChunks)); + + jObjectManager.put(namespace, file).await().indefinitely(); return Uni.createFrom().item(true); } @@ -371,7 +377,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { //FIXME: for (var chunk : f.getChunks().entrySet()) { var chunkUuid = chunk.getValue(); - var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely(); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); @@ -386,7 +392,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public Uni getRoot() { - var read = jObjectRepository.readJObjectChecked(namespace, new UUID(0, 0).toString(), DirEntry.class).await().indefinitely(); + var read = jObjectManager.get(namespace, new UUID(0, 0).toString(), DirEntry.class).await().indefinitely(); if (read.isEmpty() || !(read.get() instanceof Directory)) { Log.error("Root directory not found"); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java index 912df352..3a546c51 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java @@ -89,24 +89,34 @@ public class DhfsFuse extends FuseStubFS { @Override public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) { - var fileOpt = fileService.open(path).await().indefinitely(); - if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); - var file = fileOpt.get(); - var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely(); - if (read.isEmpty()) return 0; - buf.put(0, read.get(), 0, read.get().length); - return read.get().length; + try { + var fileOpt = fileService.open(path).await().indefinitely(); + if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); + var file = fileOpt.get(); + var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely(); + if (read.isEmpty()) return 0; + buf.put(0, read.get(), 0, read.get().length); + return read.get().length; + } catch (Exception e) { + Log.error("When reading " + path, e); + return -ErrorCodes.ENOENT(); + } } @Override public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) { - var fileOpt = fileService.open(path).await().indefinitely(); - if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); - var file = fileOpt.get(); - byte[] buffer = new byte[(int) size]; - buf.get(0, buffer, 0, (int) size); - var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely(); - return written.intValue(); + try { + var fileOpt = fileService.open(path).await().indefinitely(); + if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); + var file = fileOpt.get(); + byte[] buffer = new byte[(int) size]; + buf.get(0, buffer, 0, (int) size); + var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely(); + return written.intValue(); + } catch (Exception e) { + Log.error("When writing " + path, e); + return -ErrorCodes.ENOENT(); + } } @Override diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java new file mode 100644 index 00000000..e41267d2 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java @@ -0,0 +1,12 @@ +package com.usatiuk.dhfs.storage.objects.jrepository; + +import io.smallrye.mutiny.Uni; + +import java.util.Optional; + +public interface JObjectManager { + public Uni> get(String namespace, String key, Class clazz); + public Uni put(String namespace, T object); + // Returns the object from store if it existed, nothing otherwise + public Uni> tryPut(String namespace, T object); +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java new file mode 100644 index 00000000..f1e2a183 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -0,0 +1,109 @@ +package com.usatiuk.dhfs.storage.objects.jrepository; + +import io.quarkus.logging.Log; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import lombok.Getter; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.SoftReference; +import java.util.HashMap; +import java.util.Optional; + +@ApplicationScoped +public class JObjectManagerImpl implements JObjectManager { + @Inject + JObjectRepository jObjectRepository; + + private static class NamedSoftReference extends SoftReference { + public NamedSoftReference(JObject target, ReferenceQueue q) { + super(target, q); + this.key = target.getName(); + } + + @Getter + final String key; + } + + private final HashMap _map = new HashMap<>(); + private final ReferenceQueue _refQueue = new ReferenceQueue<>(); + + private void cleanup() { + NamedSoftReference cur; + while ((cur = (NamedSoftReference) _refQueue.poll()) != null) { + synchronized (_map) { + if (_map.containsKey(cur.key) && (_map.get(cur.key).get() == null)) + _map.remove(cur.key); + } + } + } + + private T getFromMap(String namespace, String key, Class clazz) { + synchronized (_map) { + if (_map.containsKey(key)) { + var ref = _map.get(key).get(); + if (ref != null) { + if (!clazz.isAssignableFrom(ref.getClass())) { + Log.error("Cached object type mismatch: " + namespace + "/" + key); + _map.remove(key); + } else + return (T) ref; + } + } + } + return null; + } + + @Override + public Uni> get(String namespace, String key, Class clazz) { + cleanup(); + synchronized (_map) { + var inMap = getFromMap(namespace, key, clazz); + if (inMap != null) return Uni.createFrom().item(Optional.of(inMap)); + } + + return jObjectRepository.readJObjectChecked(namespace, key, clazz).map(read -> { + if (read.isEmpty()) + return Optional.empty(); + + synchronized (_map) { + var inMap = getFromMap(namespace, key, clazz); + if (inMap != null) return Optional.of(inMap); + _map.put(key, new NamedSoftReference(read.get(), _refQueue)); + } + + return Optional.of(read.get()); + }); + } + + @Override + public Uni put(String namespace, T object) { + cleanup(); + + synchronized (_map) { + var inMap = getFromMap(namespace, object.getName(), object.getClass()); + if (inMap != null && inMap != object) + throw new IllegalArgumentException("Trying to insert different object with same key"); + else if (inMap == null) + _map.put(object.getName(), new NamedSoftReference(object, _refQueue)); + } + + return jObjectRepository.writeJObject(namespace, object); + } + + @Override + public Uni> tryPut(String namespace, T object) { + cleanup(); + + synchronized (_map) { + var inMap = getFromMap(namespace, object.getName(), object.getClass()); + if (inMap != null) return Uni.createFrom().item(Optional.of((T) inMap)); + else + _map.put(object.getName(), new NamedSoftReference(object, _refQueue)); + } + + return jObjectRepository.writeJObject(namespace, object).map(t -> Optional.empty()); + } + +}