some infrastructure for syncing

This commit is contained in:
2024-06-16 15:09:46 +02:00
parent a7ef7b2fe8
commit 8d6d7a5e29
28 changed files with 458 additions and 261 deletions

View File

@@ -0,0 +1,29 @@
package com.usatiuk.dhfs.storage;
import com.usatiuk.dhfs.storage.files.objects.File;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;
public abstract class DeserializationHelper {
// Taken from SerializationUtils
public static <T> T deserialize(final InputStream inputStream) {
// Shitty hack to make it work with quarkus class loader
var shit = new File(new UUID(0, 0)).getClass().getClassLoader();
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(shit, inputStream)) {
final T obj = (T) in.readObject();
return obj;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static <T> T deserialize(final byte[] objectData) {
return deserialize(new ByteArrayInputStream(objectData));
}
}

View File

@@ -16,6 +16,11 @@ public class Chunk extends JObject {
this._hash = DigestUtils.sha512Hex(bytes);
}
@Override
public boolean assumeUnique() {
return true;
}
@Override
public String getName() {
return _hash;

View File

@@ -7,7 +7,7 @@ import com.usatiuk.dhfs.storage.files.objects.FsNode;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
@@ -33,7 +33,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
final static String namespace = "dhfs_files";
void init(@Observes @Priority(300) StartupEvent event) {
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) {
objectRepository.createNamespace(namespace).await().indefinitely();
@@ -42,9 +42,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
getRoot().await().indefinitely();
}
@Shutdown
void shutdown() {
Log.info("Shutdown file service");
void shutdown(@Observes @Priority(100) ShutdownEvent event) {
Log.info("Shutdown");
}
private Uni<Optional<FsNode>> traverse(FsNode from, Path path) {
@@ -312,14 +311,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var chunkBytes = chunkRead.get().getBytes();
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
jObjectManager.put(namespace, newChunk).await().indefinitely();
newChunks.put(first.getKey(), newChunk.getHash());
}
{
Chunk newChunk = new Chunk(data);
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
jObjectManager.put(namespace, newChunk).await().indefinitely();
newChunks.put(offset, newChunk.getHash());
}
@@ -338,7 +337,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var startInFile = offset + data.length;
var startInChunk = startInFile - last.getKey();
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
jObjectManager.put(namespace, newChunk).await().indefinitely();
newChunks.put(startInFile, newChunk.getHash());
}
@@ -417,7 +416,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (lastChunk.getKey() + chunkBytes.length > 0) {
int start = (int) (length - lastChunk.getKey());
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
jObjectManager.put(namespace, newChunk).await().indefinitely();
newChunks.put(lastChunk.getKey(), newChunk.getHash());
}

View File

@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.storage.files.objects.File;
import com.usatiuk.dhfs.storage.files.objects.FsNode;
import com.usatiuk.dhfs.storage.files.service.DhfsFileService;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
@@ -47,6 +47,12 @@ public class DhfsFuse extends FuseStubFS {
new String[]{"-o", "direct_io", "-o", "uid=" + uid, "-o", "gid=" + gid});
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {
Log.info("Unmounting");
umount();
Log.info("Unmounted");
}
@Override
public int statfs(String path, Statvfs stbuf) {
//FIXME:
@@ -221,11 +227,4 @@ public class DhfsFuse extends FuseStubFS {
return 0;
}
@Shutdown
void shutdown() {
Log.info("Unmounting");
umount();
Log.info("Unmounted");
}
}

View File

@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.storage.objects.data.Namespace;
import com.usatiuk.dhfs.storage.objects.data.Object;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
@@ -22,22 +23,25 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
}
@Override
@RunOnVirtualThread
public Uni<ReadObjectReply> readObject(ReadObjectRequest request) {
return objectRepository.readObject(request.getNamespace(), request.getName())
.map(n -> ReadObjectReply.newBuilder().setData(ByteString.copyFrom(n.getData())).build());
var read = objectRepository.readObject(request.getNamespace(), request.getName());
return Uni.createFrom().item(ReadObjectReply.newBuilder().setData(ByteString.copyFrom(read.getData())).build());
}
@Override
@RunOnVirtualThread
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
return objectRepository.writeObject(request.getNamespace(),
new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()))
.map(n -> WriteObjectReply.newBuilder().build());
objectRepository.writeObject(request.getNamespace(),
new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()));
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
}
@Override
@RunOnVirtualThread
public Uni<DeleteObjectReply> deleteObject(DeleteObjectRequest request) {
return objectRepository.deleteObject(request.getNamespace(), request.getName())
.map(n -> DeleteObjectReply.newBuilder().build());
objectRepository.deleteObject(request.getNamespace(), request.getName());
return Uni.createFrom().item(DeleteObjectReply.newBuilder().build());
}
@Override

