mirror of https://github.com/usatiuk/dhfs.git
a little cleanup

Chunk.java:
@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.storage.files.objects;
 
+import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
@@ -12,15 +13,17 @@ import java.util.Arrays;
 @Accessors(chain = true)
 @Getter
 @Setter
-public class Chunk implements Serializable {
-    @Serial
-    private static final long serialVersionUID = 1L;
-
+public class Chunk extends JObject {
     public Chunk(byte[] bytes) {
         this.bytes = Arrays.copyOf(bytes, bytes.length);
         this.hash = DigestUtils.sha512Hex(bytes);
     }
 
+    @Override
+    public String getName() {
+        return hash;
+    }
+
     final String hash;
     final byte[] bytes;
 }
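
Aside (not part of the commit): a minimal sketch of what the reworked Chunk gives you. The constructor hashes the payload with DigestUtils.sha512Hex and getName() returns that hash, so a chunk's object name is derived purely from its contents, and two chunks built from the same bytes share a name. The ChunkNameExample class below is invented for illustration.

import com.usatiuk.dhfs.storage.files.objects.Chunk;

public class ChunkNameExample {
    public static void main(String[] args) {
        Chunk a = new Chunk("12345".getBytes());
        Chunk b = new Chunk("12345".getBytes());

        // getName() is the SHA-512 hex digest of the bytes (128 hex characters),
        // so identical content always maps to the same object name.
        System.out.println(a.getName());
        System.out.println(a.getName().equals(b.getName())); // true
    }
}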

DirEntry.java:
@@ -1,19 +1,24 @@
 package com.usatiuk.dhfs.storage.files.objects;
 
+import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 
-import java.io.Serial;
-import java.io.Serializable;
 import java.util.UUID;
 
 @Accessors(chain = true)
 @Getter
 @Setter
-public abstract class DirEntry implements Serializable {
-    @Serial
-    private static final long serialVersionUID = 1L;
+public abstract class DirEntry extends JObject {
+    final UUID uuid;
 
-    UUID uuid;
+    protected DirEntry(UUID uuid) {
+        this.uuid = uuid;
+    }
+
+    @Override
+    public String getName() {
+        return uuid.toString();
+    }
 }

Directory.java:
@@ -13,5 +13,9 @@ import java.util.UUID;
 @Getter
 @Setter
 public class Directory extends DirEntry {
+    public Directory(UUID uuid) {
+        super(uuid);
+    }
+
     Collection<Pair<String, UUID>> children = new ArrayList<>();
 }

File.java:
@@ -6,10 +6,15 @@ import lombok.experimental.Accessors;
 
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.UUID;
 
 @Accessors(chain = true)
 @Getter
 @Setter
 public class File extends DirEntry {
+    public File(UUID uuid) {
+        super(uuid);
+    }
+
     NavigableMap<Long, String> chunks = new TreeMap<>();
 }

DhfsFileService.java:
@@ -8,16 +8,16 @@ import io.smallrye.mutiny.Uni;
 import java.util.Optional;
 
 public interface DhfsFileService {
-    public Uni<Optional<DirEntry>> getDirEntry(String name);
-    public Uni<Optional<File>> open(String name);
-    public Uni<Optional<File>> create(String name);
-    public Uni<Iterable<String>> readDir(String name);
+    Uni<Optional<DirEntry>> getDirEntry(String name);
+    Uni<Optional<File>> open(String name);
+    Uni<Optional<File>> create(String name);
+    Uni<Iterable<String>> readDir(String name);
 
-    public Uni<Long> size(File f);
+    Uni<Long> size(File f);
 
-    public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length);
-    public Uni<Long> write(String fileUuid, long offset, byte[] data);
-    public Uni<Boolean> truncate(String fileUuid, long length);
+    Uni<Optional<byte[]>> read(String fileUuid, long offset, int length);
+    Uni<Long> write(String fileUuid, long offset, byte[] data);
+    Uni<Boolean> truncate(String fileUuid, long length);
 
-    public Uni<Directory> getRoot();
+    Uni<Directory> getRoot();
 }

DhfsFileServiceImpl.java:
@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.storage.files.objects.Chunk;
 import com.usatiuk.dhfs.storage.files.objects.DirEntry;
 import com.usatiuk.dhfs.storage.files.objects.Directory;
 import com.usatiuk.dhfs.storage.files.objects.File;
+import com.usatiuk.dhfs.storage.objects.jrepository.JObjectRepository;
 import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
 import io.quarkus.logging.Log;
 import io.quarkus.runtime.Shutdown;
@@ -14,13 +15,8 @@ import jakarta.annotation.Priority;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -34,6 +30,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     @Inject
     Vertx vertx;
     @Inject
+    JObjectRepository jObjectRepository;
+    @Inject
     ObjectRepository objectRepository;
 
     final static String namespace = "dhfs_files";
@@ -42,10 +40,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         Log.info("Initializing file service");
         if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) {
             objectRepository.createNamespace(namespace).await().indefinitely();
-            objectRepository.writeObject(namespace, new UUID(0, 0).toString(),
-                    ByteBuffer.wrap(SerializationUtils.serialize(
-                            new Directory().setUuid(new UUID(0, 0)))
-                    )).await().indefinitely();
+            jObjectRepository.writeJObject(namespace, new Directory(new UUID(0, 0))).await().indefinitely();
         }
         getRoot().await().indefinitely();
     }
@@ -55,34 +50,23 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         Log.info("Shutdown file service");
     }
 
-    // Taken from SerializationUtils
-    public static <T> T deserialize(final InputStream inputStream) {
-        try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), inputStream)) {
-            final T obj = (T) in.readObject();
-            return obj;
-        } catch (IOException | ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public static <T> T deserialize(final byte[] objectData) {
-        return deserialize(new ByteArrayInputStream(objectData));
-    }
-
-    private Uni<DirEntry> readDirEntry(String uuid) {
-        return objectRepository.readObject(namespace, uuid)
-                .map(o -> deserialize(o.getData().array()));
-    }
-
     private Uni<Optional<DirEntry>> traverse(DirEntry from, Path path) {
         if (path.getNameCount() == 0) return Uni.createFrom().item(Optional.of(from));
 
         if (!(from instanceof Directory dir))
             return Uni.createFrom().item(Optional.empty());
 
         for (var el : dir.getChildren()) {
             if (el.getLeft().equals(path.getName(0).toString())) {
-                var ref = readDirEntry(el.getRight().toString()).await().indefinitely();
-                if (path.getNameCount() == 1) return Uni.createFrom().item(Optional.of(ref));
-                return traverse(ref, path.subpath(1, path.getNameCount()));
+                var ref = jObjectRepository.readJObjectChecked(namespace, el.getRight().toString(), DirEntry.class)
+                        .await().indefinitely();
+                if (!ref.isPresent()) {
+                    Log.error("File missing when traversing directory " + from.getName() + ": " + el.getRight().toString());
+                    return Uni.createFrom().item(Optional.empty());
+                }
+                if (path.getNameCount() == 1) return Uni.createFrom().item(ref);
+
+                return traverse(ref.get(), path.subpath(1, path.getNameCount()));
             }
         }
         return Uni.createFrom().item(Optional.empty());
@@ -100,8 +84,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         // FIXME:
         var root = getRoot().await().indefinitely();
         var found = traverse(root, Path.of(name)).await().indefinitely();
-        if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
-        if (!(found.get() instanceof File)) return Uni.createFrom().item(Optional.empty());
+
+        if (found.isEmpty())
+            return Uni.createFrom().item(Optional.empty());
+
+        if (!(found.get() instanceof File))
+            return Uni.createFrom().item(Optional.empty());
 
         return Uni.createFrom().item(Optional.of((File) found.get()));
     }
 
@@ -111,14 +100,17 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         var root = getRoot().await().indefinitely();
         var found = traverse(root, Path.of(name).getParent()).await().indefinitely();
         if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
 
         if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty());
 
         var fuuid = UUID.randomUUID();
-        File f = new File();
-        f.setUuid(fuuid);
-        objectRepository.writeObject(namespace, fuuid.toString(), ByteBuffer.wrap(SerializationUtils.serialize(f))).await().indefinitely();
+        File f = new File(fuuid);
+
+        jObjectRepository.writeJObject(namespace, f).await().indefinitely();
         dir.getChildren().add(Pair.of(Path.of(name).getFileName().toString(), fuuid));
-        objectRepository.writeObject(namespace, dir.getUuid().toString(), ByteBuffer.wrap(SerializationUtils.serialize(dir))).await().indefinitely();
-        return Uni.createFrom().item(Optional.of((File) f));
+        jObjectRepository.writeJObject(namespace, dir).await().indefinitely();
+
+        return Uni.createFrom().item(Optional.of(f));
     }
 
     @Override
@@ -126,18 +118,19 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         var root = getRoot().await().indefinitely();
         var found = traverse(root, Path.of(name)).await().indefinitely();
         if (found.isEmpty()) throw new IllegalArgumentException();
-        if (!(found.get() instanceof Directory)) throw new IllegalArgumentException();
+        if (!(found.get() instanceof Directory foundDir)) throw new IllegalArgumentException();
 
-        var foundDir = (Directory) found.get();
         return Uni.createFrom().item(foundDir.getChildren().stream().map(Pair::getLeft).toList());
     }
 
     @Override
     public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
-        var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-        if (!(read instanceof File file)) {
+        var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
+        if (fileOpt.isEmpty()) {
             Log.error("File not found when trying to read: " + fileUuid);
             return Uni.createFrom().item(Optional.empty());
         }
+        var file = fileOpt.get();
 
         var chunksAll = file.getChunks();
         var chunks = chunksAll.tailMap(chunksAll.floorKey(offset)).entrySet().iterator();
@@ -155,14 +148,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             long toReadInChunk = (offset + length) - curPos;
 
             var chunkUuid = chunk.getValue();
-            var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
+            var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
 
-            if (!(chunkRead instanceof Chunk chunkObj)) {
-                Log.error("Chunk requested not a chunk: " + chunkUuid);
+            if (chunkRead.isEmpty()) {
+                Log.error("Chunk requested not found: " + chunkUuid);
                 return Uni.createFrom().item(Optional.empty());
             }
 
-            var chunkBytes = chunkObj.getBytes();
+            var chunkBytes = chunkRead.get().getBytes();
 
             long readableLen = chunkBytes.length - offInChunk;
 
@@ -183,10 +176,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
     @Override
     public Uni<Long> write(String fileUuid, long offset, byte[] data) {
-        var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-        if (!(read instanceof File file)) {
+        var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
+        if (fileOpt.isEmpty()) {
             Log.error("File not found when trying to read: " + fileUuid);
             return Uni.createFrom().item(-1L);
         }
+        var file = fileOpt.get();
 
         var chunksAll = file.getChunks();
 
@@ -200,62 +195,63 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
         if (first != null && first.getKey() < offset) {
             var chunkUuid = first.getValue();
-            var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-            if (!(chunkRead instanceof Chunk chunkObj)) {
-                Log.error("Chunk requested not a chunk: " + chunkUuid);
+            var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
+
+            if (chunkRead.isEmpty()) {
+                Log.error("Chunk requested not found: " + chunkUuid);
                 return Uni.createFrom().item(-1L);
             }
 
-            var chunkBytes = chunkObj.getBytes();
+            var chunkBytes = chunkRead.get().getBytes();
             Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
-            objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
+            jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
 
             newChunks.put(first.getKey(), newChunk.getHash());
         }
 
         {
             Chunk newChunk = new Chunk(data);
-            objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
+            jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
 
             newChunks.put(offset, newChunk.getHash());
         }
         if (last != null) {
             var lchunkUuid = last.getValue();
-            var lchunkRead = objectRepository.readObject(namespace, lchunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-            if (!(lchunkRead instanceof Chunk lchunkObj)) {
-                Log.error("Chunk requested not a chunk: " + lchunkUuid);
+            var lchunkRead = jObjectRepository.readJObjectChecked(namespace, lchunkUuid, Chunk.class).await().indefinitely();
+
+            if (lchunkRead.isEmpty()) {
+                Log.error("Chunk requested not found: " + lchunkUuid);
                 return Uni.createFrom().item(-1L);
             }
 
-            var lchunkBytes = lchunkObj.getBytes();
+            var lchunkBytes = lchunkRead.get().getBytes();
 
             if (last.getKey() + lchunkBytes.length > offset + data.length) {
                 int start = (int) ((offset + data.length) - last.getKey());
                 Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, start, lchunkBytes.length - start));
-                objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
+                jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
 
                 newChunks.put(first.getKey(), newChunk.getHash());
             }
         }
 
-        objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
-                new File().setChunks(newChunks).setUuid(file.getUuid())
-        ))).await().indefinitely();
+        jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(newChunks)).await().indefinitely();
 
         return Uni.createFrom().item((long) data.length);
     }
 
     @Override
     public Uni<Boolean> truncate(String fileUuid, long length) {
-        var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-        if (!(read instanceof File file)) {
+        var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
+        if (fileOpt.isEmpty()) {
            Log.error("File not found when trying to read: " + fileUuid);
            return Uni.createFrom().item(false);
         }
+        var file = fileOpt.get();
 
         if (length == 0) {
-            objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
-                    new File().setChunks(new TreeMap<>()).setUuid(file.getUuid())
-            ))).await().indefinitely();
+            jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>()))
+                    .await().indefinitely();
             return Uni.createFrom().item(true);
         }
 
@@ -267,26 +263,25 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
         if (lastChunk != null) {
             var chunkUuid = lastChunk.getValue();
-            var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-            if (!(chunkRead instanceof Chunk chunkObj)) {
-                Log.error("Chunk requested not a chunk: " + chunkUuid);
+            var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
+
+            if (chunkRead.isEmpty()) {
+                Log.error("Chunk requested not found: " + chunkUuid);
                 return Uni.createFrom().item(false);
             }
 
-            var chunkBytes = chunkObj.getBytes();
+            var chunkBytes = chunkRead.get().getBytes();
 
             if (lastChunk.getKey() + chunkBytes.length > 0) {
                 int start = (int) (length - lastChunk.getKey());
                 Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
-                objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
+                jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
 
                 newChunks.put(lastChunk.getKey(), newChunk.getHash());
             }
         }
 
-        objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
-                new File().setChunks(new TreeMap<>(newChunks)).setUuid(file.getUuid())
-        ))).await().indefinitely();
+        jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>(newChunks))).await().indefinitely();
 
         return Uni.createFrom().item(true);
     }
@@ -296,21 +291,26 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         int size = 0;
         //FIXME:
         for (var chunk : f.getChunks().entrySet()) {
-            var lchunkUuid = chunk.getValue();
-            var lchunkRead = objectRepository.readObject(namespace, lchunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
-            if (!(lchunkRead instanceof Chunk lchunkObj)) {
-                Log.error("Chunk requested not a chunk: " + lchunkUuid);
+            var chunkUuid = chunk.getValue();
+            var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
+
+            if (chunkRead.isEmpty()) {
+                Log.error("Chunk requested not found: " + chunkUuid);
                 return Uni.createFrom().item(-1L);
             }
 
-            var lchunkBytes = lchunkObj.getBytes();
-            size += lchunkBytes.length;
+            var chunkBytes = chunkRead.get().getBytes();
+            size += chunkBytes.length;
         }
         return Uni.createFrom().item((long) size);
     }
 
     @Override
     public Uni<Directory> getRoot() {
-        return readDirEntry(new UUID(0, 0).toString()).map(d -> (Directory) d);
+        var read = jObjectRepository.readJObjectChecked(namespace, new UUID(0, 0).toString(), DirEntry.class).await().indefinitely();
+        if (read.isEmpty() || !(read.get() instanceof Directory)) {
+            Log.error("Root directory not found");
+        }
+        return Uni.createFrom().item((Directory) read.get());
     }
 }
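
Aside (not part of the commit): DhfsFileServiceImpl keeps a file's contents as a NavigableMap from starting offset to chunk name, and read() finds the chunk containing a given offset via floorKey/tailMap before walking forward. A self-contained sketch of that lookup, with made-up chunk names:

import java.util.NavigableMap;
import java.util.TreeMap;

public class ChunkIndexExample {
    public static void main(String[] args) {
        // offset where the chunk starts -> name (hash) of the chunk object
        NavigableMap<Long, String> chunks = new TreeMap<>();
        chunks.put(0L, "hash-of-chunk-A"); // bytes 0..4
        chunks.put(5L, "hash-of-chunk-B"); // bytes 5..7
        chunks.put(8L, "hash-of-chunk-C"); // bytes 8..12

        long offset = 6;
        // The chunk covering offset 6 is the one starting at key 5.
        System.out.println(chunks.floorKey(offset)); // 5
        // tailMap from that key iterates the remaining chunks, as read() does.
        System.out.println(chunks.tailMap(chunks.floorKey(offset)).keySet()); // [5, 8]
    }
}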

DhfsObjectGrpcService.java:
@@ -1,13 +1,13 @@
 package com.usatiuk.dhfs.storage.objects.api;
 
 import com.google.protobuf.ByteString;
+import com.usatiuk.dhfs.storage.objects.data.Namespace;
+import com.usatiuk.dhfs.storage.objects.data.Object;
 import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
 import io.quarkus.grpc.GrpcService;
 import io.smallrye.mutiny.Uni;
 import jakarta.inject.Inject;
 
-import java.nio.ByteBuffer;
-
 @GrpcService
 public class DhfsObjectGrpcService implements DhfsObjectGrpc {
     @Inject
@@ -29,7 +29,8 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
 
     @Override
     public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
-        return objectRepository.writeObject(request.getNamespace(), request.getName(), ByteBuffer.wrap(request.getData().toByteArray()))
+        return objectRepository.writeObject(request.getNamespace(),
+                new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()))
                 .map(n -> WriteObjectReply.newBuilder().build());
     }
 

Namespace.java:
@@ -1,5 +1,6 @@
 package com.usatiuk.dhfs.storage.objects.data;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
@@ -7,6 +8,7 @@ import lombok.experimental.Accessors;
 @Accessors(chain = true)
 @Getter
 @Setter
+@AllArgsConstructor
 public class Namespace {
-    String name;
+    final String name;
 }

Object.java:
@@ -1,17 +1,17 @@
 package com.usatiuk.dhfs.storage.objects.data;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
 
-import java.nio.ByteBuffer;
-
 @Accessors(chain = true)
 @Getter
 @Setter
+@AllArgsConstructor
 public class Object {
-    Namespace namespace;
+    final Namespace namespace;
 
-    String name;
-    ByteBuffer data;
+    final String name;
+    final byte[] data;
 }

JObject.java (new file):
@@ -0,0 +1,7 @@
+package com.usatiuk.dhfs.storage.objects.jrepository;
+
+import java.io.Serializable;
+
+public abstract class JObject implements Serializable {
+    public abstract String getName();
+}

JObjectRepository.java (new file):
@@ -0,0 +1,15 @@
+package com.usatiuk.dhfs.storage.objects.jrepository;
+
+import io.smallrye.mutiny.Uni;
+
+import javax.annotation.Nonnull;
+import java.util.Optional;
+
+public interface JObjectRepository {
+    @Nonnull
+    Uni<Optional<JObject>> readJObject(String namespace, String name);
+    @Nonnull
+    <T extends JObject> Uni<Optional<T>> readJObjectChecked(String namespace, String name, Class<T> clazz);
+    @Nonnull
+    Uni<Void> writeJObject(String namespace, JObject object);
+}
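
Aside (not part of the commit): a rough usage sketch for the new JObject/JObjectRepository pair. The Note class and the "demo" namespace are invented for the example; the blocking .await().indefinitely() style mirrors how DhfsFileServiceImpl drives these Unis elsewhere in this commit.

import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectRepository;

import java.util.Optional;

class Note extends JObject {
    final String id;
    final String text;

    Note(String id, String text) {
        this.id = id;
        this.text = text;
    }

    @Override
    public String getName() {
        return id; // the name the repository stores the object under
    }
}

class JObjectRepositoryUsage {
    static void demo(JObjectRepository repo) {
        // Serializes the object and writes it under its getName() in the namespace.
        repo.writeJObject("demo", new Note("note-1", "hello")).await().indefinitely();

        // Deserializes and checks the runtime type; empty on a miss or a type mismatch.
        Optional<Note> note = repo.readJObjectChecked("demo", "note-1", Note.class)
                .await().indefinitely();
        note.ifPresent(n -> System.out.println(n.text));
    }
}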

JObjectRepositoryImpl.java (new file):
@@ -0,0 +1,74 @@
+package com.usatiuk.dhfs.storage.objects.jrepository;
+
+import com.usatiuk.dhfs.storage.objects.data.Namespace;
+import com.usatiuk.dhfs.storage.objects.data.Object;
+import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
+import io.quarkus.logging.Log;
+import io.smallrye.mutiny.Uni;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang3.SerializationUtils;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+
+@ApplicationScoped
+public class JObjectRepositoryImpl implements JObjectRepository {
+    @Inject
+    ObjectRepository objectRepository;
+
+    // Taken from SerializationUtils
+    private static <T> T deserialize(final InputStream inputStream) {
+        try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), inputStream)) {
+            final T obj = (T) in.readObject();
+            return obj;
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static <T> T deserialize(final byte[] objectData) {
+        return deserialize(new ByteArrayInputStream(objectData));
+    }
+
+    @Nonnull
+    @Override
+    public Uni<Optional<JObject>> readJObject(String namespace, String name) {
+        return objectRepository.readObject(namespace, name).map(o -> {
+            java.lang.Object obj = deserialize(new ByteArrayInputStream(o.getData()));
+            if (!(obj instanceof JObject)) {
+                Log.error("Read object is not a JObject: " + namespace + "/" + name);
+                return Optional.empty();
+            }
+            return Optional.of((JObject) obj);
+        });
+    }
+
+    @Nonnull
+    @Override
+    public <T extends JObject> Uni<Optional<T>> readJObjectChecked(String namespace, String name, Class<T> clazz) {
+        return readJObject(namespace, name).map(o -> {
+            if (o.isEmpty()) return Optional.empty();
+
+            if (!clazz.isAssignableFrom(o.get().getClass())) {
+                Log.error("Read object type mismatch: " + namespace + "/" + name);
+                return Optional.empty();
+            }
+            return Optional.of((T) o.get());
+        });
+    }
+
+    @Nonnull
+    @Override
+    public Uni<Void> writeJObject(String namespace, JObject object) {
+        final var obj = new Object(
+                new Namespace(namespace),
+                object.getName(),
+                SerializationUtils.serialize(object));
+        return objectRepository.writeObject(namespace, obj);
+    }
+}

ObjectRepository.java:
@@ -9,19 +9,19 @@ import java.nio.ByteBuffer;
 
 public interface ObjectRepository {
     @Nonnull
-    public Multi<String> findObjects(String namespace, String prefix);
+    Multi<String> findObjects(String namespace, String prefix);
     @Nonnull
-    public Uni<Boolean> existsObject(String namespace, String name);
+    Uni<Boolean> existsObject(String namespace, String name);
 
     @Nonnull
-    public Uni<Object> readObject(String namespace, String name);
+    Uni<Object> readObject(String namespace, String name);
     @Nonnull
-    public Uni<Void> writeObject(String namespace, String name, ByteBuffer data);
+    Uni<Void> writeObject(String namespace, Object object);
     @Nonnull
-    public Uni<Void> deleteObject(String namespace, String name);
+    Uni<Void> deleteObject(String namespace, String name);
 
     @Nonnull
-    public Uni<Void> createNamespace(String namespace);
+    Uni<Void> createNamespace(String namespace);
     @Nonnull
-    public Uni<Void> deleteNamespace(String namespace);
+    Uni<Void> deleteNamespace(String namespace);
 }

SimpleFileObjectRepository.java:
@@ -18,7 +18,6 @@ import jakarta.inject.Inject;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 
 import javax.annotation.Nonnull;
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
@@ -68,24 +67,20 @@ public class SimpleFileObjectRepository implements ObjectRepository {
     @Nonnull
     @Override
     public Uni<Object> readObject(String namespace, String name) {
-        var ret = new Object();
-
-        ret.setName(name)
-                .setNamespace(new Namespace().setName(namespace));
         var file = Path.of(root, namespace, name);
 
         if (!file.toFile().exists())
            throw new StatusRuntimeException(Status.NOT_FOUND);
 
-        return vertx.fileSystem().readFile(file.toString()).map(r -> ret.setData(ByteBuffer.wrap(r.getBytes())));
+        return vertx.fileSystem().readFile(file.toString()).map(r -> new Object(new Namespace(namespace), name, r.getBytes()));
     }
 
     @Nonnull
     @Override
-    public Uni<Void> writeObject(String namespace, String name, ByteBuffer data) {
-        var file = Path.of(root, namespace, name);
+    public Uni<Void> writeObject(String namespace, Object object) {
+        var file = Path.of(root, namespace, object.getName());
 
-        return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data.array()));
+        return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(object.getData()));
     }
 
     @Nonnull

DhfsFileServiceSimpleTest.java:
@@ -4,14 +4,13 @@ import com.usatiuk.dhfs.storage.SimpleFileRepoTest;
 import com.usatiuk.dhfs.storage.files.objects.Chunk;
 import com.usatiuk.dhfs.storage.files.objects.File;
 import com.usatiuk.dhfs.storage.files.service.DhfsFileService;
+import com.usatiuk.dhfs.storage.objects.jrepository.JObjectRepository;
 import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
 import io.quarkus.test.junit.QuarkusTest;
 import jakarta.inject.Inject;
-import org.apache.commons.lang3.SerializationUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
 
 @QuarkusTest
@@ -20,6 +19,8 @@ public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
     DhfsFileService fileService;
     @Inject
     ObjectRepository objectRepository;
+    @Inject
+    JObjectRepository jObjectRepository;
 
     @Test
     void readTest() {
@@ -28,7 +29,7 @@ public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
         Chunk c1 = new Chunk("12345".getBytes());
         Chunk c2 = new Chunk("678".getBytes());
         Chunk c3 = new Chunk("91011".getBytes());
-        File f = new File();
+        File f = new File(fuuid);
         f.getChunks().put(0L, c1.getHash());
         f.getChunks().put((long) c1.getBytes().length, c2.getHash());
         f.getChunks().put((long) c1.getBytes().length + c2.getBytes().length, c3.getHash());
@@ -37,10 +38,10 @@ public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
 
            objectRepository.createNamespace("dhfs_files");
 
-            objectRepository.writeObject("dhfs_files", c1.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c1))).await().indefinitely();
-            objectRepository.writeObject("dhfs_files", c2.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c2))).await().indefinitely();
-            objectRepository.writeObject("dhfs_files", c3.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c3))).await().indefinitely();
-            objectRepository.writeObject("dhfs_files", fuuid.toString(), ByteBuffer.wrap(SerializationUtils.serialize(f))).await().indefinitely();
+            jObjectRepository.writeJObject("dhfs_files", c1).await().indefinitely();
+            jObjectRepository.writeJObject("dhfs_files", c2).await().indefinitely();
+            jObjectRepository.writeJObject("dhfs_files", c3).await().indefinitely();
+            jObjectRepository.writeJObject("dhfs_files", f).await().indefinitely();
        }
 
        String all = "1234567891011";