mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
file read
This commit is contained in:
@@ -70,6 +70,11 @@
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>2.16.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-codec</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
<version>1.17.0</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -1,12 +1,26 @@
|
||||
package com.usatiuk.dhfs.storage.files.objects;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
|
||||
@Accessors(chain = true)
|
||||
@Getter
|
||||
@Setter
|
||||
public class Chunk implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
String hash;
|
||||
byte[] bytes;
|
||||
public Chunk(byte[] bytes) {
|
||||
this.bytes = Arrays.copyOf(bytes, bytes.length);
|
||||
this.hash = DigestUtils.sha512Hex(bytes);
|
||||
}
|
||||
|
||||
final String hash;
|
||||
final byte[] bytes;
|
||||
}
|
||||
|
||||
@@ -4,11 +4,12 @@ import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
@Accessors(chain = true)
|
||||
@Getter
|
||||
@Setter
|
||||
public class File extends DirEntry {
|
||||
Map<Long, String> chunks;
|
||||
NavigableMap<Long, String> chunks = new TreeMap<>();
|
||||
}
|
||||
|
||||
@@ -12,5 +12,8 @@ public interface DhfsFileService {
|
||||
public Uni<Optional<File>> open(String name);
|
||||
public Uni<Iterable<String>> readDir(String name);
|
||||
|
||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length);
|
||||
public Uni<Optional<Long>> write(String fileUuid, long offset, long length);
|
||||
|
||||
public Uni<Directory> getRoot();
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.files.service;
|
||||
|
||||
import com.usatiuk.dhfs.storage.files.objects.Chunk;
|
||||
import com.usatiuk.dhfs.storage.files.objects.DirEntry;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
@@ -25,6 +26,7 @@ import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
// Note: this is not actually reactive
|
||||
@ApplicationScoped
|
||||
public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@Inject
|
||||
@@ -113,6 +115,60 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
return Uni.createFrom().item(foundDir.getChildren().stream().map(Pair::getLeft).toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
|
||||
var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(read instanceof File file)) {
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
}
|
||||
|
||||
var chunksAll = file.getChunks();
|
||||
var chunks = chunksAll.tailMap(chunksAll.floorKey(offset)).entrySet().iterator();
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocate(length);
|
||||
|
||||
long curPos = offset;
|
||||
var chunk = chunks.next();
|
||||
|
||||
while (curPos < offset + length) {
|
||||
var chunkPos = chunk.getKey();
|
||||
|
||||
long offInChunk = curPos - chunkPos;
|
||||
|
||||
long toReadInChunk = (offset + length) - curPos;
|
||||
|
||||
var chunkUuid = chunk.getValue();
|
||||
var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
|
||||
if (!(chunkRead instanceof Chunk chunkObj)) {
|
||||
Log.error("Chunk requested not a chunk: " + chunkUuid);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
}
|
||||
|
||||
var chunkBytes = chunkObj.getBytes();
|
||||
|
||||
long readableLen = chunkBytes.length - offInChunk;
|
||||
|
||||
var toReadReally = Math.min(readableLen, toReadInChunk);
|
||||
|
||||
buf.put(chunkBytes, (int) offInChunk, (int) toReadReally);
|
||||
|
||||
if (readableLen > toReadInChunk)
|
||||
break;
|
||||
else
|
||||
curPos += readableLen;
|
||||
|
||||
chunk = chunks.next();
|
||||
}
|
||||
|
||||
return Uni.createFrom().item(Optional.of(buf.array()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<Long>> write(String fileUuid, long offset, long length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Directory> getRoot() {
|
||||
return readDirEntry(new UUID(0, 0).toString()).map(d -> (Directory) d);
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.usatiuk.dhfs.files;
|
||||
|
||||
import com.usatiuk.dhfs.storage.SimpleFileRepoTest;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Chunk;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.UUID;
|
||||
|
||||
@QuarkusTest
|
||||
public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
@Inject
|
||||
ObjectRepository objectRepository;
|
||||
|
||||
@Test
|
||||
void readTest() {
|
||||
var fuuid = UUID.randomUUID();
|
||||
{
|
||||
Chunk c1 = new Chunk("12345".getBytes());
|
||||
Chunk c2 = new Chunk("678".getBytes());
|
||||
Chunk c3 = new Chunk("91011".getBytes());
|
||||
File f = new File();
|
||||
f.getChunks().put(0L, c1.getHash());
|
||||
f.getChunks().put((long) c1.getBytes().length, c2.getHash());
|
||||
f.getChunks().put((long) c1.getBytes().length + c2.getBytes().length, c3.getHash());
|
||||
|
||||
// FIXME: dhfs_files
|
||||
|
||||
objectRepository.createNamespace("dhfs_files");
|
||||
|
||||
objectRepository.writeObject("dhfs_files", c1.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c1))).await().indefinitely();
|
||||
objectRepository.writeObject("dhfs_files", c2.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c2))).await().indefinitely();
|
||||
objectRepository.writeObject("dhfs_files", c3.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(c3))).await().indefinitely();
|
||||
objectRepository.writeObject("dhfs_files", fuuid.toString(), ByteBuffer.wrap(SerializationUtils.serialize(f))).await().indefinitely();
|
||||
}
|
||||
|
||||
String all = "1234567891011";
|
||||
|
||||
{
|
||||
for (int start = 0; start < all.length(); start++) {
|
||||
for (int end = start; end < all.length(); end++) {
|
||||
var read = fileService.read(fuuid.toString(), start, end - start);
|
||||
Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.await().indefinitely().get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,9 +16,6 @@ class DhfsObjectGrpcServiceTest extends SimpleFileRepoTest {
|
||||
@GrpcClient
|
||||
DhfsObjectGrpc dhfsObjectGrpc;
|
||||
|
||||
@ConfigProperty(name = "dhfs.filerepo.root")
|
||||
String tempDirectory;
|
||||
|
||||
@Test
|
||||
void writeReadTest() {
|
||||
dhfsObjectGrpc.createNamespace(
|
||||
|
||||
Reference in New Issue
Block a user