View File

@@ -12,6 +12,10 @@ public abstract class JObject implements Serializable {
protected final ReadWriteLock _lock = new ReentrantReadWriteLock();
public boolean assumeUnique() {
return false;
}
@Serial
private void writeObject(ObjectOutputStream oos) throws IOException {
_lock.readLock().lock();

View File

@@ -7,6 +7,4 @@ import java.util.Optional;
public interface JObjectManager {
<T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz);
<T extends JObject> Uni<Void> put(String namespace, T object);
// Returns the object from store if it existed, nothing otherwise
<T extends JObject> Uni<Optional<T>> tryPut(String namespace, T object);
}

View File

@@ -63,18 +63,18 @@ public class JObjectManagerImpl implements JObjectManager {
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
}
return jObjectRepository.readJObjectChecked(namespace, key, clazz).map(read -> {
if (read.isEmpty())
return Optional.empty();
var read = jObjectRepository.readJObjectChecked(namespace, key, clazz);
synchronized (_map) {
var inMap = getFromMap(namespace, key, clazz);
if (inMap != null) return Optional.of(inMap);
_map.put(key, new NamedSoftReference(read.get(), _refQueue));
}
if (read.isEmpty())
return Uni.createFrom().item(Optional.empty());
return Optional.of(read.get());
});
synchronized (_map) {
var inMap = getFromMap(namespace, key, clazz);
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
_map.put(key, new NamedSoftReference(read.get(), _refQueue));
}
return Uni.createFrom().item(Optional.of(read.get()));
}
@Override
@@ -83,27 +83,14 @@ public class JObjectManagerImpl implements JObjectManager {
synchronized (_map) {
var inMap = getFromMap(namespace, object.getName(), object.getClass());
if (inMap != null && inMap != object)
if (inMap != null && inMap != object && !object.assumeUnique())
throw new IllegalArgumentException("Trying to insert different object with same key");
else if (inMap == null)
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
}
return jObjectRepository.writeJObject(namespace, object);
}
@Override
public <T extends JObject> Uni<Optional<T>> tryPut(String namespace, T object) {
cleanup();
synchronized (_map) {
var inMap = getFromMap(namespace, object.getName(), object.getClass());
if (inMap != null) return Uni.createFrom().item(Optional.of((T) inMap));
else
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
}
return jObjectRepository.writeJObject(namespace, object).map(t -> Optional.empty());
jObjectRepository.writeJObject(namespace, object);
return Uni.createFrom().voidItem();
}
}

View File

@@ -1,15 +1,13 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import io.smallrye.mutiny.Uni;
import javax.annotation.Nonnull;
import java.util.Optional;
public interface JObjectRepository {
@Nonnull
Uni<Optional<JObject>> readJObject(String namespace, String name);
Optional<JObject> readJObject(String namespace, String name);
@Nonnull
<T extends JObject> Uni<Optional<T>> readJObjectChecked(String namespace, String name, Class<T> clazz);
<T extends JObject> Optional<T> readJObjectChecked(String namespace, String name, Class<T> clazz);
@Nonnull
Uni<Void> writeJObject(String namespace, JObject object);
void writeJObject(String namespace, JObject object);
}

View File

