mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
seemingly not completely broken write
This commit is contained in:
@@ -10,10 +10,14 @@ import java.util.Optional;
|
||||
public interface DhfsFileService {
|
||||
public Uni<Optional<DirEntry>> getDirEntry(String name);
|
||||
public Uni<Optional<File>> open(String name);
|
||||
public Uni<Optional<File>> create(String name);
|
||||
public Uni<Iterable<String>> readDir(String name);
|
||||
|
||||
public Uni<Long> size(File f);
|
||||
|
||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length);
|
||||
public Uni<Optional<Long>> write(String fileUuid, long offset, long length);
|
||||
public Uni<Long> write(String fileUuid, long offset, byte[] data);
|
||||
public Uni<Boolean> truncate(String fileUuid, long length);
|
||||
|
||||
public Uni<Directory> getRoot();
|
||||
}
|
||||
|
||||
@@ -23,7 +23,9 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
|
||||
// Note: this is not actually reactive
|
||||
@@ -72,16 +74,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
.map(o -> deserialize(o.getData().array()));
|
||||
}
|
||||
|
||||
private Uni<Optional<DirEntry>> traverse(Directory from, Path path) {
|
||||
private Uni<Optional<DirEntry>> traverse(DirEntry from, Path path) {
|
||||
if (path.getNameCount() == 0) return Uni.createFrom().item(Optional.of(from));
|
||||
for (var el : from.getChildren()) {
|
||||
if (!(from instanceof Directory dir))
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
for (var el : dir.getChildren()) {
|
||||
if (el.getLeft().equals(path.getName(0).toString())) {
|
||||
var ref = readDirEntry(el.getRight().toString()).await().indefinitely();
|
||||
if (ref instanceof Directory) {
|
||||
return traverse((Directory) ref, path.subpath(1, path.getNameCount()));
|
||||
} else {
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
}
|
||||
if (path.getNameCount() == 1) return Uni.createFrom().item(Optional.of(ref));
|
||||
return traverse(ref, path.subpath(1, path.getNameCount()));
|
||||
}
|
||||
}
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
@@ -104,6 +105,22 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
return Uni.createFrom().item(Optional.of((File) found.get()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<File>> create(String name) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name).getParent()).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(Optional.empty());
|
||||
var fuuid = UUID.randomUUID();
|
||||
File f = new File();
|
||||
f.setUuid(fuuid);
|
||||
objectRepository.writeObject(namespace, fuuid.toString(), ByteBuffer.wrap(SerializationUtils.serialize(f))).await().indefinitely();
|
||||
dir.getChildren().add(Pair.of(Path.of(name).getFileName().toString(), fuuid));
|
||||
objectRepository.writeObject(namespace, dir.getUuid().toString(), ByteBuffer.wrap(SerializationUtils.serialize(dir))).await().indefinitely();
|
||||
return Uni.createFrom().item(Optional.of((File) f));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Iterable<String>> readDir(String name) {
|
||||
var root = getRoot().await().indefinitely();
|
||||
@@ -153,7 +170,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
buf.put(chunkBytes, (int) offInChunk, (int) toReadReally);
|
||||
|
||||
if (readableLen > toReadInChunk)
|
||||
if (readableLen >= toReadInChunk)
|
||||
break;
|
||||
else
|
||||
curPos += readableLen;
|
||||
@@ -165,8 +182,131 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<Long>> write(String fileUuid, long offset, long length) {
|
||||
return null;
|
||||
public Uni<Long> write(String fileUuid, long offset, byte[] data) {
|
||||
var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(read instanceof File file)) {
|
||||
return Uni.createFrom().item(-1L);
|
||||
}
|
||||
|
||||
var chunksAll = file.getChunks();
|
||||
|
||||
var first = chunksAll.floorEntry(offset);
|
||||
var last = chunksAll.floorEntry((offset + data.length) - 1);
|
||||
|
||||
var newChunks = new TreeMap<Long, String>();
|
||||
for (var c : chunksAll.entrySet()) {
|
||||
if (c.getKey() < offset) newChunks.put(c.getKey(), c.getValue());
|
||||
}
|
||||
|
||||
if (first != null && first.getKey() < offset) {
|
||||
var chunkUuid = first.getValue();
|
||||
var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(chunkRead instanceof Chunk chunkObj)) {
|
||||
Log.error("Chunk requested not a chunk: " + chunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
}
|
||||
|
||||
var chunkBytes = chunkObj.getBytes();
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
|
||||
objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
|
||||
|
||||
newChunks.put(first.getKey(), newChunk.getHash());
|
||||
}
|
||||
|
||||
{
|
||||
Chunk newChunk = new Chunk(data);
|
||||
objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
|
||||
|
||||
newChunks.put(offset, newChunk.getHash());
|
||||
}
|
||||
if (last != null) {
|
||||
var lchunkUuid = last.getValue();
|
||||
var lchunkRead = objectRepository.readObject(namespace, lchunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(lchunkRead instanceof Chunk lchunkObj)) {
|
||||
Log.error("Chunk requested not a chunk: " + lchunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
}
|
||||
|
||||
var lchunkBytes = lchunkObj.getBytes();
|
||||
|
||||
if (last.getKey() + lchunkBytes.length > offset + data.length) {
|
||||
int start = (int) ((offset + data.length) - last.getKey());
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, start, lchunkBytes.length - start));
|
||||
objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
|
||||
|
||||
newChunks.put(first.getKey(), newChunk.getHash());
|
||||
}
|
||||
}
|
||||
|
||||
objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
|
||||
new File().setChunks(newChunks).setUuid(file.getUuid())
|
||||
))).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item((long) data.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> truncate(String fileUuid, long length) {
|
||||
var read = objectRepository.readObject(namespace, fileUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(read instanceof File file)) {
|
||||
return Uni.createFrom().item(false);
|
||||
}
|
||||
|
||||
if (length == 0) {
|
||||
objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
|
||||
new File().setChunks(new TreeMap<>()).setUuid(file.getUuid())
|
||||
))).await().indefinitely();
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
|
||||
var chunksAll = file.getChunks();
|
||||
|
||||
var newChunks = chunksAll.subMap(0L, length - 1);
|
||||
|
||||
var lastChunk = newChunks.lastEntry();
|
||||
|
||||
if (lastChunk != null) {
|
||||
var chunkUuid = lastChunk.getValue();
|
||||
var chunkRead = objectRepository.readObject(namespace, chunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(chunkRead instanceof Chunk chunkObj)) {
|
||||
Log.error("Chunk requested not a chunk: " + chunkUuid);
|
||||
return Uni.createFrom().item(false);
|
||||
}
|
||||
|
||||
var chunkBytes = chunkObj.getBytes();
|
||||
|
||||
if (lastChunk.getKey() + chunkBytes.length > 0) {
|
||||
int start = (int) (length - lastChunk.getKey());
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
|
||||
objectRepository.writeObject(namespace, newChunk.getHash(), ByteBuffer.wrap(SerializationUtils.serialize(newChunk))).await().indefinitely();
|
||||
|
||||
newChunks.put(lastChunk.getKey(), newChunk.getHash());
|
||||
}
|
||||
}
|
||||
|
||||
objectRepository.writeObject(namespace, fileUuid, ByteBuffer.wrap(SerializationUtils.serialize(
|
||||
new File().setChunks(new TreeMap<>(newChunks)).setUuid(file.getUuid())
|
||||
))).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Long> size(File f) {
|
||||
int size = 0;
|
||||
//FIXME:
|
||||
for (var chunk : f.getChunks().entrySet()) {
|
||||
var lchunkUuid = chunk.getValue();
|
||||
var lchunkRead = objectRepository.readObject(namespace, lchunkUuid).map(o -> deserialize(o.getData().array())).await().indefinitely();
|
||||
if (!(lchunkRead instanceof Chunk lchunkObj)) {
|
||||
Log.error("Chunk requested not a chunk: " + lchunkUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
}
|
||||
|
||||
var lchunkBytes = lchunkObj.getBytes();
|
||||
size += lchunkBytes.length;
|
||||
}
|
||||
return Uni.createFrom().item((long) size);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -20,7 +20,6 @@ import ru.serce.jnrfuse.struct.FileStat;
|
||||
import ru.serce.jnrfuse.struct.FuseFileInfo;
|
||||
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
import static jnr.posix.FileStat.S_IFDIR;
|
||||
@@ -48,13 +47,15 @@ public class DhfsFuse extends FuseStubFS {
|
||||
found = fileService.getDirEntry(path).await().indefinitely();
|
||||
} catch (Exception e) {
|
||||
Log.error(e);
|
||||
return ErrorCodes.ENOENT();
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
if (found.isEmpty()) return ErrorCodes.ENOENT();
|
||||
if (found.get() instanceof File) {
|
||||
if (found.isEmpty()) {
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
if (found.get() instanceof File f) {
|
||||
stat.st_mode.set(S_IFREG | 0777);
|
||||
stat.st_nlink.set(1);
|
||||
stat.st_size.set(0);
|
||||
stat.st_size.set(fileService.size(f).await().indefinitely());
|
||||
} else if (found.get() instanceof Directory) {
|
||||
stat.st_mode.set(S_IFDIR | 0777);
|
||||
stat.st_nlink.set(2);
|
||||
@@ -64,18 +65,49 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int open(String path, FuseFileInfo fi) {
|
||||
if (fileService.open(path).await().indefinitely().isEmpty()) return ErrorCodes.ENOENT();
|
||||
if (fileService.open(path).await().indefinitely().isEmpty()) return -ErrorCodes.ENOENT();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
return super.read(path, buf, size, offset, fi);
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely();
|
||||
if (read.isEmpty()) return 0;
|
||||
buf.put(0, read.get(), 0, read.get().length);
|
||||
return read.get().length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
return super.write(path, buf, size, offset, fi);
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
byte[] buffer = new byte[(int) size];
|
||||
buf.get(0, buffer, 0, (int) size);
|
||||
var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely();
|
||||
return written.intValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int create(String path, long mode, FuseFileInfo fi) {
|
||||
var ret = fileService.create(path).await().indefinitely();
|
||||
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
|
||||
else return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int truncate(String path, long size) {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var ok = fileService.truncate(file.getUuid().toString(), size).await().indefinitely();
|
||||
if (ok)
|
||||
return 0;
|
||||
else
|
||||
return -ErrorCodes.ENOSPC();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -84,7 +116,7 @@ public class DhfsFuse extends FuseStubFS {
|
||||
try {
|
||||
found = fileService.readDir(path).await().indefinitely();
|
||||
} catch (Exception e) {
|
||||
return ErrorCodes.ENOENT();
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
|
||||
filler.apply(buf, ".", null, 0);
|
||||
|
||||
@@ -47,7 +47,7 @@ public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
|
||||
|
||||
{
|
||||
for (int start = 0; start < all.length(); start++) {
|
||||
for (int end = start; end < all.length(); end++) {
|
||||
for (int end = start; end <= all.length(); end++) {
|
||||
var read = fileService.read(fuuid.toString(), start, end - start);
|
||||
Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.await().indefinitely().get());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user