diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/api/DhfsFileGrpcService.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/api/DhfsFileGrpcService.java index 368f432f..6c5e21b3 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/api/DhfsFileGrpcService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/api/DhfsFileGrpcService.java @@ -1,26 +1,31 @@ package com.usatiuk.dhfs.storage.files.api; import io.quarkus.grpc.GrpcService; +import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; @GrpcService public class DhfsFileGrpcService implements DhfsFilesGrpc { @Override + @Blocking public Uni findFiles(FindFilesRequest request) { return null; } @Override + @Blocking public Uni readFile(ReadFileRequest request) { return null; } @Override + @Blocking public Uni writeFile(WriteFileRequest request) { return null; } @Override + @Blocking public Uni deleteFile(DeleteFileRequest request) { return null; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileService.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileService.java index 26f73e02..1302896e 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileService.java @@ -3,27 +3,26 @@ package com.usatiuk.dhfs.storage.files.service; import com.usatiuk.dhfs.storage.files.objects.Directory; import com.usatiuk.dhfs.storage.files.objects.File; import com.usatiuk.dhfs.storage.files.objects.FsNode; -import io.smallrye.mutiny.Uni; import java.util.Optional; public interface DhfsFileService { - Uni> getDirEntry(String name); - Uni> open(String name); - Uni> create(String name, long mode); - Uni> mkdir(String name, long mode); - Uni chmod(String name, long mode); - Uni rmdir(String name); - Uni unlink(String name); - Uni rename(String from, String to); - Uni setTimes(String fileUuid, long atimeMs, long mtimeMs); - Uni> readDir(String name); + Optional getDirEntry(String name); + Optional open(String name); + Optional create(String name, long mode); + Optional mkdir(String name, long mode); + Boolean chmod(String name, long mode); + Boolean rmdir(String name); + Boolean unlink(String name); + Boolean rename(String from, String to); + Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs); + Iterable readDir(String name); - Uni size(File f); + Long size(File f); - Uni> read(String fileUuid, long offset, int length); - Uni write(String fileUuid, long offset, byte[] data); - Uni truncate(String fileUuid, long length); + Optional read(String fileUuid, long offset, int length); + Long write(String fileUuid, long offset, byte[] data); + Boolean truncate(String fileUuid, long length); - Uni getRoot(); + Directory getRoot(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java index c7fa921a..ec5250e6 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java @@ -9,7 +9,6 @@ import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Vertx; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -35,179 +34,178 @@ public class DhfsFileServiceImpl implements DhfsFileService { void init(@Observes @Priority(500) StartupEvent event) { Log.info("Initializing file service"); - if (!objectRepository.existsObject(namespace + new UUID(0, 0)).await().indefinitely()) { - jObjectManager.put(new Directory(new UUID(0, 0), 0755)).await().indefinitely(); + if (!objectRepository.existsObject(namespace + new UUID(0, 0))) { + jObjectManager.put(new Directory(new UUID(0, 0), 0755)); } - getRoot().await().indefinitely(); + getRoot(); } void shutdown(@Observes @Priority(100) ShutdownEvent event) { Log.info("Shutdown"); } - private Uni> traverse(FsNode from, Path path) { - if (path.getNameCount() == 0) return Uni.createFrom().item(Optional.of(from)); + private Optional traverse(FsNode from, Path path) { + if (path.getNameCount() == 0) return Optional.of(from); if (!(from instanceof Directory dir)) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); var pathFirstPart = path.getName(0).toString(); var found = dir.getKid(pathFirstPart); if (found.isEmpty()) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); - var ref = jObjectManager.get(found.get().toString(), FsNode.class) - .await().indefinitely(); + var ref = jObjectManager.get(found.get().toString(), FsNode.class); if (ref.isEmpty()) { Log.error("File missing when traversing directory " + from.getName() + ": " + found); - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); } - if (path.getNameCount() == 1) return Uni.createFrom().item(ref); + if (path.getNameCount() == 1) return ref; return traverse(ref.get(), path.subpath(1, path.getNameCount())); } @Override - public Uni> getDirEntry(String name) { - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name)).await().indefinitely(); - return Uni.createFrom().item(found); + public Optional getDirEntry(String name) { + var root = getRoot(); + var found = traverse(root, Path.of(name)); + return found; } @Override - public Uni> open(String name) { + public Optional open(String name) { // FIXME: - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name)).await().indefinitely(); + var root = getRoot(); + var found = traverse(root, Path.of(name)); if (found.isEmpty()) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); if (!(found.get() instanceof File)) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); - return Uni.createFrom().item(Optional.of((File) found.get())); + return Optional.of((File) found.get()); } @Override - public Uni> create(String name, long mode) { + public Optional create(String name, long mode) { // FIXME: - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name).getParent()).await().indefinitely(); - if (found.isEmpty()) return Uni.createFrom().item(Optional.empty()); + var root = getRoot(); + var found = traverse(root, Path.of(name).getParent()); + if (found.isEmpty()) return Optional.empty(); - if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty()); + if (!(found.get() instanceof Directory dir)) return Optional.empty(); var fuuid = UUID.randomUUID(); File f = new File(fuuid); f.setMode(mode); - jObjectManager.put(f).await().indefinitely(); + jObjectManager.put(f); if (!dir.putKid(Path.of(name).getFileName().toString(), fuuid)) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); - jObjectManager.put(dir).await().indefinitely(); + jObjectManager.put(dir); - return Uni.createFrom().item(Optional.of(f)); + return Optional.of(f); } @Override - public Uni> mkdir(String name, long mode) { + public Optional mkdir(String name, long mode) { // FIXME: - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name).getParent()).await().indefinitely(); - if (found.isEmpty()) return Uni.createFrom().item(Optional.empty()); + var root = getRoot(); + var found = traverse(root, Path.of(name).getParent()); + if (found.isEmpty()) return Optional.empty(); - if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty()); + if (!(found.get() instanceof Directory dir)) return Optional.empty(); var duuid = UUID.randomUUID(); Directory d = new Directory(duuid); d.setMode(mode); - jObjectManager.put(d).await().indefinitely(); + jObjectManager.put(d); if (!dir.putKid(Path.of(name).getFileName().toString(), duuid)) - return Uni.createFrom().item(Optional.empty()); - jObjectManager.put(dir).await().indefinitely(); + return Optional.empty(); + jObjectManager.put(dir); - return Uni.createFrom().item(Optional.of(d)); + return Optional.of(d); } - private Uni rmdent(String name) { + private Boolean rmdent(String name) { // FIXME: - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name).getParent()).await().indefinitely(); - if (found.isEmpty()) return Uni.createFrom().item(false); + var root = getRoot(); + var found = traverse(root, Path.of(name).getParent()); + if (found.isEmpty()) return false; - if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false); + if (!(found.get() instanceof Directory dir)) return false; var removed = dir.removeKid(Path.of(name).getFileName().toString()); - if (removed) jObjectManager.put(dir).await().indefinitely(); + if (removed) jObjectManager.put(dir); - return Uni.createFrom().item(removed); + return removed; } @Override - public Uni rmdir(String name) { + public Boolean rmdir(String name) { return rmdent(name); } @Override - public Uni unlink(String name) { + public Boolean unlink(String name) { return rmdent(name); } @Override - public Uni rename(String from, String to) { - var dent = getDirEntry(from).await().indefinitely(); - if (dent.isEmpty()) return Uni.createFrom().item(false); - if (!rmdent(from).await().indefinitely()) return Uni.createFrom().item(false); + public Boolean rename(String from, String to) { + var dent = getDirEntry(from); + if (dent.isEmpty()) return false; + if (!rmdent(from)) return false; // FIXME: - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(to).getParent()).await().indefinitely(); - if (found.isEmpty()) return Uni.createFrom().item(false); + var root = getRoot(); + var found = traverse(root, Path.of(to).getParent()); + if (found.isEmpty()) return false; - if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false); + if (!(found.get() instanceof Directory dir)) return false; if (!dir.putKid(Path.of(to).getFileName().toString(), dent.get().getUuid())) - return Uni.createFrom().item(false); - jObjectManager.put(dir).await().indefinitely(); + return false; + jObjectManager.put(dir); - return Uni.createFrom().item(true); + return true; } @Override - public Uni chmod(String name, long mode) { - var dent = getDirEntry(name).await().indefinitely(); - if (dent.isEmpty()) return Uni.createFrom().item(false); + public Boolean chmod(String name, long mode) { + var dent = getDirEntry(name); + if (dent.isEmpty()) return false; dent.get().setMode(mode); - jObjectManager.put(dent.get()).await().indefinitely(); + jObjectManager.put(dent.get()); - return Uni.createFrom().item(true); + return true; } @Override - public Uni> readDir(String name) { - var root = getRoot().await().indefinitely(); - var found = traverse(root, Path.of(name)).await().indefinitely(); + public Iterable readDir(String name) { + var root = getRoot(); + var found = traverse(root, Path.of(name)); if (found.isEmpty()) throw new IllegalArgumentException(); if (!(found.get() instanceof Directory foundDir)) throw new IllegalArgumentException(); - return Uni.createFrom().item(foundDir.getChildrenList()); + return foundDir.getChildrenList(); } @Override - public Uni> read(String fileUuid, long offset, int length) { - var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely(); + public Optional read(String fileUuid, long offset, int length) { + var fileOpt = jObjectManager.get(fileUuid, File.class); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); } var file = fileOpt.get(); @@ -221,7 +219,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error reading file: " + fileUuid, e); - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); } var chunks = chunksList.get().iterator(); @@ -238,11 +236,11 @@ public class DhfsFileServiceImpl implements DhfsFileService { long toReadInChunk = (offset + length) - curPos; var chunkUuid = chunk.getValue(); - var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(chunkUuid, Chunk.class); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); } var chunkBytes = chunkRead.get().getBytes(); @@ -264,15 +262,15 @@ public class DhfsFileServiceImpl implements DhfsFileService { } // FIXME: - return Uni.createFrom().item(Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset)))); + return Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset))); } @Override - public Uni write(String fileUuid, long offset, byte[] data) { - var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely(); + public Long write(String fileUuid, long offset, byte[] data) { + var fileOpt = jObjectManager.get(fileUuid, File.class); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); - return Uni.createFrom().item(-1L); + return -1L; } var file = fileOpt.get(); @@ -286,7 +284,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error reading file: " + fileUuid, e); - return Uni.createFrom().item(-1L); + return -1L; } var chunksAll = chunksAllRef.get(); @@ -301,33 +299,33 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (first != null && first.getKey() < offset) { var chunkUuid = first.getValue(); - var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(chunkUuid, Chunk.class); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); - return Uni.createFrom().item(-1L); + return -1L; } var chunkBytes = chunkRead.get().getBytes(); Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey()))); - jObjectManager.put(newChunk).await().indefinitely(); + jObjectManager.put(newChunk); newChunks.put(first.getKey(), newChunk.getHash()); } { Chunk newChunk = new Chunk(data); - jObjectManager.put(newChunk).await().indefinitely(); + jObjectManager.put(newChunk); newChunks.put(offset, newChunk.getHash()); } if (last != null) { var lchunkUuid = last.getValue(); - var lchunkRead = jObjectManager.get(lchunkUuid, Chunk.class).await().indefinitely(); + var lchunkRead = jObjectManager.get(lchunkUuid, Chunk.class); if (lchunkRead.isEmpty()) { Log.error("Chunk requested not found: " + lchunkUuid); - return Uni.createFrom().item(-1L); + return -1L; } var lchunkBytes = lchunkRead.get().getBytes(); @@ -336,7 +334,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { var startInFile = offset + data.length; var startInChunk = startInFile - last.getKey(); Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length)); - jObjectManager.put(newChunk).await().indefinitely(); + jObjectManager.put(newChunk); newChunks.put(startInFile, newChunk.getHash()); } @@ -351,20 +349,20 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error writing file chunks: " + fileUuid, e); - return Uni.createFrom().item(-1L); + return -1L; } - jObjectManager.put(file).await().indefinitely(); + jObjectManager.put(file); - return Uni.createFrom().item((long) data.length); + return (long) data.length; } @Override - public Uni truncate(String fileUuid, long length) { - var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely(); + public Boolean truncate(String fileUuid, long length) { + var fileOpt = jObjectManager.get(fileUuid, File.class); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); - return Uni.createFrom().item(false); + return false; } var file = fileOpt.get(); @@ -377,10 +375,10 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error writing file chunks: " + fileUuid, e); - return Uni.createFrom().item(false); + return false; } - jObjectManager.put(file).await().indefinitely(); - return Uni.createFrom().item(true); + jObjectManager.put(file); + return true; } AtomicReference> chunksAllRef = new AtomicReference<>(); @@ -392,7 +390,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error reading file: " + fileUuid, e); - return Uni.createFrom().item(false); + return false; } var chunksAll = chunksAllRef.get(); @@ -403,11 +401,11 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (lastChunk != null) { var chunkUuid = lastChunk.getValue(); - var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(chunkUuid, Chunk.class); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); - return Uni.createFrom().item(false); + return false; } var chunkBytes = chunkRead.get().getBytes(); @@ -415,7 +413,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (lastChunk.getKey() + chunkBytes.length > 0) { int start = (int) (length - lastChunk.getKey()); Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start))); - jObjectManager.put(newChunk).await().indefinitely(); + jObjectManager.put(newChunk); newChunks.put(lastChunk.getKey(), newChunk.getHash()); } @@ -430,20 +428,20 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error writing file chunks: " + fileUuid, e); - return Uni.createFrom().item(false); + return false; } - jObjectManager.put(file).await().indefinitely(); + jObjectManager.put(file); - return Uni.createFrom().item(true); + return true; } @Override - public Uni setTimes(String fileUuid, long atimeMs, long mtimeMs) { - var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely(); + public Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs) { + var fileOpt = jObjectManager.get(fileUuid, File.class); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); - return Uni.createFrom().item(false); + return false; } var file = fileOpt.get(); @@ -454,16 +452,16 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error writing file chunks: " + fileUuid, e); - return Uni.createFrom().item(false); + return false; } - jObjectManager.put(file).await().indefinitely(); + jObjectManager.put(file); - return Uni.createFrom().item(true); + return true; } @Override - public Uni size(File f) { + public Long size(File f) { int size = 0; //FIXME: AtomicReference> chunksAllRef = new AtomicReference<>(); @@ -475,32 +473,32 @@ public class DhfsFileServiceImpl implements DhfsFileService { }); } catch (Exception e) { Log.error("Error reading file: " + f.getUuid(), e); - return Uni.createFrom().item(-1L); + return -1L; } var chunksAll = chunksAllRef.get(); for (var chunk : chunksAll.entrySet()) { var chunkUuid = chunk.getValue(); - var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely(); + var chunkRead = jObjectManager.get(chunkUuid, Chunk.class); if (chunkRead.isEmpty()) { Log.error("Chunk requested not found: " + chunkUuid); - return Uni.createFrom().item(-1L); + return -1L; } var chunkBytes = chunkRead.get().getBytes(); size += chunkBytes.length; } - return Uni.createFrom().item((long) size); + return (long) size; } @Override - public Uni getRoot() { - var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class).await().indefinitely(); + public Directory getRoot() { + var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class); if (read.isEmpty() || !(read.get() instanceof Directory)) { Log.error("Root directory not found"); } - return Uni.createFrom().item((Directory) read.get()); + return (Directory) read.get(); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java index 9c828587..f30dd147 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java @@ -68,14 +68,14 @@ public class DhfsFuse extends FuseStubFS { @Override public int getattr(String path, FileStat stat) { try { - Optional found = fileService.getDirEntry(path).await().indefinitely(); + Optional found = fileService.getDirEntry(path); if (found.isEmpty()) { return -ErrorCodes.ENOENT(); } if (found.get() instanceof File f) { stat.st_mode.set(S_IFREG | f.getMode()); stat.st_nlink.set(1); - stat.st_size.set(fileService.size(f).await().indefinitely()); + stat.st_size.set(fileService.size(f)); } else if (found.get() instanceof Directory d) { stat.st_mode.set(S_IFDIR | d.getMode()); stat.st_nlink.set(2); @@ -101,12 +101,12 @@ public class DhfsFuse extends FuseStubFS { @Override public int utimens(String path, Timespec[] timespec) { try { - var fileOpt = fileService.open(path).await().indefinitely(); + var fileOpt = fileService.open(path); if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); var file = fileOpt.get(); var res = fileService.setTimes(file.getUuid().toString(), timespec[0].tv_sec.get() * 1000, - timespec[1].tv_sec.get() * 1000).await().indefinitely(); + timespec[1].tv_sec.get() * 1000); if (!res) return -ErrorCodes.EINVAL(); else return 0; } catch (Exception e) { @@ -117,17 +117,17 @@ public class DhfsFuse extends FuseStubFS { @Override public int open(String path, FuseFileInfo fi) { - if (fileService.open(path).await().indefinitely().isEmpty()) return -ErrorCodes.ENOENT(); + if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT(); return 0; } @Override public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) { try { - var fileOpt = fileService.open(path).await().indefinitely(); + var fileOpt = fileService.open(path); if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); var file = fileOpt.get(); - var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely(); + var read = fileService.read(file.getUuid().toString(), offset, (int) size); if (read.isEmpty()) return 0; buf.put(0, read.get(), 0, read.get().length); return read.get().length; @@ -140,12 +140,12 @@ public class DhfsFuse extends FuseStubFS { @Override public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) { try { - var fileOpt = fileService.open(path).await().indefinitely(); + var fileOpt = fileService.open(path); if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); var file = fileOpt.get(); byte[] buffer = new byte[(int) size]; buf.get(0, buffer, 0, (int) size); - var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely(); + var written = fileService.write(file.getUuid().toString(), offset, buffer); return written.intValue(); } catch (Exception e) { Log.error("When writing " + path, e); @@ -155,45 +155,45 @@ public class DhfsFuse extends FuseStubFS { @Override public int create(String path, long mode, FuseFileInfo fi) { - var ret = fileService.create(path, mode).await().indefinitely(); + var ret = fileService.create(path, mode); if (ret.isEmpty()) return -ErrorCodes.ENOSPC(); else return 0; } @Override public int mkdir(String path, long mode) { - var ret = fileService.mkdir(path, mode).await().indefinitely(); + var ret = fileService.mkdir(path, mode); if (ret.isEmpty()) return -ErrorCodes.ENOSPC(); else return 0; } @Override public int rmdir(String path) { - var ret = fileService.rmdir(path).await().indefinitely(); + var ret = fileService.rmdir(path); if (!ret) return -ErrorCodes.ENOENT(); else return 0; } @Override public int rename(String path, String newName) { - var ret = fileService.rename(path, newName).await().indefinitely(); + var ret = fileService.rename(path, newName); if (!ret) return -ErrorCodes.ENOENT(); else return 0; } @Override public int unlink(String path) { - var ret = fileService.unlink(path).await().indefinitely(); + var ret = fileService.unlink(path); if (!ret) return -ErrorCodes.ENOENT(); else return 0; } @Override public int truncate(String path, long size) { - var fileOpt = fileService.open(path).await().indefinitely(); + var fileOpt = fileService.open(path); if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); var file = fileOpt.get(); - var ok = fileService.truncate(file.getUuid().toString(), size).await().indefinitely(); + var ok = fileService.truncate(file.getUuid().toString(), size); if (ok) return 0; else @@ -202,7 +202,7 @@ public class DhfsFuse extends FuseStubFS { @Override public int chmod(String path, long mode) { - var ret = fileService.chmod(path, mode).await().indefinitely(); + var ret = fileService.chmod(path, mode); if (ret) return 0; else return -ErrorCodes.EINVAL(); } @@ -211,7 +211,7 @@ public class DhfsFuse extends FuseStubFS { public int readdir(String path, Pointer buf, FuseFillDir filler, long offset, FuseFileInfo fi) { Iterable found; try { - found = fileService.readDir(path).await().indefinitely(); + found = fileService.readDir(path); } catch (Exception e) { return -ErrorCodes.ENOENT(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java index da654191..16b9df40 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.storage.objects.api; import com.google.protobuf.ByteString; import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository; import io.quarkus.grpc.GrpcService; -import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; @@ -13,29 +13,32 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc { ObjectRepository objectRepository; @Override + @Blocking public Uni findObjects(FindObjectsRequest request) { - return objectRepository.findObjects(request.getPrefix()) - .map(m -> FindObjectsReply.FindObjectsEntry.newBuilder().setName(m).build()) - .collect().in(FindObjectsReply::newBuilder, FindObjectsReply.Builder::addFound) - .map(FindObjectsReply.Builder::build); + var objects = objectRepository.findObjects(request.getPrefix()); + var builder = FindObjectsReply.newBuilder(); + for (var obj : objects) { + builder.addFound(FindObjectsReply.FindObjectsEntry.newBuilder().setName(obj).build()); + } + return Uni.createFrom().item(builder.build()); } @Override - @RunOnVirtualThread + @Blocking public Uni readObject(ReadObjectRequest request) { var read = objectRepository.readObject(request.getName()); return Uni.createFrom().item(ReadObjectReply.newBuilder().setData(ByteString.copyFrom(read)).build()); } @Override - @RunOnVirtualThread + @Blocking public Uni writeObject(WriteObjectRequest request) { objectRepository.writeObject(request.getName(), request.getData().toByteArray(), false); return Uni.createFrom().item(WriteObjectReply.newBuilder().build()); } @Override - @RunOnVirtualThread + @Blocking public Uni deleteObject(DeleteObjectRequest request) { objectRepository.deleteObject(request.getName()); return Uni.createFrom().item(DeleteObjectReply.newBuilder().build()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java index 136cc621..d22798ea 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java @@ -5,7 +5,7 @@ import io.smallrye.mutiny.Uni; import java.util.Optional; public interface JObjectManager { - Uni> get(String name, Class clazz); - Uni put(T object); + Optional get(String name, Class clazz); + void put(T object); void invalidateJObject(String name); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index 0ee34e9b..395bceff 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -1,7 +1,6 @@ package com.usatiuk.dhfs.storage.objects.jrepository; import io.quarkus.logging.Log; -import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import lombok.Getter; @@ -56,29 +55,29 @@ public class JObjectManagerImpl implements JObjectManager { } @Override - public Uni> get(String name, Class clazz) { + public Optional get(String name, Class clazz) { cleanup(); synchronized (_map) { var inMap = getFromMap(name, clazz); - if (inMap != null) return Uni.createFrom().item(Optional.of(inMap)); + if (inMap != null) return Optional.of(inMap); } var read = jObjectRepository.readJObjectChecked(name, clazz); if (read.isEmpty()) - return Uni.createFrom().item(Optional.empty()); + return Optional.empty(); synchronized (_map) { var inMap = getFromMap(name, clazz); - if (inMap != null) return Uni.createFrom().item(Optional.of(inMap)); + if (inMap != null) return Optional.of(inMap); _map.put(name, new NamedSoftReference(read.get(), _refQueue)); } - return Uni.createFrom().item(Optional.of(read.get())); + return Optional.of(read.get()); } @Override - public Uni put(T object) { + public void put(T object) { cleanup(); synchronized (_map) { @@ -90,7 +89,6 @@ public class JObjectManagerImpl implements JObjectManager { } jObjectRepository.writeJObject(object); - return Uni.createFrom().voidItem(); } @Override diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java index e4894de2..d902a47c 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java @@ -4,12 +4,13 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import javax.annotation.Nonnull; +import java.util.List; public interface ObjectRepository { @Nonnull - Multi findObjects(String prefix); + List findObjects(String prefix); @Nonnull - Uni existsObject(String name); + Boolean existsObject(String name); @Nonnull byte[] readObject(String name); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java index f5639ab9..ab0c5a07 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java @@ -6,6 +6,7 @@ import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; import javax.annotation.Nonnull; +import java.util.List; //@ApplicationScoped public class SimplePersistentObjectRepository implements ObjectRepository { @@ -14,31 +15,31 @@ public class SimplePersistentObjectRepository implements ObjectRepository { @Nonnull @Override - public Multi findObjects(String prefix) { + public List findObjects(String prefix) { return objectPersistentStore.findObjects(prefix); } @Nonnull @Override - public Uni existsObject(String name) { + public Boolean existsObject(String name) { return objectPersistentStore.existsObject(name); } @Nonnull @Override public byte[] readObject(String name) { - return objectPersistentStore.readObject(name).await().indefinitely(); + return objectPersistentStore.readObject(name); } @Nonnull @Override public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) { - objectPersistentStore.writeObject(name, data).await().indefinitely(); + objectPersistentStore.writeObject(name, data); } @Nonnull @Override public void deleteObject(String name) { - objectPersistentStore.deleteObject(name).await().indefinitely(); + objectPersistentStore.deleteObject(name); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java index f14af9b9..0a759a26 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java @@ -6,8 +6,6 @@ import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentS import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Vertx; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -18,6 +16,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.List; import java.util.Optional; @ApplicationScoped @@ -50,7 +49,7 @@ public class DistributedObjectRepository implements ObjectRepository { syncHandler.handleRemoteUpdate( IndexUpdatePush.newBuilder().setSelfname(selfname).setName(h.getName()) .setAssumeUnique(h.getAssumeUnique()) - .setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely(); + .setMtime(h.getMtime()).setPrevMtime(prevMtime).build()); } Log.info("Sync complete"); } catch (Exception e) { @@ -64,14 +63,14 @@ public class DistributedObjectRepository implements ObjectRepository { @Nonnull @Override - public Multi findObjects(String prefix) { + public List findObjects(String prefix) { throw new NotImplementedException(); } @Nonnull @Override - public Uni existsObject(String name) { - return Uni.createFrom().item(objectIndexService.exists(name)); + public Boolean existsObject(String name) { + return objectIndexService.exists(name); } @Nonnull @@ -86,18 +85,17 @@ public class DistributedObjectRepository implements ObjectRepository { var info = infoOpt.get(); Optional read = info.runReadLocked(() -> { - if (objectPersistentStore.existsObject(name).await().indefinitely()) - return Optional.of(objectPersistentStore.readObject(name).await().indefinitely()); + if (objectPersistentStore.existsObject(name)) + return Optional.of(objectPersistentStore.readObject(name)); return Optional.empty(); }); if (read.isPresent()) return read.get(); // Race? return info.runWriteLocked(() -> { - return remoteObjectServiceClient.getObject(name).map(got -> { - objectPersistentStore.writeObject(name, got); - return got; - }).await().indefinitely(); + var obj = remoteObjectServiceClient.getObject(name); + objectPersistentStore.writeObject(name, obj); + return obj; }); } @@ -107,7 +105,7 @@ public class DistributedObjectRepository implements ObjectRepository { var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict); info.runWriteLocked(() -> { - objectPersistentStore.writeObject(name, data).await().indefinitely(); + objectPersistentStore.writeObject(name, data); var prevMtime = info.getMtime(); info.setMtime(System.currentTimeMillis()); try { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index 50c6321f..cba38f8a 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -24,7 +24,7 @@ public class RemoteObjectServiceClient { @Inject RemoteHostManager remoteHostManager; - public Uni getObject(String name) { + public byte[] getObject(String name) { return remoteHostManager.withClient(client -> { var req = GetObjectRequest.newBuilder().setName(name).build(); var reply = client.getObject(req); @@ -37,7 +37,7 @@ public class RemoteObjectServiceClient { throw new NotImplementedException(); } } - return Uni.createFrom().item(reply.getObject().getContent().toByteArray()); + return reply.getObject().getContent().toByteArray(); }); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index e2591aa3..b750f778 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -34,8 +34,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); var meta = metaOpt.get(); Optional> read = meta.runReadLocked(() -> { - if (objectPersistentStore.existsObject(request.getName()).await().indefinitely()) - return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()).await().indefinitely())); + if (objectPersistentStore.existsObject(request.getName())) + return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()))); return Optional.empty(); }); if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); @@ -61,6 +61,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Blocking public Uni indexUpdate(IndexUpdatePush request) { Log.info("<-- indexUpdate: " + request.getName() + " from: " + request.getPrevMtime() + " to: " + request.getMtime()); - return syncHandler.handleRemoteUpdate(request); + return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index 6df72f3f..9cf415ba 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -21,7 +21,7 @@ public class SyncHandler { @Inject JObjectManager jObjectManager; - public Uni handleRemoteUpdate(IndexUpdatePush request) { + public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { var metaOpt = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique()); metaOpt.runWriteLocked(() -> { if (metaOpt.getMtime() == request.getMtime()) { @@ -43,7 +43,7 @@ public class SyncHandler { metaOpt._remoteCopies.add(request.getSelfname()); try { - objectPersistentStore.deleteObject(request.getName()).await().indefinitely(); + objectPersistentStore.deleteObject(request.getName()); } catch (Exception ignored) { } @@ -52,7 +52,7 @@ public class SyncHandler { return null; }); - return Uni.createFrom().item(IndexUpdateReply.newBuilder().build()); + return IndexUpdateReply.newBuilder().build(); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java index c622047e..e6bf8300 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/FileObjectPersistentStore.java @@ -6,7 +6,6 @@ import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.core.buffer.Buffer; import jakarta.annotation.Priority; @@ -18,6 +17,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import javax.annotation.Nonnull; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; @ApplicationScoped public class FileObjectPersistentStore implements ObjectPersistentStore { @@ -38,7 +38,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore { @Nonnull @Override - public Multi findObjects(String prefix) { + public List findObjects(String prefix) { Path nsRoot = Paths.get(root); if (!nsRoot.toFile().isDirectory()) @@ -47,51 +47,48 @@ public class FileObjectPersistentStore implements ObjectPersistentStore { return vertx.fileSystem().readDir(nsRoot.toString()).onItem() .transformToMulti(v -> Multi.createFrom().iterable(v)) .select().where(n -> n.startsWith(prefix)) - .map(f -> nsRoot.relativize(Paths.get(f)).toString()); + .map(f -> nsRoot.relativize(Paths.get(f)).toString()).collect().asList().await().indefinitely(); } @Nonnull @Override - public Uni existsObject(String name) { + public Boolean existsObject(String name) { Path obj = Paths.get(root, name); - if (!obj.toFile().isFile()) - return Uni.createFrom().item(false); - - return Uni.createFrom().item(true); + return obj.toFile().isFile(); } @Nonnull @Override - public Uni readObject(String name) { + public byte[] readObject(String name) { var file = Path.of(root, name); if (!file.toFile().exists()) throw new StatusRuntimeException(Status.NOT_FOUND); - return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes); + return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes).await().indefinitely(); } @Nonnull @Override - public Uni writeObject(String name, byte[] data) { + public Void writeObject(String name, byte[] data) { var file = Path.of(root, name); if (!Paths.get(root).toFile().isDirectory() && !Paths.get(root).toFile().mkdirs()) throw new StatusRuntimeException(Status.INTERNAL); - return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data)); + return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data)).await().indefinitely(); } @Nonnull @Override - public Uni deleteObject(String name) { + public Void deleteObject(String name) { var file = Path.of(root, name); if (!file.toFile().exists()) throw new StatusRuntimeException(Status.NOT_FOUND); - return vertx.fileSystem().delete(file.toString()); + return vertx.fileSystem().delete(file.toString()).await().indefinitely(); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java index ca07315a..356318fd 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/persistence/ObjectPersistentStore.java @@ -4,17 +4,18 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import javax.annotation.Nonnull; +import java.util.List; public interface ObjectPersistentStore { @Nonnull - Multi findObjects(String prefix); + List findObjects(String prefix); @Nonnull - Uni existsObject(String name); + Boolean existsObject(String name); @Nonnull - Uni readObject(String name); + byte[] readObject(String name); @Nonnull - Uni writeObject(String name, byte[] data); + Void writeObject(String name, byte[] data); @Nonnull - Uni deleteObject(String name); + Void deleteObject(String name); } diff --git a/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTest.java b/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTest.java index eee0f9f4..5137dfe0 100644 --- a/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTest.java +++ b/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTest.java @@ -59,7 +59,7 @@ public class DhfsFileServiceSimpleTest { for (int start = 0; start < all.length(); start++) { for (int end = start; end <= all.length(); end++) { var read = fileService.read(fuuid.toString(), start, end - start); - Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.await().indefinitely().get()); + Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get()); } } }