@@ -1,19 +1,16 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.data.Namespace;
import com.usatiuk.dhfs.storage.objects.data.Object;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang3.SerializationUtils;
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
@ApplicationScoped
@@ -21,54 +18,38 @@ public class JObjectRepositoryImpl implements JObjectRepository {
@Inject
ObjectRepository objectRepository;
// Taken from SerializationUtils
private static <T> T deserialize(final InputStream inputStream) {
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), inputStream)) {
final T obj = (T) in.readObject();
return obj;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
@Nonnull
@Override
public Optional<JObject> readJObject(String namespace, String name) {
var read = objectRepository.readObject(namespace, name);
java.lang.Object obj = DeserializationHelper.deserialize(read.getData());
if (!(obj instanceof JObject)) {
Log.error("Read object is not a JObject: " + namespace + "/" + name);
return Optional.empty();
}
}
private static <T> T deserialize(final byte[] objectData) {
return deserialize(new ByteArrayInputStream(objectData));
return Optional.of((JObject) obj);
}
@Nonnull
@Override
public Uni<Optional<JObject>> readJObject(String namespace, String name) {
return objectRepository.readObject(namespace, name).map(o -> {
java.lang.Object obj = deserialize(new ByteArrayInputStream(o.getData()));
if (!(obj instanceof JObject)) {
Log.error("Read object is not a JObject: " + namespace + "/" + name);
return Optional.empty();
}
return Optional.of((JObject) obj);
});
public <T extends JObject> Optional<T> readJObjectChecked(String namespace, String name, Class<T> clazz) {
var read = readJObject(namespace, name);
if (read.isEmpty()) return Optional.empty();
if (!clazz.isAssignableFrom(read.get().getClass())) {
Log.error("Read object type mismatch: " + namespace + "/" + name);
return Optional.empty();
}
return Optional.of((T) read.get());
}
@Nonnull
@Override
public <T extends JObject> Uni<Optional<T>> readJObjectChecked(String namespace, String name, Class<T> clazz) {
return readJObject(namespace, name).map(o -> {
if (o.isEmpty()) return Optional.empty();
if (!clazz.isAssignableFrom(o.get().getClass())) {
Log.error("Read object type mismatch: " + namespace + "/" + name);
return Optional.empty();
}
return Optional.of((T) o.get());
});
}
@Nonnull
@Override
public Uni<Void> writeJObject(String namespace, JObject object) {
public void writeJObject(String namespace, JObject object) {
final var obj = new Object(
new Namespace(namespace),
object.getName(),
SerializationUtils.serialize(object));
return objectRepository.writeObject(namespace, obj);
objectRepository.writeObject(namespace, obj);
}
}

View File

@@ -14,11 +14,11 @@ public interface ObjectRepository {
Uni<Boolean> existsObject(String namespace, String name);
@Nonnull
Uni<Object> readObject(String namespace, String name);
Object readObject(String namespace, String name);
@Nonnull
Uni<Void> writeObject(String namespace, Object object);
void writeObject(String namespace, Object object);
@Nonnull
Uni<Void> deleteObject(String namespace, String name);
void deleteObject(String namespace, String name);
@Nonnull
Uni<Void> createNamespace(String namespace);

View File

@@ -1,116 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository;
import com.usatiuk.dhfs.storage.objects.data.Namespace;
import com.usatiuk.dhfs.storage.objects.data.Object;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.nio.file.Path;
import java.nio.file.Paths;
@ApplicationScoped
public class SimpleFileObjectRepository implements ObjectRepository {
@ConfigProperty(name = "dhfs.filerepo.root")
String root;
@Inject
Vertx vertx;
void init(@Observes @Priority(200) StartupEvent event) {
Paths.get(root).toFile().mkdirs();
Log.info("Initializing with root " + root);
}
@Shutdown
void shutdown() {
Log.info("Shutdown");
}
@Nonnull
@Override
public Multi<String> findObjects(String namespace, String prefix) {
Path nsRoot = Paths.get(root, namespace);
if (!nsRoot.toFile().isDirectory())
throw new StatusRuntimeException(Status.NOT_FOUND);
return vertx.fileSystem().readDir(nsRoot.toString()).onItem()
.transformToMulti(v -> Multi.createFrom().iterable(v))
.select().where(n -> n.startsWith(prefix))
.map(f -> nsRoot.relativize(Paths.get(f)).toString());
}
@Nonnull
@Override
public Uni<Boolean> existsObject(String namespace, String name) {
Path obj = Paths.get(root, namespace, name);
if (!obj.toFile().isFile())
return Uni.createFrom().item(false);
return Uni.createFrom().item(true);
}
@Nonnull
@Override
public Uni<Object> readObject(String namespace, String name) {
var file = Path.of(root, namespace, name);
if (!file.toFile().exists())
throw new StatusRuntimeException(Status.NOT_FOUND);
return vertx.fileSystem().readFile(file.toString()).map(r -> new Object(new Namespace(namespace), name, r.getBytes()));
}
@Nonnull
@Override
public Uni<Void> writeObject(String namespace, Object object) {
var file = Path.of(root, namespace, object.getName());
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(object.getData()));
}
@Nonnull
@Override
public Uni<Void> deleteObject(String namespace, String name) {
var file = Path.of(root, namespace, name);
if (!file.toFile().exists())
throw new StatusRuntimeException(Status.NOT_FOUND);
return vertx.fileSystem().delete(file.toString());
}
@Nonnull
@Override
public Uni<Void> createNamespace(String namespace) {
if (Paths.get(root, namespace).toFile().exists())
return Uni.createFrom().voidItem();
if (!Paths.get(root, namespace).toFile().mkdirs())
throw new StatusRuntimeException(Status.INTERNAL);
return Uni.createFrom().voidItem();
}
@Nonnull
@Override
public Uni<Void> deleteNamespace(String namespace) {
if (!Paths.get(root, namespace).toFile().exists())
throw new StatusRuntimeException(Status.NOT_FOUND);
if (!Paths.get(root, namespace).toFile().delete())
throw new StatusRuntimeException(Status.INTERNAL);
return Uni.createFrom().voidItem();
}
}

View File

@@ -2,14 +2,15 @@ package com.usatiuk.dhfs.storage.objects.repository;
import com.usatiuk.dhfs.storage.objects.data.Object;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import javax.annotation.Nonnull;
@ApplicationScoped
//@ApplicationScoped
public class SimplePersistentObjectRepository implements ObjectRepository {
@Inject
ObjectPersistentStore objectPersistentStore;
@@ -28,31 +29,31 @@ public class SimplePersistentObjectRepository implements ObjectRepository {
@Nonnull
@Override
public Uni<Object> readObject(String namespace, String name) {
return objectPersistentStore.readObject(namespace, name);
public Object readObject(String namespace, String name) {
return objectPersistentStore.readObject(namespace, name).await().indefinitely();
}
@Nonnull
@Override
public Uni<Void> writeObject(String namespace, Object object) {
return objectPersistentStore.writeObject(namespace, object);
public void writeObject(String namespace, Object object) {
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
}
@Nonnull
@Override
public Uni<Void> deleteObject(String namespace, String name) {
return objectPersistentStore.deleteObject(namespace, name);
public void deleteObject(String namespace, String name) {
objectPersistentStore.deleteObject(namespace, name).await().indefinitely();
}
@Nonnull
@Override
public Uni<Void> createNamespace(String namespace) {
return objectPersistentStore.createNamespace(namespace);
return Uni.createFrom().voidItem();
}
@Nonnull
@Override
public Uni<Void> deleteNamespace(String namespace) {
return objectPersistentStore.deleteNamespace(namespace);
throw new NotImplementedException();
}
}

View File

@@ -0,0 +1,104 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.objects.data.Object;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import javax.annotation.Nonnull;
import java.io.IOException;
@ApplicationScoped
public class DistributedObjectRepository implements ObjectRepository {
@Inject
Vertx vertx;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectIndexService objectIndexService;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
}
void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException {
}
@Nonnull
@Override
public Multi<String> findObjects(String namespace, String prefix) {
throw new NotImplementedException();
}
@Nonnull
@Override
public Uni<Boolean> existsObject(String namespace, String name) {
return Uni.createFrom().item(objectIndexService.exists(namespace, name));
}
@Nonnull
@Override
public Object readObject(String namespace, String name) {
if (!objectIndexService.exists(namespace, name))
throw new IllegalArgumentException("Object " + name + " doesn't exist");
var infoOpt = objectIndexService.getMeta(namespace, name);
if (infoOpt.isEmpty()) throw new IllegalArgumentException("Object " + name + " doesn't exist");
var info = infoOpt.get();
return info.runReadLocked(() -> {
if (objectPersistentStore.existsObject(namespace, name).await().indefinitely())
return objectPersistentStore.readObject(namespace, name).await().indefinitely();
return remoteObjectServiceClient.getObject(namespace, name).map(got -> {
objectPersistentStore.writeObject(namespace, got);
return got;
}).await().indefinitely();
});
}
@Nonnull
@Override
public void writeObject(String namespace, Object object) {
var info = objectIndexService.getOrCreateMeta(namespace, object.getName());
info.runWriteLocked(() -> {
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
info.setMtime(System.currentTimeMillis());
remoteObjectServiceClient.notifyUpdate(namespace, object.getName()).await().indefinitely();
return null;
});
}
@Nonnull
@Override
public void deleteObject(String namespace, String name) {
throw new NotImplementedException();
}
@Nonnull
@Override
public Uni<Void> createNamespace(String namespace) {
return Uni.createFrom().voidItem();
}
@Nonnull
@Override
public Uni<Void> deleteNamespace(String namespace) {
throw new NotImplementedException();
}
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class HostInfo {
String name;
String ip;
}

View File

@@ -0,0 +1,70 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ObjectIndex implements Serializable {
@Getter
final Map<ImmutablePair<String, String>, ObjectMeta> _objectMetaMap = new HashMap<>();
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
public <R> R runReadLocked(Callable<R> fn) {
_lock.readLock().lock();
try {
return fn.call();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(Callable<R> fn) {
_lock.readLock().lock();
try {
return fn.call();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
_lock.readLock().unlock();
}
}
public boolean exists(String namespace, String name) {
return runReadLocked(() -> {
return _objectMetaMap.containsKey(new ImmutablePair<>(namespace, name));
});
}
public Optional<ObjectMeta> get(String namespace, String name) {
return runReadLocked(() -> {
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
return Optional.of(_objectMetaMap.get(new ImmutablePair<>(namespace, name)));
} else {
return Optional.empty();
}
});
}
public ObjectMeta getOrCreate(String namespace, String name) {
return runWriteLocked(() -> {
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
return _objectMetaMap.get(new ImmutablePair<>(namespace, name));
} else {
var newObjectMeta = new ObjectMeta(namespace, name);
_objectMetaMap.put(new ImmutablePair<>(namespace, name), newObjectMeta);
return newObjectMeta;
}
});
}
}

View File

@@ -0,0 +1,52 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
@ApplicationScoped
public class ObjectIndexService {
ObjectIndex _index = new ObjectIndex();
@ConfigProperty(name = "dhfs.objects.distributed.root")
String metaRoot;
final String metaFileName = "meta";
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(metaRoot).toFile().mkdirs();
Log.info("Initializing with root " + metaRoot);
if (Paths.get(metaRoot).resolve(metaFileName).toFile().exists()) {
Log.info("Reading index");
_index = DeserializationHelper.deserialize(Files.readAllBytes(Paths.get(metaRoot).resolve(metaFileName)));
}
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving index");
Files.write(Paths.get(metaRoot).resolve(metaFileName), SerializationUtils.serialize(_index));
Log.info("Shutdown");
}
public boolean exists(String namespace, String name) {
return _index.exists(namespace, name);
}
public Optional<ObjectMeta> getMeta(String namespace, String name) {
return _index.get(namespace, name);
}
public ObjectMeta getOrCreateMeta(String namespace, String name) {
return _index.getOrCreate(namespace, name);
}
}

View File

@@ -0,0 +1,54 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ObjectMeta implements Serializable {
public ObjectMeta(String namespace, String name) {
this._namespace = namespace;
this._name = name;
}
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@Getter
final String _namespace;
@Getter
final String _name;
@Getter
@Setter
long _mtime;
@Getter
final List<HostInfo> _remoteCopies = new ArrayList<>();
public <R> R runReadLocked(Callable<R> fn) {
_lock.readLock().lock();
try {
return fn.call();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(Callable<R> fn) {
_lock.readLock().lock();
try {
return fn.call();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
_lock.readLock().unlock();
}
}
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class RemoteHostManager {
}

View File

@@ -0,0 +1,20 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.objects.data.Object;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class RemoteObjectServiceClient {
@Inject
ObjectIndexService objectIndexService;
public Uni<Object> getObject(String namespace, String name) {
return Uni.createFrom().item(null);
}
public Uni<Boolean> notifyUpdate(String namespace, String name) {
return Uni.createFrom().item(true);
}
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import io.quarkus.grpc.GrpcService;
@GrpcService
public class RemoteObjectServiceServer {
}

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.storage.objects.data.Object;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@@ -34,8 +34,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
Log.info("Initializing with root " + root);
}
@Shutdown
void shutdown() {
void shutdown(@Observes @Priority(400) ShutdownEvent event) {
Log.info("Shutdown");
}
@@ -80,6 +79,10 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
public Uni<Void> writeObject(String namespace, Object object) {
var file = Path.of(root, namespace, object.getName());
if (!Paths.get(root, namespace).toFile().isDirectory()
&& !Paths.get(root, namespace).toFile().mkdirs())
throw new StatusRuntimeException(Status.INTERNAL);
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(object.getData()));
}
@@ -93,24 +96,4 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
return vertx.fileSystem().delete(file.toString());
}
@Nonnull
@Override
public Uni<Void> createNamespace(String namespace) {
if (Paths.get(root, namespace).toFile().exists())
return Uni.createFrom().voidItem();
if (!Paths.get(root, namespace).toFile().mkdirs())
throw new StatusRuntimeException(Status.INTERNAL);
return Uni.createFrom().voidItem();
}
@Nonnull
@Override
public Uni<Void> deleteNamespace(String namespace) {
if (!Paths.get(root, namespace).toFile().exists())
throw new StatusRuntimeException(Status.NOT_FOUND);
if (!Paths.get(root, namespace).toFile().delete())
throw new StatusRuntimeException(Status.INTERNAL);
return Uni.createFrom().voidItem();
}
}

View File

@@ -18,9 +18,4 @@ public interface ObjectPersistentStore {
Uni<Void> writeObject(String namespace, Object object);
@Nonnull
Uni<Void> deleteObject(String namespace, String name);
@Nonnull
Uni<Void> createNamespace(String namespace);
@Nonnull
Uni<Void> deleteNamespace(String namespace);
}

View File

@@ -1,3 +1,4 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root

View File

@@ -50,10 +50,10 @@ public class DhfsFileServiceSimpleTest extends SimpleFileRepoTest {
objectRepository.createNamespace("dhfs_files");
jObjectRepository.writeJObject("dhfs_files", c1).await().indefinitely();
jObjectRepository.writeJObject("dhfs_files", c2).await().indefinitely();
jObjectRepository.writeJObject("dhfs_files", c3).await().indefinitely();
jObjectRepository.writeJObject("dhfs_files", f).await().indefinitely();
jObjectRepository.writeJObject("dhfs_files", c1);
jObjectRepository.writeJObject("dhfs_files", c2);
jObjectRepository.writeJObject("dhfs_files", c3);
jObjectRepository.writeJObject("dhfs_files", f);
}
String all = "1234567891011";

View File

@@ -36,9 +36,9 @@ class DhfsObjectGrpcServiceTest extends SimpleFileRepoTest {
ReadObjectRequest.newBuilder().setNamespace("testns").setName("cool_file").build())
.await().atMost(Duration.ofSeconds(5));
Assertions.assertArrayEquals(read.getData().toByteArray(), "Hello world".getBytes());
var found = dhfsObjectGrpc.findObjects(FindObjectsRequest.newBuilder().setNamespace("testns").build())
.await().atMost(Duration.ofSeconds(5));
Assertions.assertIterableEquals(found.getFoundList().stream().map(l -> l.getName()).toList(), List.of("cool_file"));
// var found = dhfsObjectGrpc.findObjects(FindObjectsRequest.newBuilder().setNamespace("testns").build())
// .await().atMost(Duration.ofSeconds(5));
// Assertions.assertIterableEquals(found.getFoundList().stream().map(l -> l.getName()).toList(), List.of("cool_file"));
}
}

View File

@@ -12,6 +12,8 @@ import java.util.Objects;
public abstract class SimpleFileRepoTest {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
String tempDirectory;
@ConfigProperty(name = "dhfs.objects.distributed.root")
String tempDirectoryIdx;
void purgeDirectory(File dir) {
for (File file : Objects.requireNonNull(dir.listFiles())) {
@@ -24,5 +26,6 @@ public abstract class SimpleFileRepoTest {
@AfterEach
void teardown() {
purgeDirectory(Path.of(tempDirectory).toFile());
purgeDirectory(Path.of(tempDirectoryIdx).toFile());
}
}

View File

@@ -1,2 +1,3 @@
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root_test
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d_test
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root_test