mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
remove uni/mutiny
Effectively, I ended up not writing reactive code anyways (FUSE has its own threads...) and if needed in future I guess it shouldn't be hard to use virtual threads
This commit is contained in:
@@ -1,26 +1,31 @@
|
||||
package com.usatiuk.dhfs.storage.files.api;
|
||||
|
||||
import io.quarkus.grpc.GrpcService;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
@GrpcService
|
||||
public class DhfsFileGrpcService implements DhfsFilesGrpc {
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<FindFilesReply> findFiles(FindFilesRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<ReadFileReply> readFile(ReadFileRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<WriteFileReply> writeFile(WriteFileRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<DeleteFileReply> deleteFile(DeleteFileRequest request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -3,27 +3,26 @@ package com.usatiuk.dhfs.storage.files.service;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.files.objects.FsNode;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface DhfsFileService {
|
||||
Uni<Optional<FsNode>> getDirEntry(String name);
|
||||
Uni<Optional<File>> open(String name);
|
||||
Uni<Optional<File>> create(String name, long mode);
|
||||
Uni<Optional<Directory>> mkdir(String name, long mode);
|
||||
Uni<Boolean> chmod(String name, long mode);
|
||||
Uni<Boolean> rmdir(String name);
|
||||
Uni<Boolean> unlink(String name);
|
||||
Uni<Boolean> rename(String from, String to);
|
||||
Uni<Boolean> setTimes(String fileUuid, long atimeMs, long mtimeMs);
|
||||
Uni<Iterable<String>> readDir(String name);
|
||||
Optional<FsNode> getDirEntry(String name);
|
||||
Optional<File> open(String name);
|
||||
Optional<File> create(String name, long mode);
|
||||
Optional<Directory> mkdir(String name, long mode);
|
||||
Boolean chmod(String name, long mode);
|
||||
Boolean rmdir(String name);
|
||||
Boolean unlink(String name);
|
||||
Boolean rename(String from, String to);
|
||||
Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs);
|
||||
Iterable<String> readDir(String name);
|
||||
|
||||
Uni<Long> size(File f);
|
||||
Long size(File f);
|
||||
|
||||
Uni<Optional<byte[]>> read(String fileUuid, long offset, int length);
|
||||
Uni<Long> write(String fileUuid, long offset, byte[] data);
|
||||
Uni<Boolean> truncate(String fileUuid, long length);
|
||||
Optional<byte[]> read(String fileUuid, long offset, int length);
|
||||
Long write(String fileUuid, long offset, byte[] data);
|
||||
Boolean truncate(String fileUuid, long length);
|
||||
|
||||
Uni<Directory> getRoot();
|
||||
Directory getRoot();
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
@@ -35,179 +34,178 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
void init(@Observes @Priority(500) StartupEvent event) {
|
||||
Log.info("Initializing file service");
|
||||
if (!objectRepository.existsObject(namespace + new UUID(0, 0)).await().indefinitely()) {
|
||||
jObjectManager.put(new Directory(new UUID(0, 0), 0755)).await().indefinitely();
|
||||
if (!objectRepository.existsObject(namespace + new UUID(0, 0))) {
|
||||
jObjectManager.put(new Directory(new UUID(0, 0), 0755));
|
||||
}
|
||||
getRoot().await().indefinitely();
|
||||
getRoot();
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(100) ShutdownEvent event) {
|
||||
Log.info("Shutdown");
|
||||
}
|
||||
|
||||
private Uni<Optional<FsNode>> traverse(FsNode from, Path path) {
|
||||
if (path.getNameCount() == 0) return Uni.createFrom().item(Optional.of(from));
|
||||
private Optional<FsNode> traverse(FsNode from, Path path) {
|
||||
if (path.getNameCount() == 0) return Optional.of(from);
|
||||
|
||||
if (!(from instanceof Directory dir))
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
var pathFirstPart = path.getName(0).toString();
|
||||
|
||||
var found = dir.getKid(pathFirstPart);
|
||||
if (found.isEmpty())
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
var ref = jObjectManager.get(found.get().toString(), FsNode.class)
|
||||
.await().indefinitely();
|
||||
var ref = jObjectManager.get(found.get().toString(), FsNode.class);
|
||||
|
||||
if (ref.isEmpty()) {
|
||||
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
if (path.getNameCount() == 1) return Uni.createFrom().item(ref);
|
||||
if (path.getNameCount() == 1) return ref;
|
||||
|
||||
return traverse(ref.get(), path.subpath(1, path.getNameCount()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<FsNode>> getDirEntry(String name) {
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
return Uni.createFrom().item(found);
|
||||
public Optional<FsNode> getDirEntry(String name) {
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name));
|
||||
return found;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<File>> open(String name) {
|
||||
public Optional<File> open(String name) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name));
|
||||
|
||||
if (found.isEmpty())
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
if (!(found.get() instanceof File))
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
return Uni.createFrom().item(Optional.of((File) found.get()));
|
||||
return Optional.of((File) found.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<File>> create(String name, long mode) {
|
||||
public Optional<File> create(String name, long mode) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name).getParent()).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name).getParent());
|
||||
if (found.isEmpty()) return Optional.empty();
|
||||
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty());
|
||||
if (!(found.get() instanceof Directory dir)) return Optional.empty();
|
||||
|
||||
var fuuid = UUID.randomUUID();
|
||||
File f = new File(fuuid);
|
||||
f.setMode(mode);
|
||||
|
||||
jObjectManager.put(f).await().indefinitely();
|
||||
jObjectManager.put(f);
|
||||
|
||||
if (!dir.putKid(Path.of(name).getFileName().toString(), fuuid))
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
jObjectManager.put(dir).await().indefinitely();
|
||||
jObjectManager.put(dir);
|
||||
|
||||
return Uni.createFrom().item(Optional.of(f));
|
||||
return Optional.of(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<Directory>> mkdir(String name, long mode) {
|
||||
public Optional<Directory> mkdir(String name, long mode) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name).getParent()).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name).getParent());
|
||||
if (found.isEmpty()) return Optional.empty();
|
||||
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty());
|
||||
if (!(found.get() instanceof Directory dir)) return Optional.empty();
|
||||
|
||||
var duuid = UUID.randomUUID();
|
||||
Directory d = new Directory(duuid);
|
||||
d.setMode(mode);
|
||||
|
||||
jObjectManager.put(d).await().indefinitely();
|
||||
jObjectManager.put(d);
|
||||
if (!dir.putKid(Path.of(name).getFileName().toString(), duuid))
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
jObjectManager.put(dir).await().indefinitely();
|
||||
return Optional.empty();
|
||||
jObjectManager.put(dir);
|
||||
|
||||
return Uni.createFrom().item(Optional.of(d));
|
||||
return Optional.of(d);
|
||||
}
|
||||
|
||||
private Uni<Boolean> rmdent(String name) {
|
||||
private Boolean rmdent(String name) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name).getParent()).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(false);
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name).getParent());
|
||||
if (found.isEmpty()) return false;
|
||||
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
||||
if (!(found.get() instanceof Directory dir)) return false;
|
||||
|
||||
var removed = dir.removeKid(Path.of(name).getFileName().toString());
|
||||
if (removed) jObjectManager.put(dir).await().indefinitely();
|
||||
if (removed) jObjectManager.put(dir);
|
||||
|
||||
return Uni.createFrom().item(removed);
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> rmdir(String name) {
|
||||
public Boolean rmdir(String name) {
|
||||
return rmdent(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> unlink(String name) {
|
||||
public Boolean unlink(String name) {
|
||||
return rmdent(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> rename(String from, String to) {
|
||||
var dent = getDirEntry(from).await().indefinitely();
|
||||
if (dent.isEmpty()) return Uni.createFrom().item(false);
|
||||
if (!rmdent(from).await().indefinitely()) return Uni.createFrom().item(false);
|
||||
public Boolean rename(String from, String to) {
|
||||
var dent = getDirEntry(from);
|
||||
if (dent.isEmpty()) return false;
|
||||
if (!rmdent(from)) return false;
|
||||
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(to).getParent()).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(false);
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(to).getParent());
|
||||
if (found.isEmpty()) return false;
|
||||
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
||||
if (!(found.get() instanceof Directory dir)) return false;
|
||||
|
||||
if (!dir.putKid(Path.of(to).getFileName().toString(), dent.get().getUuid()))
|
||||
return Uni.createFrom().item(false);
|
||||
jObjectManager.put(dir).await().indefinitely();
|
||||
return false;
|
||||
jObjectManager.put(dir);
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> chmod(String name, long mode) {
|
||||
var dent = getDirEntry(name).await().indefinitely();
|
||||
if (dent.isEmpty()) return Uni.createFrom().item(false);
|
||||
public Boolean chmod(String name, long mode) {
|
||||
var dent = getDirEntry(name);
|
||||
if (dent.isEmpty()) return false;
|
||||
|
||||
dent.get().setMode(mode);
|
||||
|
||||
jObjectManager.put(dent.get()).await().indefinitely();
|
||||
jObjectManager.put(dent.get());
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Iterable<String>> readDir(String name) {
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
public Iterable<String> readDir(String name) {
|
||||
var root = getRoot();
|
||||
var found = traverse(root, Path.of(name));
|
||||
if (found.isEmpty()) throw new IllegalArgumentException();
|
||||
if (!(found.get() instanceof Directory foundDir)) throw new IllegalArgumentException();
|
||||
|
||||
return Uni.createFrom().item(foundDir.getChildrenList());
|
||||
return foundDir.getChildrenList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||
public Optional<byte[]> read(String fileUuid, long offset, int length) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class);
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
}
|
||||
var file = fileOpt.get();
|
||||
|
||||
@@ -221,7 +219,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error reading file: " + fileUuid, e);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
var chunks = chunksList.get().iterator();
|
||||
@@ -238,11 +236,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
long toReadInChunk = (offset + length) - curPos;
|
||||
|
||||
var chunkUuid = chunk.getValue();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class);
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
var chunkBytes = chunkRead.get().getBytes();
|
||||
@@ -264,15 +262,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
return Uni.createFrom().item(Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset))));
|
||||
return Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Long> write(String fileUuid, long offset, byte[] data) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||
public Long write(String fileUuid, long offset, byte[] data) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class);
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
var file = fileOpt.get();
|
||||
|
||||
@@ -286,7 +284,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error reading file: " + fileUuid, e);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
var chunksAll = chunksAllRef.get();
|
||||
@@ -301,33 +299,33 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
if (first != null && first.getKey() < offset) {
|
||||
var chunkUuid = first.getValue();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class);
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
var chunkBytes = chunkRead.get().getBytes();
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
|
||||
jObjectManager.put(newChunk).await().indefinitely();
|
||||
jObjectManager.put(newChunk);
|
||||
|
||||
newChunks.put(first.getKey(), newChunk.getHash());
|
||||
}
|
||||
|
||||
{
|
||||
Chunk newChunk = new Chunk(data);
|
||||
jObjectManager.put(newChunk).await().indefinitely();
|
||||
jObjectManager.put(newChunk);
|
||||
|
||||
newChunks.put(offset, newChunk.getHash());
|
||||
}
|
||||
if (last != null) {
|
||||
var lchunkUuid = last.getValue();
|
||||
var lchunkRead = jObjectManager.get(lchunkUuid, Chunk.class).await().indefinitely();
|
||||
var lchunkRead = jObjectManager.get(lchunkUuid, Chunk.class);
|
||||
|
||||
if (lchunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + lchunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
var lchunkBytes = lchunkRead.get().getBytes();
|
||||
@@ -336,7 +334,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
var startInFile = offset + data.length;
|
||||
var startInChunk = startInFile - last.getKey();
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
|
||||
jObjectManager.put(newChunk).await().indefinitely();
|
||||
jObjectManager.put(newChunk);
|
||||
|
||||
newChunks.put(startInFile, newChunk.getHash());
|
||||
}
|
||||
@@ -351,20 +349,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error writing file chunks: " + fileUuid, e);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
jObjectManager.put(file).await().indefinitely();
|
||||
jObjectManager.put(file);
|
||||
|
||||
return Uni.createFrom().item((long) data.length);
|
||||
return (long) data.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> truncate(String fileUuid, long length) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||
public Boolean truncate(String fileUuid, long length) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class);
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
var file = fileOpt.get();
|
||||
|
||||
@@ -377,10 +375,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error writing file chunks: " + fileUuid, e);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
jObjectManager.put(file).await().indefinitely();
|
||||
return Uni.createFrom().item(true);
|
||||
jObjectManager.put(file);
|
||||
return true;
|
||||
}
|
||||
|
||||
AtomicReference<TreeMap<Long, String>> chunksAllRef = new AtomicReference<>();
|
||||
@@ -392,7 +390,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error reading file: " + fileUuid, e);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
var chunksAll = chunksAllRef.get();
|
||||
@@ -403,11 +401,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
if (lastChunk != null) {
|
||||
var chunkUuid = lastChunk.getValue();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class);
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
var chunkBytes = chunkRead.get().getBytes();
|
||||
@@ -415,7 +413,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (lastChunk.getKey() + chunkBytes.length > 0) {
|
||||
int start = (int) (length - lastChunk.getKey());
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
|
||||
jObjectManager.put(newChunk).await().indefinitely();
|
||||
jObjectManager.put(newChunk);
|
||||
|
||||
newChunks.put(lastChunk.getKey(), newChunk.getHash());
|
||||
}
|
||||
@@ -430,20 +428,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error writing file chunks: " + fileUuid, e);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
jObjectManager.put(file).await().indefinitely();
|
||||
jObjectManager.put(file);
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> setTimes(String fileUuid, long atimeMs, long mtimeMs) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||
public Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class);
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
var file = fileOpt.get();
|
||||
|
||||
@@ -454,16 +452,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error writing file chunks: " + fileUuid, e);
|
||||
return Uni.createFrom().item(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
jObjectManager.put(file).await().indefinitely();
|
||||
jObjectManager.put(file);
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Long> size(File f) {
|
||||
public Long size(File f) {
|
||||
int size = 0;
|
||||
//FIXME:
|
||||
AtomicReference<TreeMap<Long, String>> chunksAllRef = new AtomicReference<>();
|
||||
@@ -475,32 +473,32 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Error reading file: " + f.getUuid(), e);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
var chunksAll = chunksAllRef.get();
|
||||
|
||||
for (var chunk : chunksAll.entrySet()) {
|
||||
var chunkUuid = chunk.getValue();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class);
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
return -1L;
|
||||
}
|
||||
|
||||
var chunkBytes = chunkRead.get().getBytes();
|
||||
size += chunkBytes.length;
|
||||
}
|
||||
return Uni.createFrom().item((long) size);
|
||||
return (long) size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Directory> getRoot() {
|
||||
var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class).await().indefinitely();
|
||||
public Directory getRoot() {
|
||||
var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class);
|
||||
if (read.isEmpty() || !(read.get() instanceof Directory)) {
|
||||
Log.error("Root directory not found");
|
||||
}
|
||||
return Uni.createFrom().item((Directory) read.get());
|
||||
return (Directory) read.get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,14 +68,14 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@Override
|
||||
public int getattr(String path, FileStat stat) {
|
||||
try {
|
||||
Optional<FsNode> found = fileService.getDirEntry(path).await().indefinitely();
|
||||
Optional<FsNode> found = fileService.getDirEntry(path);
|
||||
if (found.isEmpty()) {
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
if (found.get() instanceof File f) {
|
||||
stat.st_mode.set(S_IFREG | f.getMode());
|
||||
stat.st_nlink.set(1);
|
||||
stat.st_size.set(fileService.size(f).await().indefinitely());
|
||||
stat.st_size.set(fileService.size(f));
|
||||
} else if (found.get() instanceof Directory d) {
|
||||
stat.st_mode.set(S_IFDIR | d.getMode());
|
||||
stat.st_nlink.set(2);
|
||||
@@ -101,12 +101,12 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@Override
|
||||
public int utimens(String path, Timespec[] timespec) {
|
||||
try {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
var fileOpt = fileService.open(path);
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var res = fileService.setTimes(file.getUuid().toString(),
|
||||
timespec[0].tv_sec.get() * 1000,
|
||||
timespec[1].tv_sec.get() * 1000).await().indefinitely();
|
||||
timespec[1].tv_sec.get() * 1000);
|
||||
if (!res) return -ErrorCodes.EINVAL();
|
||||
else return 0;
|
||||
} catch (Exception e) {
|
||||
@@ -117,17 +117,17 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int open(String path, FuseFileInfo fi) {
|
||||
if (fileService.open(path).await().indefinitely().isEmpty()) return -ErrorCodes.ENOENT();
|
||||
if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
try {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
var fileOpt = fileService.open(path);
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely();
|
||||
var read = fileService.read(file.getUuid().toString(), offset, (int) size);
|
||||
if (read.isEmpty()) return 0;
|
||||
buf.put(0, read.get(), 0, read.get().length);
|
||||
return read.get().length;
|
||||
@@ -140,12 +140,12 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@Override
|
||||
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
try {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
var fileOpt = fileService.open(path);
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
byte[] buffer = new byte[(int) size];
|
||||
buf.get(0, buffer, 0, (int) size);
|
||||
var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely();
|
||||
var written = fileService.write(file.getUuid().toString(), offset, buffer);
|
||||
return written.intValue();
|
||||
} catch (Exception e) {
|
||||
Log.error("When writing " + path, e);
|
||||
@@ -155,45 +155,45 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int create(String path, long mode, FuseFileInfo fi) {
|
||||
var ret = fileService.create(path, mode).await().indefinitely();
|
||||
var ret = fileService.create(path, mode);
|
||||
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int mkdir(String path, long mode) {
|
||||
var ret = fileService.mkdir(path, mode).await().indefinitely();
|
||||
var ret = fileService.mkdir(path, mode);
|
||||
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rmdir(String path) {
|
||||
var ret = fileService.rmdir(path).await().indefinitely();
|
||||
var ret = fileService.rmdir(path);
|
||||
if (!ret) return -ErrorCodes.ENOENT();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rename(String path, String newName) {
|
||||
var ret = fileService.rename(path, newName).await().indefinitely();
|
||||
var ret = fileService.rename(path, newName);
|
||||
if (!ret) return -ErrorCodes.ENOENT();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int unlink(String path) {
|
||||
var ret = fileService.unlink(path).await().indefinitely();
|
||||
var ret = fileService.unlink(path);
|
||||
if (!ret) return -ErrorCodes.ENOENT();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int truncate(String path, long size) {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
var fileOpt = fileService.open(path);
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var ok = fileService.truncate(file.getUuid().toString(), size).await().indefinitely();
|
||||
var ok = fileService.truncate(file.getUuid().toString(), size);
|
||||
if (ok)
|
||||
return 0;
|
||||
else
|
||||
@@ -202,7 +202,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int chmod(String path, long mode) {
|
||||
var ret = fileService.chmod(path, mode).await().indefinitely();
|
||||
var ret = fileService.chmod(path, mode);
|
||||
if (ret) return 0;
|
||||
else return -ErrorCodes.EINVAL();
|
||||
}
|
||||
@@ -211,7 +211,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
public int readdir(String path, Pointer buf, FuseFillDir filler, long offset, FuseFileInfo fi) {
|
||||
Iterable<String> found;
|
||||
try {
|
||||
found = fileService.readDir(path).await().indefinitely();
|
||||
found = fileService.readDir(path);
|
||||
} catch (Exception e) {
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.storage.objects.api;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import io.quarkus.grpc.GrpcService;
|
||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -13,29 +13,32 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
|
||||
ObjectRepository objectRepository;
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<FindObjectsReply> findObjects(FindObjectsRequest request) {
|
||||
return objectRepository.findObjects(request.getPrefix())
|
||||
.map(m -> FindObjectsReply.FindObjectsEntry.newBuilder().setName(m).build())
|
||||
.collect().in(FindObjectsReply::newBuilder, FindObjectsReply.Builder::addFound)
|
||||
.map(FindObjectsReply.Builder::build);
|
||||
var objects = objectRepository.findObjects(request.getPrefix());
|
||||
var builder = FindObjectsReply.newBuilder();
|
||||
for (var obj : objects) {
|
||||
builder.addFound(FindObjectsReply.FindObjectsEntry.newBuilder().setName(obj).build());
|
||||
}
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RunOnVirtualThread
|
||||
@Blocking
|
||||
public Uni<ReadObjectReply> readObject(ReadObjectRequest request) {
|
||||
var read = objectRepository.readObject(request.getName());
|
||||
return Uni.createFrom().item(ReadObjectReply.newBuilder().setData(ByteString.copyFrom(read)).build());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RunOnVirtualThread
|
||||
@Blocking
|
||||
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
|
||||
objectRepository.writeObject(request.getName(), request.getData().toByteArray(), false);
|
||||
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
@RunOnVirtualThread
|
||||
@Blocking
|
||||
public Uni<DeleteObjectReply> deleteObject(DeleteObjectRequest request) {
|
||||
objectRepository.deleteObject(request.getName());
|
||||
return Uni.createFrom().item(DeleteObjectReply.newBuilder().build());
|
||||
|
||||
@@ -5,7 +5,7 @@ import io.smallrye.mutiny.Uni;
|
||||
import java.util.Optional;
|
||||
|
||||
public interface JObjectManager {
|
||||
<T extends JObject> Uni<Optional<T>> get(String name, Class<T> clazz);
|
||||
<T extends JObject> Uni<Void> put(T object);
|
||||
<T extends JObject> Optional<T> get(String name, Class<T> clazz);
|
||||
<T extends JObject> void put(T object);
|
||||
void invalidateJObject(String name);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.Getter;
|
||||
@@ -56,29 +55,29 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JObject> Uni<Optional<T>> get(String name, Class<T> clazz) {
|
||||
public <T extends JObject> Optional<T> get(String name, Class<T> clazz) {
|
||||
cleanup();
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(name, clazz);
|
||||
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
}
|
||||
|
||||
var read = jObjectRepository.readJObjectChecked(name, clazz);
|
||||
|
||||
if (read.isEmpty())
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
return Optional.empty();
|
||||
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(name, clazz);
|
||||
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
_map.put(name, new NamedSoftReference(read.get(), _refQueue));
|
||||
}
|
||||
|
||||
return Uni.createFrom().item(Optional.of(read.get()));
|
||||
return Optional.of(read.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JObject> Uni<Void> put(T object) {
|
||||
public <T extends JObject> void put(T object) {
|
||||
cleanup();
|
||||
|
||||
synchronized (_map) {
|
||||
@@ -90,7 +89,6 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
jObjectRepository.writeJObject(object);
|
||||
return Uni.createFrom().voidItem();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -4,12 +4,13 @@ import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
|
||||
public interface ObjectRepository {
|
||||
@Nonnull
|
||||
Multi<String> findObjects(String prefix);
|
||||
List<String> findObjects(String prefix);
|
||||
@Nonnull
|
||||
Uni<Boolean> existsObject(String name);
|
||||
Boolean existsObject(String name);
|
||||
|
||||
@Nonnull
|
||||
byte[] readObject(String name);
|
||||
|
||||
@@ -6,6 +6,7 @@ import io.smallrye.mutiny.Uni;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
|
||||
//@ApplicationScoped
|
||||
public class SimplePersistentObjectRepository implements ObjectRepository {
|
||||
@@ -14,31 +15,31 @@ public class SimplePersistentObjectRepository implements ObjectRepository {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Multi<String> findObjects(String prefix) {
|
||||
public List<String> findObjects(String prefix) {
|
||||
return objectPersistentStore.findObjects(prefix);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Boolean> existsObject(String name) {
|
||||
public Boolean existsObject(String name) {
|
||||
return objectPersistentStore.existsObject(name);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public byte[] readObject(String name) {
|
||||
return objectPersistentStore.readObject(name).await().indefinitely();
|
||||
return objectPersistentStore.readObject(name);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) {
|
||||
objectPersistentStore.writeObject(name, data).await().indefinitely();
|
||||
objectPersistentStore.writeObject(name, data);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public void deleteObject(String name) {
|
||||
objectPersistentStore.deleteObject(name).await().indefinitely();
|
||||
objectPersistentStore.deleteObject(name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,8 +6,6 @@ import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentS
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
@@ -18,6 +16,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -50,7 +49,7 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
syncHandler.handleRemoteUpdate(
|
||||
IndexUpdatePush.newBuilder().setSelfname(selfname).setName(h.getName())
|
||||
.setAssumeUnique(h.getAssumeUnique())
|
||||
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely();
|
||||
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build());
|
||||
}
|
||||
Log.info("Sync complete");
|
||||
} catch (Exception e) {
|
||||
@@ -64,14 +63,14 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Multi<String> findObjects(String prefix) {
|
||||
public List<String> findObjects(String prefix) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Boolean> existsObject(String name) {
|
||||
return Uni.createFrom().item(objectIndexService.exists(name));
|
||||
public Boolean existsObject(String name) {
|
||||
return objectIndexService.exists(name);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@@ -86,18 +85,17 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
var info = infoOpt.get();
|
||||
|
||||
Optional<byte[]> read = info.runReadLocked(() -> {
|
||||
if (objectPersistentStore.existsObject(name).await().indefinitely())
|
||||
return Optional.of(objectPersistentStore.readObject(name).await().indefinitely());
|
||||
if (objectPersistentStore.existsObject(name))
|
||||
return Optional.of(objectPersistentStore.readObject(name));
|
||||
return Optional.empty();
|
||||
});
|
||||
if (read.isPresent()) return read.get();
|
||||
// Race?
|
||||
|
||||
return info.runWriteLocked(() -> {
|
||||
return remoteObjectServiceClient.getObject(name).map(got -> {
|
||||
objectPersistentStore.writeObject(name, got);
|
||||
return got;
|
||||
}).await().indefinitely();
|
||||
var obj = remoteObjectServiceClient.getObject(name);
|
||||
objectPersistentStore.writeObject(name, obj);
|
||||
return obj;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -107,7 +105,7 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict);
|
||||
|
||||
info.runWriteLocked(() -> {
|
||||
objectPersistentStore.writeObject(name, data).await().indefinitely();
|
||||
objectPersistentStore.writeObject(name, data);
|
||||
var prevMtime = info.getMtime();
|
||||
info.setMtime(System.currentTimeMillis());
|
||||
try {
|
||||
|
||||
@@ -24,7 +24,7 @@ public class RemoteObjectServiceClient {
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
public Uni<byte[]> getObject(String name) {
|
||||
public byte[] getObject(String name) {
|
||||
return remoteHostManager.withClient(client -> {
|
||||
var req = GetObjectRequest.newBuilder().setName(name).build();
|
||||
var reply = client.getObject(req);
|
||||
@@ -37,7 +37,7 @@ public class RemoteObjectServiceClient {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
return Uni.createFrom().item(reply.getObject().getContent().toByteArray());
|
||||
return reply.getObject().getContent().toByteArray();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -34,8 +34,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
var meta = metaOpt.get();
|
||||
Optional<Pair<Long, byte[]>> read = meta.runReadLocked(() -> {
|
||||
if (objectPersistentStore.existsObject(request.getName()).await().indefinitely())
|
||||
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()).await().indefinitely()));
|
||||
if (objectPersistentStore.existsObject(request.getName()))
|
||||
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName())));
|
||||
return Optional.empty();
|
||||
});
|
||||
if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
@@ -61,6 +61,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Blocking
|
||||
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
||||
Log.info("<-- indexUpdate: " + request.getName() + " from: " + request.getPrevMtime() + " to: " + request.getMtime());
|
||||
return syncHandler.handleRemoteUpdate(request);
|
||||
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ public class SyncHandler {
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
public Uni<IndexUpdateReply> handleRemoteUpdate(IndexUpdatePush request) {
|
||||
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
|
||||
var metaOpt = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique());
|
||||
metaOpt.runWriteLocked(() -> {
|
||||
if (metaOpt.getMtime() == request.getMtime()) {
|
||||
@@ -43,7 +43,7 @@ public class SyncHandler {
|
||||
metaOpt._remoteCopies.add(request.getSelfname());
|
||||
|
||||
try {
|
||||
objectPersistentStore.deleteObject(request.getName()).await().indefinitely();
|
||||
objectPersistentStore.deleteObject(request.getName());
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ public class SyncHandler {
|
||||
return null;
|
||||
});
|
||||
|
||||
return Uni.createFrom().item(IndexUpdateReply.newBuilder().build());
|
||||
return IndexUpdateReply.newBuilder().build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import io.vertx.mutiny.core.buffer.Buffer;
|
||||
import jakarta.annotation.Priority;
|
||||
@@ -18,6 +17,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import javax.annotation.Nonnull;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
@@ -38,7 +38,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Multi<String> findObjects(String prefix) {
|
||||
public List<String> findObjects(String prefix) {
|
||||
Path nsRoot = Paths.get(root);
|
||||
|
||||
if (!nsRoot.toFile().isDirectory())
|
||||
@@ -47,51 +47,48 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
return vertx.fileSystem().readDir(nsRoot.toString()).onItem()
|
||||
.transformToMulti(v -> Multi.createFrom().iterable(v))
|
||||
.select().where(n -> n.startsWith(prefix))
|
||||
.map(f -> nsRoot.relativize(Paths.get(f)).toString());
|
||||
.map(f -> nsRoot.relativize(Paths.get(f)).toString()).collect().asList().await().indefinitely();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Boolean> existsObject(String name) {
|
||||
public Boolean existsObject(String name) {
|
||||
Path obj = Paths.get(root, name);
|
||||
|
||||
if (!obj.toFile().isFile())
|
||||
return Uni.createFrom().item(false);
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
return obj.toFile().isFile();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<byte[]> readObject(String name) {
|
||||
public byte[] readObject(String name) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!file.toFile().exists())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes);
|
||||
return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes).await().indefinitely();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Void> writeObject(String name, byte[] data) {
|
||||
public Void writeObject(String name, byte[] data) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory()
|
||||
&& !Paths.get(root).toFile().mkdirs())
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
|
||||
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data));
|
||||
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data)).await().indefinitely();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Void> deleteObject(String name) {
|
||||
public Void deleteObject(String name) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!file.toFile().exists())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
return vertx.fileSystem().delete(file.toString());
|
||||
return vertx.fileSystem().delete(file.toString()).await().indefinitely();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,17 +4,18 @@ import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
|
||||
public interface ObjectPersistentStore {
|
||||
@Nonnull
|
||||
Multi<String> findObjects(String prefix);
|
||||
List<String> findObjects(String prefix);
|
||||
@Nonnull
|
||||
Uni<Boolean> existsObject(String name);
|
||||
Boolean existsObject(String name);
|
||||
|
||||
@Nonnull
|
||||
Uni<byte[]> readObject(String name);
|
||||
byte[] readObject(String name);
|
||||
@Nonnull
|
||||
Uni<Void> writeObject(String name, byte[] data);
|
||||
Void writeObject(String name, byte[] data);
|
||||
@Nonnull
|
||||
Uni<Void> deleteObject(String name);
|
||||
Void deleteObject(String name);
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ public class DhfsFileServiceSimpleTest {
|
||||
for (int start = 0; start < all.length(); start++) {
|
||||
for (int end = start; end <= all.length(); end++) {
|
||||
var read = fileService.read(fuuid.toString(), start, end - start);
|
||||
Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.await().indefinitely().get());
|
||||
Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user