mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
fuse skeleton 2
This commit is contained in:
@@ -60,6 +60,16 @@
|
||||
<artifactId>jnr-ffi</artifactId>
|
||||
<version>2.2.16</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.14.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>2.16.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.usatiuk.dhfs.storage.files.objects;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
public class Chunk implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
String hash;
|
||||
byte[] bytes;
|
||||
}
|
||||
@@ -1,6 +1,19 @@
|
||||
package com.usatiuk.dhfs.storage.files.objects;
|
||||
|
||||
import java.io.Serializable;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
public class DirEntry implements Serializable {
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@Accessors(chain = true)
|
||||
@Getter
|
||||
@Setter
|
||||
public abstract class DirEntry implements Serializable {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
UUID uuid;
|
||||
}
|
||||
|
||||
@@ -3,11 +3,15 @@ package com.usatiuk.dhfs.storage.files.objects;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.UUID;
|
||||
|
||||
@Accessors(chain = true)
|
||||
@Getter
|
||||
@Setter
|
||||
public class Directory extends DirEntry {
|
||||
Collection<Pair<String, UUID>> children = new ArrayList<>();
|
||||
}
|
||||
|
||||
@@ -4,8 +4,11 @@ import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Accessors(chain = true)
|
||||
@Getter
|
||||
@Setter
|
||||
public class File extends DirEntry {
|
||||
Map<Long, String> chunks;
|
||||
}
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
package com.usatiuk.dhfs.storage.files.service;
|
||||
|
||||
import com.usatiuk.dhfs.storage.files.api.*;
|
||||
import com.usatiuk.dhfs.storage.files.objects.DirEntry;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface DhfsFileService {
|
||||
public Uni<FindFilesReply> findFiles(FindFilesRequest request);
|
||||
|
||||
public Uni<ReadFileReply> readFile(ReadFileRequest request) ;
|
||||
|
||||
public Uni<WriteFileReply> writeFile(WriteFileRequest request) ;
|
||||
|
||||
public Uni<DeleteFileReply> deleteFile(DeleteFileRequest request) ;
|
||||
public Uni<Optional<DirEntry>> getDirEntry(String name);
|
||||
public Uni<Optional<File>> open(String name);
|
||||
public Uni<Iterable<String>> readDir(String name);
|
||||
|
||||
public Uni<Directory> getRoot();
|
||||
}
|
||||
|
||||
@@ -1,14 +1,119 @@
|
||||
package com.usatiuk.dhfs.storage.files.service;
|
||||
|
||||
import com.usatiuk.dhfs.storage.files.objects.DirEntry;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Shutdown;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DhfsFileServiceImp {
|
||||
public class DhfsFileServiceImp implements DhfsFileService {
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
@Inject
|
||||
ObjectRepository objectRepository;
|
||||
|
||||
final static String namespace = "dhfs_files";
|
||||
|
||||
void init(@Observes @Priority(300) StartupEvent event) {
|
||||
Log.info("Initializing file service");
|
||||
if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) {
|
||||
objectRepository.createNamespace(namespace).await().indefinitely();
|
||||
objectRepository.writeObject(namespace, new UUID(0, 0).toString(),
|
||||
ByteBuffer.wrap(SerializationUtils.serialize(
|
||||
new Directory().setUuid(new UUID(0, 0)))
|
||||
)).await().indefinitely();
|
||||
}
|
||||
getRoot().await().indefinitely();
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
void shutdown() {
|
||||
Log.info("Shutdown file service");
|
||||
}
|
||||
|
||||
public static <T> T deserialize(final InputStream inputStream) {
|
||||
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), inputStream)) {
|
||||
final T obj = (T) in.readObject();
|
||||
return obj;
|
||||
} catch (IOException | ClassNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> T deserialize(final byte[] objectData) {
|
||||
return deserialize(new ByteArrayInputStream(objectData));
|
||||
}
|
||||
|
||||
private Uni<DirEntry> readDirEntry(String uuid) {
|
||||
return objectRepository.readObject(namespace, uuid)
|
||||
.map(o -> deserialize(o.getData().array()));
|
||||
}
|
||||
|
||||
private Uni<Optional<DirEntry>> traverse(Directory from, Path path) {
|
||||
if (path.getNameCount() == 0) return Uni.createFrom().item(Optional.of(from));
|
||||
for (var el : from.getChildren()) {
|
||||
if (el.getLeft().equals(path.getName(0).toString())) {
|
||||
var ref = readDirEntry(el.getRight().toString()).await().indefinitely();
|
||||
if (ref instanceof Directory) {
|
||||
return traverse((Directory) ref, path.subpath(1, path.getNameCount()));
|
||||
} else {
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<DirEntry>> getDirEntry(String name) {
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
return Uni.createFrom().item(found);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Optional<File>> open(String name) {
|
||||
// FIXME:
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
if (found.isEmpty()) return Uni.createFrom().item(Optional.empty());
|
||||
if (!(found.get() instanceof File)) return Uni.createFrom().item(Optional.empty());
|
||||
return Uni.createFrom().item(Optional.of((File) found.get()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Iterable<String>> readDir(String name) {
|
||||
var root = getRoot().await().indefinitely();
|
||||
var found = traverse(root, Path.of(name)).await().indefinitely();
|
||||
if (found.isEmpty()) throw new IllegalArgumentException();
|
||||
if (!(found.get() instanceof Directory)) throw new IllegalArgumentException();
|
||||
|
||||
var foundDir = (Directory) found.get();
|
||||
return Uni.createFrom().item(foundDir.getChildren().stream().map(Pair::getLeft).toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Directory> getRoot() {
|
||||
return readDirEntry(new UUID(0, 0).toString()).map(d -> (Directory) d);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,25 +1,39 @@
|
||||
package com.usatiuk.dhfs.storage.fuse;
|
||||
|
||||
import com.usatiuk.dhfs.storage.files.objects.DirEntry;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.files.service.DhfsFileService;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Shutdown;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import jnr.ffi.Pointer;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
import ru.serce.jnrfuse.ErrorCodes;
|
||||
import ru.serce.jnrfuse.FuseFillDir;
|
||||
import ru.serce.jnrfuse.FuseStubFS;
|
||||
import ru.serce.jnrfuse.struct.FileStat;
|
||||
import ru.serce.jnrfuse.struct.FuseFileInfo;
|
||||
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
import static jnr.posix.FileStat.S_IFDIR;
|
||||
import static jnr.posix.FileStat.S_IFREG;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DhfsFuse extends FuseStubFS {
|
||||
@ConfigProperty(name = "dhfs.fuse.root")
|
||||
String root;
|
||||
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
void init(@Observes @Priority(100000) StartupEvent event) {
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
Log.info("Mounting with root " + root);
|
||||
@@ -29,12 +43,29 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int getattr(String path, FileStat stat) {
|
||||
return super.getattr(path, stat);
|
||||
Optional<DirEntry> found;
|
||||
try {
|
||||
found = fileService.getDirEntry(path).await().indefinitely();
|
||||
} catch (Exception e) {
|
||||
Log.error(e);
|
||||
return ErrorCodes.ENOENT();
|
||||
}
|
||||
if (found.isEmpty()) return ErrorCodes.ENOENT();
|
||||
if (found.get() instanceof File) {
|
||||
stat.st_mode.set(S_IFREG | 0777);
|
||||
stat.st_nlink.set(1);
|
||||
stat.st_size.set(0);
|
||||
} else if (found.get() instanceof Directory) {
|
||||
stat.st_mode.set(S_IFDIR | 0777);
|
||||
stat.st_nlink.set(2);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int open(String path, FuseFileInfo fi) {
|
||||
return super.open(path, fi);
|
||||
if (fileService.open(path).await().indefinitely().isEmpty()) return ErrorCodes.ENOENT();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -48,8 +79,22 @@ public class DhfsFuse extends FuseStubFS {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi) {
|
||||
return super.readdir(path, buf, filter, offset, fi);
|
||||
public int readdir(String path, Pointer buf, FuseFillDir filler, long offset, FuseFileInfo fi) {
|
||||
Iterable<String> found;
|
||||
try {
|
||||
found = fileService.readDir(path).await().indefinitely();
|
||||
} catch (Exception e) {
|
||||
return ErrorCodes.ENOENT();
|
||||
}
|
||||
|
||||
filler.apply(buf, ".", null, 0);
|
||||
filler.apply(buf, "..", null, 0);
|
||||
|
||||
for (var c : found) {
|
||||
filler.apply(buf, c, null, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
|
||||
@@ -10,6 +10,8 @@ import java.nio.ByteBuffer;
|
||||
public interface ObjectRepository {
|
||||
@Nonnull
|
||||
public Multi<String> findObjects(String namespace, String prefix);
|
||||
@Nonnull
|
||||
public Uni<Boolean> existsObject(String namespace, String name);
|
||||
|
||||
@Nonnull
|
||||
public Uni<Object> readObject(String namespace, String name);
|
||||
|
||||
@@ -6,12 +6,14 @@ import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Shutdown;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import io.vertx.mutiny.core.buffer.Buffer;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
@@ -28,8 +30,7 @@ public class SimpleFileObjectRepository implements ObjectRepository {
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
void init(@Observes @Priority(200) StartupEvent event) {
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + root);
|
||||
}
|
||||
@@ -53,6 +54,17 @@ public class SimpleFileObjectRepository implements ObjectRepository {
|
||||
.map(f -> nsRoot.relativize(Paths.get(f)).toString());
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Boolean> existsObject(String namespace, String name) {
|
||||
Path obj = Paths.get(root, namespace, name);
|
||||
|
||||
if (!obj.toFile().isFile())
|
||||
return Uni.createFrom().item(false);
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Uni<Object> readObject(String namespace, String name) {
|
||||
|
||||
Reference in New Issue
Block a user