mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
get rid of namespace
This commit is contained in:
@@ -6,16 +6,12 @@ import org.apache.commons.io.input.ClassLoaderObjectInputStream;
|
|||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
public abstract class DeserializationHelper {
|
public abstract class DeserializationHelper {
|
||||||
|
|
||||||
// Taken from SerializationUtils
|
// Taken from SerializationUtils
|
||||||
public static <T> T deserialize(final InputStream inputStream) {
|
public static <T> T deserialize(final InputStream inputStream) {
|
||||||
// Shitty hack to make it work with quarkus class loader
|
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(File.class.getClassLoader(), inputStream)) {
|
||||||
var shit = new File(new UUID(0, 0)).getClass().getClassLoader();
|
|
||||||
|
|
||||||
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(shit, inputStream)) {
|
|
||||||
final T obj = (T) in.readObject();
|
final T obj = (T) in.readObject();
|
||||||
return obj;
|
return obj;
|
||||||
} catch (IOException | ClassNotFoundException e) {
|
} catch (IOException | ClassNotFoundException e) {
|
||||||
|
|||||||
@@ -35,9 +35,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
void init(@Observes @Priority(500) StartupEvent event) {
|
void init(@Observes @Priority(500) StartupEvent event) {
|
||||||
Log.info("Initializing file service");
|
Log.info("Initializing file service");
|
||||||
if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) {
|
if (!objectRepository.existsObject(namespace + new UUID(0, 0)).await().indefinitely()) {
|
||||||
objectRepository.createNamespace(namespace).await().indefinitely();
|
jObjectManager.put(new Directory(new UUID(0, 0), 0755)).await().indefinitely();
|
||||||
jObjectManager.put(namespace, new Directory(new UUID(0, 0), 0755)).await().indefinitely();
|
|
||||||
}
|
}
|
||||||
getRoot().await().indefinitely();
|
getRoot().await().indefinitely();
|
||||||
}
|
}
|
||||||
@@ -58,7 +57,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
if (found.isEmpty())
|
if (found.isEmpty())
|
||||||
return Uni.createFrom().item(Optional.empty());
|
return Uni.createFrom().item(Optional.empty());
|
||||||
|
|
||||||
var ref = jObjectManager.get(namespace, found.get().toString(), FsNode.class)
|
var ref = jObjectManager.get(found.get().toString(), FsNode.class)
|
||||||
.await().indefinitely();
|
.await().indefinitely();
|
||||||
|
|
||||||
if (ref.isEmpty()) {
|
if (ref.isEmpty()) {
|
||||||
@@ -106,12 +105,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
File f = new File(fuuid);
|
File f = new File(fuuid);
|
||||||
f.setMode(mode);
|
f.setMode(mode);
|
||||||
|
|
||||||
jObjectManager.put(namespace, f).await().indefinitely();
|
jObjectManager.put(f).await().indefinitely();
|
||||||
|
|
||||||
if (!dir.putKid(Path.of(name).getFileName().toString(), fuuid))
|
if (!dir.putKid(Path.of(name).getFileName().toString(), fuuid))
|
||||||
return Uni.createFrom().item(Optional.empty());
|
return Uni.createFrom().item(Optional.empty());
|
||||||
|
|
||||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
jObjectManager.put(dir).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(Optional.of(f));
|
return Uni.createFrom().item(Optional.of(f));
|
||||||
}
|
}
|
||||||
@@ -129,10 +128,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
Directory d = new Directory(duuid);
|
Directory d = new Directory(duuid);
|
||||||
d.setMode(mode);
|
d.setMode(mode);
|
||||||
|
|
||||||
jObjectManager.put(namespace, d).await().indefinitely();
|
jObjectManager.put(d).await().indefinitely();
|
||||||
if (!dir.putKid(Path.of(name).getFileName().toString(), duuid))
|
if (!dir.putKid(Path.of(name).getFileName().toString(), duuid))
|
||||||
return Uni.createFrom().item(Optional.empty());
|
return Uni.createFrom().item(Optional.empty());
|
||||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
jObjectManager.put(dir).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(Optional.of(d));
|
return Uni.createFrom().item(Optional.of(d));
|
||||||
}
|
}
|
||||||
@@ -146,7 +145,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
||||||
|
|
||||||
var removed = dir.removeKid(Path.of(name).getFileName().toString());
|
var removed = dir.removeKid(Path.of(name).getFileName().toString());
|
||||||
if (removed) jObjectManager.put(namespace, dir).await().indefinitely();
|
if (removed) jObjectManager.put(dir).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(removed);
|
return Uni.createFrom().item(removed);
|
||||||
}
|
}
|
||||||
@@ -176,7 +175,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
if (!dir.putKid(Path.of(to).getFileName().toString(), dent.get().getUuid()))
|
if (!dir.putKid(Path.of(to).getFileName().toString(), dent.get().getUuid()))
|
||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
jObjectManager.put(dir).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(true);
|
return Uni.createFrom().item(true);
|
||||||
}
|
}
|
||||||
@@ -188,7 +187,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
dent.get().setMode(mode);
|
dent.get().setMode(mode);
|
||||||
|
|
||||||
jObjectManager.put(namespace, dent.get()).await().indefinitely();
|
jObjectManager.put(dent.get()).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(true);
|
return Uni.createFrom().item(true);
|
||||||
}
|
}
|
||||||
@@ -205,7 +204,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
|
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
|
||||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||||
if (fileOpt.isEmpty()) {
|
if (fileOpt.isEmpty()) {
|
||||||
Log.error("File not found when trying to read: " + fileUuid);
|
Log.error("File not found when trying to read: " + fileUuid);
|
||||||
return Uni.createFrom().item(Optional.empty());
|
return Uni.createFrom().item(Optional.empty());
|
||||||
@@ -239,7 +238,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
long toReadInChunk = (offset + length) - curPos;
|
long toReadInChunk = (offset + length) - curPos;
|
||||||
|
|
||||||
var chunkUuid = chunk.getValue();
|
var chunkUuid = chunk.getValue();
|
||||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||||
|
|
||||||
if (chunkRead.isEmpty()) {
|
if (chunkRead.isEmpty()) {
|
||||||
Log.error("Chunk requested not found: " + chunkUuid);
|
Log.error("Chunk requested not found: " + chunkUuid);
|
||||||
@@ -270,7 +269,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<Long> write(String fileUuid, long offset, byte[] data) {
|
public Uni<Long> write(String fileUuid, long offset, byte[] data) {
|
||||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||||
if (fileOpt.isEmpty()) {
|
if (fileOpt.isEmpty()) {
|
||||||
Log.error("File not found when trying to read: " + fileUuid);
|
Log.error("File not found when trying to read: " + fileUuid);
|
||||||
return Uni.createFrom().item(-1L);
|
return Uni.createFrom().item(-1L);
|
||||||
@@ -302,7 +301,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
if (first != null && first.getKey() < offset) {
|
if (first != null && first.getKey() < offset) {
|
||||||
var chunkUuid = first.getValue();
|
var chunkUuid = first.getValue();
|
||||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||||
|
|
||||||
if (chunkRead.isEmpty()) {
|
if (chunkRead.isEmpty()) {
|
||||||
Log.error("Chunk requested not found: " + chunkUuid);
|
Log.error("Chunk requested not found: " + chunkUuid);
|
||||||
@@ -311,20 +310,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
var chunkBytes = chunkRead.get().getBytes();
|
var chunkBytes = chunkRead.get().getBytes();
|
||||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
|
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
|
||||||
jObjectManager.put(namespace, newChunk).await().indefinitely();
|
jObjectManager.put(newChunk).await().indefinitely();
|
||||||
|
|
||||||
newChunks.put(first.getKey(), newChunk.getHash());
|
newChunks.put(first.getKey(), newChunk.getHash());
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
Chunk newChunk = new Chunk(data);
|
Chunk newChunk = new Chunk(data);
|
||||||
jObjectManager.put(namespace, newChunk).await().indefinitely();
|
jObjectManager.put(newChunk).await().indefinitely();
|
||||||
|
|
||||||
newChunks.put(offset, newChunk.getHash());
|
newChunks.put(offset, newChunk.getHash());
|
||||||
}
|
}
|
||||||
if (last != null) {
|
if (last != null) {
|
||||||
var lchunkUuid = last.getValue();
|
var lchunkUuid = last.getValue();
|
||||||
var lchunkRead = jObjectManager.get(namespace, lchunkUuid, Chunk.class).await().indefinitely();
|
var lchunkRead = jObjectManager.get(lchunkUuid, Chunk.class).await().indefinitely();
|
||||||
|
|
||||||
if (lchunkRead.isEmpty()) {
|
if (lchunkRead.isEmpty()) {
|
||||||
Log.error("Chunk requested not found: " + lchunkUuid);
|
Log.error("Chunk requested not found: " + lchunkUuid);
|
||||||
@@ -337,7 +336,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
var startInFile = offset + data.length;
|
var startInFile = offset + data.length;
|
||||||
var startInChunk = startInFile - last.getKey();
|
var startInChunk = startInFile - last.getKey();
|
||||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
|
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, (int) startInChunk, lchunkBytes.length));
|
||||||
jObjectManager.put(namespace, newChunk).await().indefinitely();
|
jObjectManager.put(newChunk).await().indefinitely();
|
||||||
|
|
||||||
newChunks.put(startInFile, newChunk.getHash());
|
newChunks.put(startInFile, newChunk.getHash());
|
||||||
}
|
}
|
||||||
@@ -355,14 +354,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
return Uni.createFrom().item(-1L);
|
return Uni.createFrom().item(-1L);
|
||||||
}
|
}
|
||||||
|
|
||||||
jObjectManager.put(namespace, file).await().indefinitely();
|
jObjectManager.put(file).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item((long) data.length);
|
return Uni.createFrom().item((long) data.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<Boolean> truncate(String fileUuid, long length) {
|
public Uni<Boolean> truncate(String fileUuid, long length) {
|
||||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||||
if (fileOpt.isEmpty()) {
|
if (fileOpt.isEmpty()) {
|
||||||
Log.error("File not found when trying to read: " + fileUuid);
|
Log.error("File not found when trying to read: " + fileUuid);
|
||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
@@ -380,7 +379,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
Log.error("Error writing file chunks: " + fileUuid, e);
|
Log.error("Error writing file chunks: " + fileUuid, e);
|
||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
}
|
}
|
||||||
jObjectManager.put(namespace, file).await().indefinitely();
|
jObjectManager.put(file).await().indefinitely();
|
||||||
return Uni.createFrom().item(true);
|
return Uni.createFrom().item(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -404,7 +403,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
if (lastChunk != null) {
|
if (lastChunk != null) {
|
||||||
var chunkUuid = lastChunk.getValue();
|
var chunkUuid = lastChunk.getValue();
|
||||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||||
|
|
||||||
if (chunkRead.isEmpty()) {
|
if (chunkRead.isEmpty()) {
|
||||||
Log.error("Chunk requested not found: " + chunkUuid);
|
Log.error("Chunk requested not found: " + chunkUuid);
|
||||||
@@ -416,7 +415,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
if (lastChunk.getKey() + chunkBytes.length > 0) {
|
if (lastChunk.getKey() + chunkBytes.length > 0) {
|
||||||
int start = (int) (length - lastChunk.getKey());
|
int start = (int) (length - lastChunk.getKey());
|
||||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
|
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
|
||||||
jObjectManager.put(namespace, newChunk).await().indefinitely();
|
jObjectManager.put(newChunk).await().indefinitely();
|
||||||
|
|
||||||
newChunks.put(lastChunk.getKey(), newChunk.getHash());
|
newChunks.put(lastChunk.getKey(), newChunk.getHash());
|
||||||
}
|
}
|
||||||
@@ -434,14 +433,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
jObjectManager.put(namespace, file).await().indefinitely();
|
jObjectManager.put(file).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(true);
|
return Uni.createFrom().item(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<Boolean> setTimes(String fileUuid, long atimeMs, long mtimeMs) {
|
public Uni<Boolean> setTimes(String fileUuid, long atimeMs, long mtimeMs) {
|
||||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
var fileOpt = jObjectManager.get(fileUuid, File.class).await().indefinitely();
|
||||||
if (fileOpt.isEmpty()) {
|
if (fileOpt.isEmpty()) {
|
||||||
Log.error("File not found when trying to read: " + fileUuid);
|
Log.error("File not found when trying to read: " + fileUuid);
|
||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
@@ -458,7 +457,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
jObjectManager.put(namespace, file).await().indefinitely();
|
jObjectManager.put(file).await().indefinitely();
|
||||||
|
|
||||||
return Uni.createFrom().item(true);
|
return Uni.createFrom().item(true);
|
||||||
}
|
}
|
||||||
@@ -483,7 +482,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
for (var chunk : chunksAll.entrySet()) {
|
for (var chunk : chunksAll.entrySet()) {
|
||||||
var chunkUuid = chunk.getValue();
|
var chunkUuid = chunk.getValue();
|
||||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
var chunkRead = jObjectManager.get(chunkUuid, Chunk.class).await().indefinitely();
|
||||||
|
|
||||||
if (chunkRead.isEmpty()) {
|
if (chunkRead.isEmpty()) {
|
||||||
Log.error("Chunk requested not found: " + chunkUuid);
|
Log.error("Chunk requested not found: " + chunkUuid);
|
||||||
@@ -498,7 +497,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<Directory> getRoot() {
|
public Uni<Directory> getRoot() {
|
||||||
var read = jObjectManager.get(namespace, new UUID(0, 0).toString(), FsNode.class).await().indefinitely();
|
var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class).await().indefinitely();
|
||||||
if (read.isEmpty() || !(read.get() instanceof Directory)) {
|
if (read.isEmpty() || !(read.get() instanceof Directory)) {
|
||||||
Log.error("Root directory not found");
|
Log.error("Root directory not found");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.api;
|
package com.usatiuk.dhfs.storage.objects.api;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||||
import io.quarkus.grpc.GrpcService;
|
import io.quarkus.grpc.GrpcService;
|
||||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||||
@@ -15,7 +14,7 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Uni<FindObjectsReply> findObjects(FindObjectsRequest request) {
|
public Uni<FindObjectsReply> findObjects(FindObjectsRequest request) {
|
||||||
return objectRepository.findObjects(request.getNamespace(), request.getPrefix())
|
return objectRepository.findObjects(request.getPrefix())
|
||||||
.map(m -> FindObjectsReply.FindObjectsEntry.newBuilder().setName(m).build())
|
.map(m -> FindObjectsReply.FindObjectsEntry.newBuilder().setName(m).build())
|
||||||
.collect().in(FindObjectsReply::newBuilder, FindObjectsReply.Builder::addFound)
|
.collect().in(FindObjectsReply::newBuilder, FindObjectsReply.Builder::addFound)
|
||||||
.map(FindObjectsReply.Builder::build);
|
.map(FindObjectsReply.Builder::build);
|
||||||
@@ -24,34 +23,21 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
|
|||||||
@Override
|
@Override
|
||||||
@RunOnVirtualThread
|
@RunOnVirtualThread
|
||||||
public Uni<ReadObjectReply> readObject(ReadObjectRequest request) {
|
public Uni<ReadObjectReply> readObject(ReadObjectRequest request) {
|
||||||
var read = objectRepository.readObject(request.getNamespace(), request.getName());
|
var read = objectRepository.readObject(request.getName());
|
||||||
return Uni.createFrom().item(ReadObjectReply.newBuilder().setData(ByteString.copyFrom(read.getData())).build());
|
return Uni.createFrom().item(ReadObjectReply.newBuilder().setData(ByteString.copyFrom(read)).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@RunOnVirtualThread
|
@RunOnVirtualThread
|
||||||
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
|
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
|
||||||
objectRepository.writeObject(request.getNamespace(),
|
objectRepository.writeObject(request.getName(), request.getData().toByteArray(), false);
|
||||||
new Object(request.getNamespace(), request.getName(), request.getData().toByteArray()), false);
|
|
||||||
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
|
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@RunOnVirtualThread
|
@RunOnVirtualThread
|
||||||
public Uni<DeleteObjectReply> deleteObject(DeleteObjectRequest request) {
|
public Uni<DeleteObjectReply> deleteObject(DeleteObjectRequest request) {
|
||||||
objectRepository.deleteObject(request.getNamespace(), request.getName());
|
objectRepository.deleteObject(request.getName());
|
||||||
return Uni.createFrom().item(DeleteObjectReply.newBuilder().build());
|
return Uni.createFrom().item(DeleteObjectReply.newBuilder().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public Uni<CreateNamespaceReply> createNamespace(CreateNamespaceRequest request) {
|
|
||||||
return objectRepository.createNamespace(request.getNamespace())
|
|
||||||
.map(n -> CreateNamespaceReply.newBuilder().build());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Uni<DeleteNamespaceReply> deleteNamespace(DeleteNamespaceRequest request) {
|
|
||||||
return objectRepository.deleteNamespace(request.getNamespace())
|
|
||||||
.map(n -> DeleteNamespaceReply.newBuilder().build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,17 +0,0 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.data;
|
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
import lombok.experimental.Accessors;
|
|
||||||
|
|
||||||
@Accessors(chain = true)
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
@AllArgsConstructor
|
|
||||||
public class Object {
|
|
||||||
final String _namespace;
|
|
||||||
|
|
||||||
final String _name;
|
|
||||||
final byte[] _data;
|
|
||||||
}
|
|
||||||
@@ -5,7 +5,7 @@ import io.smallrye.mutiny.Uni;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
public interface JObjectManager {
|
public interface JObjectManager {
|
||||||
<T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz);
|
<T extends JObject> Uni<Optional<T>> get(String name, Class<T> clazz);
|
||||||
<T extends JObject> Uni<Void> put(String namespace, T object);
|
<T extends JObject> Uni<Void> put(T object);
|
||||||
void invalidateJObject(String namespace, String name);
|
void invalidateJObject(String name);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,13 +39,13 @@ public class JObjectManagerImpl implements JObjectManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends JObject> T getFromMap(String namespace, String key, Class<T> clazz) {
|
private <T extends JObject> T getFromMap(String key, Class<T> clazz) {
|
||||||
synchronized (_map) {
|
synchronized (_map) {
|
||||||
if (_map.containsKey(key)) {
|
if (_map.containsKey(key)) {
|
||||||
var ref = _map.get(key).get();
|
var ref = _map.get(key).get();
|
||||||
if (ref != null) {
|
if (ref != null) {
|
||||||
if (!clazz.isAssignableFrom(ref.getClass())) {
|
if (!clazz.isAssignableFrom(ref.getClass())) {
|
||||||
Log.error("Cached object type mismatch: " + namespace + "/" + key);
|
Log.error("Cached object type mismatch: " + key);
|
||||||
_map.remove(key);
|
_map.remove(key);
|
||||||
} else
|
} else
|
||||||
return (T) ref;
|
return (T) ref;
|
||||||
@@ -56,45 +56,45 @@ public class JObjectManagerImpl implements JObjectManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz) {
|
public <T extends JObject> Uni<Optional<T>> get(String name, Class<T> clazz) {
|
||||||
cleanup();
|
cleanup();
|
||||||
synchronized (_map) {
|
synchronized (_map) {
|
||||||
var inMap = getFromMap(namespace, key, clazz);
|
var inMap = getFromMap(name, clazz);
|
||||||
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
||||||
}
|
}
|
||||||
|
|
||||||
var read = jObjectRepository.readJObjectChecked(namespace, key, clazz);
|
var read = jObjectRepository.readJObjectChecked(name, clazz);
|
||||||
|
|
||||||
if (read.isEmpty())
|
if (read.isEmpty())
|
||||||
return Uni.createFrom().item(Optional.empty());
|
return Uni.createFrom().item(Optional.empty());
|
||||||
|
|
||||||
synchronized (_map) {
|
synchronized (_map) {
|
||||||
var inMap = getFromMap(namespace, key, clazz);
|
var inMap = getFromMap(name, clazz);
|
||||||
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
||||||
_map.put(key, new NamedSoftReference(read.get(), _refQueue));
|
_map.put(name, new NamedSoftReference(read.get(), _refQueue));
|
||||||
}
|
}
|
||||||
|
|
||||||
return Uni.createFrom().item(Optional.of(read.get()));
|
return Uni.createFrom().item(Optional.of(read.get()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends JObject> Uni<Void> put(String namespace, T object) {
|
public <T extends JObject> Uni<Void> put(T object) {
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|
||||||
synchronized (_map) {
|
synchronized (_map) {
|
||||||
var inMap = getFromMap(namespace, object.getName(), object.getClass());
|
var inMap = getFromMap(object.getName(), object.getClass());
|
||||||
if (inMap != null && inMap != object && !object.assumeUnique())
|
if (inMap != null && inMap != object && !object.assumeUnique())
|
||||||
throw new IllegalArgumentException("Trying to insert different object with same key");
|
throw new IllegalArgumentException("Trying to insert different object with same key");
|
||||||
else if (inMap == null)
|
else if (inMap == null)
|
||||||
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
|
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
|
||||||
}
|
}
|
||||||
|
|
||||||
jObjectRepository.writeJObject(namespace, object);
|
jObjectRepository.writeJObject(object);
|
||||||
return Uni.createFrom().voidItem();
|
return Uni.createFrom().voidItem();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void invalidateJObject(String namespace, String name) {
|
public void invalidateJObject(String name) {
|
||||||
synchronized (_map) {
|
synchronized (_map) {
|
||||||
_map.remove(name);
|
_map.remove(name);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,9 +5,9 @@ import java.util.Optional;
|
|||||||
|
|
||||||
public interface JObjectRepository {
|
public interface JObjectRepository {
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Optional<JObject> readJObject(String namespace, String name);
|
Optional<JObject> readJObject(String name);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
<T extends JObject> Optional<T> readJObjectChecked(String namespace, String name, Class<T> clazz);
|
<T extends JObject> Optional<T> readJObjectChecked(String name, Class<T> clazz);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
void writeJObject(String namespace, JObject object);
|
void writeJObject(JObject object);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
@@ -18,11 +17,11 @@ public class JObjectRepositoryImpl implements JObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Optional<JObject> readJObject(String namespace, String name) {
|
public Optional<JObject> readJObject(String name) {
|
||||||
var read = objectRepository.readObject(namespace, name);
|
var read = objectRepository.readObject(name);
|
||||||
java.lang.Object obj = DeserializationHelper.deserialize(read.getData());
|
java.lang.Object obj = DeserializationHelper.deserialize(read);
|
||||||
if (!(obj instanceof JObject)) {
|
if (!(obj instanceof JObject)) {
|
||||||
Log.error("Read object is not a JObject: " + namespace + "/" + name);
|
Log.error("Read object is not a JObject: " + name);
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
return Optional.of((JObject) obj);
|
return Optional.of((JObject) obj);
|
||||||
@@ -30,12 +29,12 @@ public class JObjectRepositoryImpl implements JObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public <T extends JObject> Optional<T> readJObjectChecked(String namespace, String name, Class<T> clazz) {
|
public <T extends JObject> Optional<T> readJObjectChecked(String name, Class<T> clazz) {
|
||||||
var read = readJObject(namespace, name);
|
var read = readJObject(name);
|
||||||
if (read.isEmpty()) return Optional.empty();
|
if (read.isEmpty()) return Optional.empty();
|
||||||
|
|
||||||
if (!clazz.isAssignableFrom(read.get().getClass())) {
|
if (!clazz.isAssignableFrom(read.get().getClass())) {
|
||||||
Log.error("Read object type mismatch: " + namespace + "/" + name);
|
Log.error("Read object type mismatch: " + name);
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
return Optional.of((T) read.get());
|
return Optional.of((T) read.get());
|
||||||
@@ -43,11 +42,7 @@ public class JObjectRepositoryImpl implements JObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void writeJObject(String namespace, JObject object) {
|
public void writeJObject(JObject object) {
|
||||||
final var obj = new Object(
|
objectRepository.writeObject(object.getName(), SerializationUtils.serialize(object), object.assumeUnique());
|
||||||
namespace,
|
|
||||||
object.getName(),
|
|
||||||
SerializationUtils.serialize(object));
|
|
||||||
objectRepository.writeObject(namespace, obj, object.assumeUnique());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository;
|
package com.usatiuk.dhfs.storage.objects.repository;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import io.smallrye.mutiny.Multi;
|
import io.smallrye.mutiny.Multi;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
|
|
||||||
@@ -8,19 +7,14 @@ import javax.annotation.Nonnull;
|
|||||||
|
|
||||||
public interface ObjectRepository {
|
public interface ObjectRepository {
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Multi<String> findObjects(String namespace, String prefix);
|
Multi<String> findObjects(String prefix);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Uni<Boolean> existsObject(String namespace, String name);
|
Uni<Boolean> existsObject(String name);
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Object readObject(String namespace, String name);
|
byte[] readObject(String name);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
void writeObject(String namespace, Object object, Boolean canIgnoreConflict);
|
void writeObject(String name, byte[] data, Boolean canIgnoreConflict);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
void deleteObject(String namespace, String name);
|
void deleteObject(String name);
|
||||||
|
|
||||||
@Nonnull
|
|
||||||
Uni<Void> createNamespace(String namespace);
|
|
||||||
@Nonnull
|
|
||||||
Uni<Void> deleteNamespace(String namespace);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository;
|
package com.usatiuk.dhfs.storage.objects.repository;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
import io.smallrye.mutiny.Multi;
|
import io.smallrye.mutiny.Multi;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
|
||||||
@@ -16,43 +14,31 @@ public class SimplePersistentObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Multi<String> findObjects(String namespace, String prefix) {
|
public Multi<String> findObjects(String prefix) {
|
||||||
return objectPersistentStore.findObjects(namespace, prefix);
|
return objectPersistentStore.findObjects(prefix);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Boolean> existsObject(String namespace, String name) {
|
public Uni<Boolean> existsObject(String name) {
|
||||||
return objectPersistentStore.existsObject(namespace, name);
|
return objectPersistentStore.existsObject(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Object readObject(String namespace, String name) {
|
public byte[] readObject(String name) {
|
||||||
return objectPersistentStore.readObject(namespace, name).await().indefinitely();
|
return objectPersistentStore.readObject(name).await().indefinitely();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) {
|
public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) {
|
||||||
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
objectPersistentStore.writeObject(name, data).await().indefinitely();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void deleteObject(String namespace, String name) {
|
public void deleteObject(String name) {
|
||||||
objectPersistentStore.deleteObject(namespace, name).await().indefinitely();
|
objectPersistentStore.deleteObject(name).await().indefinitely();
|
||||||
}
|
|
||||||
|
|
||||||
@Nonnull
|
|
||||||
@Override
|
|
||||||
public Uni<Void> createNamespace(String namespace) {
|
|
||||||
return Uni.createFrom().voidItem();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Nonnull
|
|
||||||
@Override
|
|
||||||
public Uni<Void> deleteNamespace(String namespace) {
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -45,12 +44,12 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
Log.info("Starting sync");
|
Log.info("Starting sync");
|
||||||
var got = remoteObjectServiceClient.getIndex();
|
var got = remoteObjectServiceClient.getIndex();
|
||||||
for (var h : got) {
|
for (var h : got) {
|
||||||
var prevMtime = objectIndexService.exists(h.getNamespace(), h.getName())
|
var prevMtime = objectIndexService.exists(h.getName())
|
||||||
? objectIndexService.getMeta(h.getNamespace(), h.getName()).get().getMtime()
|
? objectIndexService.getMeta(h.getName()).get().getMtime()
|
||||||
: 0;
|
: 0;
|
||||||
syncHandler.handleRemoteUpdate(
|
syncHandler.handleRemoteUpdate(
|
||||||
IndexUpdatePush.newBuilder().setSelfname(selfname
|
IndexUpdatePush.newBuilder().setSelfname(selfname).setName(h.getName())
|
||||||
).setNamespace(h.getNamespace()).setName(h.getName()).setAssumeUnique(h.getAssumeUnique())
|
.setAssumeUnique(h.getAssumeUnique())
|
||||||
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely();
|
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely();
|
||||||
}
|
}
|
||||||
Log.info("Sync complete");
|
Log.info("Sync complete");
|
||||||
@@ -65,38 +64,38 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Multi<String> findObjects(String namespace, String prefix) {
|
public Multi<String> findObjects(String prefix) {
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Boolean> existsObject(String namespace, String name) {
|
public Uni<Boolean> existsObject(String name) {
|
||||||
return Uni.createFrom().item(objectIndexService.exists(namespace, name));
|
return Uni.createFrom().item(objectIndexService.exists(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Object readObject(String namespace, String name) {
|
public byte[] readObject(String name) {
|
||||||
if (!objectIndexService.exists(namespace, name))
|
if (!objectIndexService.exists(name))
|
||||||
throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
||||||
|
|
||||||
var infoOpt = objectIndexService.getMeta(namespace, name);
|
var infoOpt = objectIndexService.getMeta(name);
|
||||||
if (infoOpt.isEmpty()) throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
if (infoOpt.isEmpty()) throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
||||||
|
|
||||||
var info = infoOpt.get();
|
var info = infoOpt.get();
|
||||||
|
|
||||||
Optional<Object> read = info.runReadLocked(() -> {
|
Optional<byte[]> read = info.runReadLocked(() -> {
|
||||||
if (objectPersistentStore.existsObject(namespace, name).await().indefinitely())
|
if (objectPersistentStore.existsObject(name).await().indefinitely())
|
||||||
return Optional.of(objectPersistentStore.readObject(namespace, name).await().indefinitely());
|
return Optional.of(objectPersistentStore.readObject(name).await().indefinitely());
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
});
|
});
|
||||||
if (read.isPresent()) return read.get();
|
if (read.isPresent()) return read.get();
|
||||||
// Race?
|
// Race?
|
||||||
|
|
||||||
return info.runWriteLocked(() -> {
|
return info.runWriteLocked(() -> {
|
||||||
return remoteObjectServiceClient.getObject(namespace, name).map(got -> {
|
return remoteObjectServiceClient.getObject(name).map(got -> {
|
||||||
objectPersistentStore.writeObject(namespace, got);
|
objectPersistentStore.writeObject(name, got);
|
||||||
return got;
|
return got;
|
||||||
}).await().indefinitely();
|
}).await().indefinitely();
|
||||||
});
|
});
|
||||||
@@ -104,17 +103,15 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) {
|
public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) {
|
||||||
var info = objectIndexService.getOrCreateMeta(namespace, object.getName(), canIgnoreConflict);
|
var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict);
|
||||||
|
|
||||||
info.runWriteLocked(() -> {
|
info.runWriteLocked(() -> {
|
||||||
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
objectPersistentStore.writeObject(name, data).await().indefinitely();
|
||||||
var prevMtime = info.getMtime();
|
var prevMtime = info.getMtime();
|
||||||
info.setMtime(System.currentTimeMillis());
|
info.setMtime(System.currentTimeMillis());
|
||||||
try {
|
try {
|
||||||
Log.warn("Updating object " + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime);
|
remoteObjectServiceClient.notifyUpdate(name, prevMtime);
|
||||||
remoteObjectServiceClient.notifyUpdate(namespace, object.getName(), prevMtime);
|
|
||||||
Log.warn("Updating object complete" + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Log.error("Error when notifying remote update:");
|
Log.error("Error when notifying remote update:");
|
||||||
Log.error(e);
|
Log.error(e);
|
||||||
@@ -126,19 +123,7 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void deleteObject(String namespace, String name) {
|
public void deleteObject(String name) {
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Nonnull
|
|
||||||
@Override
|
|
||||||
public Uni<Void> createNamespace(String namespace) {
|
|
||||||
return Uni.createFrom().voidItem();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Nonnull
|
|
||||||
@Override
|
|
||||||
public Uni<Void> deleteNamespace(String namespace) {
|
|
||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@@ -13,7 +12,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||||||
|
|
||||||
public class ObjectIndex implements Serializable {
|
public class ObjectIndex implements Serializable {
|
||||||
@Getter
|
@Getter
|
||||||
final Map<ImmutablePair<String, String>, ObjectMeta> _objectMetaMap = new HashMap<>();
|
final Map<String, ObjectMeta> _objectMetaMap = new HashMap<>();
|
||||||
|
|
||||||
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
@@ -39,32 +38,32 @@ public class ObjectIndex implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean exists(String namespace, String name) {
|
public boolean exists(String name) {
|
||||||
return runReadLocked(() -> {
|
return runReadLocked(() -> {
|
||||||
return _objectMetaMap.containsKey(new ImmutablePair<>(namespace, name));
|
return _objectMetaMap.containsKey(name);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<ObjectMeta> get(String namespace, String name) {
|
public Optional<ObjectMeta> get(String name) {
|
||||||
return runReadLocked(() -> {
|
return runReadLocked(() -> {
|
||||||
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
|
if (_objectMetaMap.containsKey(name)) {
|
||||||
return Optional.of(_objectMetaMap.get(new ImmutablePair<>(namespace, name)));
|
return Optional.of(_objectMetaMap.get(name));
|
||||||
} else {
|
} else {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObjectMeta getOrCreate(String namespace, String name, boolean assumeUnique) {
|
public ObjectMeta getOrCreate(String name, boolean assumeUnique) {
|
||||||
return runWriteLocked(() -> {
|
return runWriteLocked(() -> {
|
||||||
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
|
if (_objectMetaMap.containsKey(name)) {
|
||||||
var got = _objectMetaMap.get(new ImmutablePair<>(namespace, name));
|
var got = _objectMetaMap.get(name);
|
||||||
if (got.getAssumeUnique() != assumeUnique)
|
if (got.getAssumeUnique() != assumeUnique)
|
||||||
throw new IllegalArgumentException("assumeUnique mismatch for " + namespace + "/" + name);
|
throw new IllegalArgumentException("assumeUnique mismatch for " + name);
|
||||||
return got;
|
return got;
|
||||||
} else {
|
} else {
|
||||||
var newObjectMeta = new ObjectMeta(namespace, name, assumeUnique);
|
var newObjectMeta = new ObjectMeta(name, assumeUnique);
|
||||||
_objectMetaMap.put(new ImmutablePair<>(namespace, name), newObjectMeta);
|
_objectMetaMap.put(name, newObjectMeta);
|
||||||
return newObjectMeta;
|
return newObjectMeta;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,18 +1,13 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
|
|
||||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.runtime.ShutdownEvent;
|
import io.quarkus.runtime.ShutdownEvent;
|
||||||
import io.quarkus.runtime.StartupEvent;
|
import io.quarkus.runtime.StartupEvent;
|
||||||
import io.smallrye.mutiny.Uni;
|
|
||||||
import jakarta.annotation.Priority;
|
import jakarta.annotation.Priority;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.enterprise.event.Observes;
|
import jakarta.enterprise.event.Observes;
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
|
||||||
import org.apache.commons.lang3.SerializationUtils;
|
import org.apache.commons.lang3.SerializationUtils;
|
||||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
|
||||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -43,21 +38,21 @@ public class ObjectIndexService {
|
|||||||
Log.info("Shutdown");
|
Log.info("Shutdown");
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean exists(String namespace, String name) {
|
public boolean exists(String name) {
|
||||||
return _index.exists(namespace, name);
|
return _index.exists(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<ObjectMeta> getMeta(String namespace, String name) {
|
public Optional<ObjectMeta> getMeta(String name) {
|
||||||
return _index.get(namespace, name);
|
return _index.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObjectMeta getOrCreateMeta(String namespace, String name, boolean assumeUnique) {
|
public ObjectMeta getOrCreateMeta(String name, boolean assumeUnique) {
|
||||||
return _index.getOrCreate(namespace, name, assumeUnique);
|
return _index.getOrCreate(name, assumeUnique);
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
public interface ForAllFn {
|
public interface ForAllFn {
|
||||||
void apply(ImmutablePair<String, String> name, ObjectMeta meta);
|
void apply(String name, ObjectMeta meta);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void forAllRead(ForAllFn fn) {
|
public void forAllRead(ForAllFn fn) {
|
||||||
|
|||||||
@@ -10,16 +10,13 @@ import java.util.concurrent.locks.ReadWriteLock;
|
|||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
public class ObjectMeta implements Serializable {
|
public class ObjectMeta implements Serializable {
|
||||||
public ObjectMeta(String namespace, String name, Boolean assumeUnique) {
|
public ObjectMeta(String name, Boolean assumeUnique) {
|
||||||
this._namespace = namespace;
|
|
||||||
this._name = name;
|
this._name = name;
|
||||||
this._assumeUnique = assumeUnique;
|
this._assumeUnique = assumeUnique;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
@Getter
|
|
||||||
final String _namespace;
|
|
||||||
@Getter
|
@Getter
|
||||||
final String _name;
|
final String _name;
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.repository.distributed.GetIndexRequest;
|
|||||||
import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest;
|
import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest;
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
@@ -25,11 +24,11 @@ public class RemoteObjectServiceClient {
|
|||||||
@Inject
|
@Inject
|
||||||
RemoteHostManager remoteHostManager;
|
RemoteHostManager remoteHostManager;
|
||||||
|
|
||||||
public Uni<Object> getObject(String namespace, String name) {
|
public Uni<byte[]> getObject(String name) {
|
||||||
return remoteHostManager.withClient(client -> {
|
return remoteHostManager.withClient(client -> {
|
||||||
var req = GetObjectRequest.newBuilder().setNamespace(namespace).setName(name).build();
|
var req = GetObjectRequest.newBuilder().setName(name).build();
|
||||||
var reply = client.getObject(req);
|
var reply = client.getObject(req);
|
||||||
var metaOpt = objectIndexService.getMeta(namespace, name);
|
var metaOpt = objectIndexService.getMeta(name);
|
||||||
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||||
var meta = metaOpt.get();
|
var meta = metaOpt.get();
|
||||||
if (meta.getMtime() != reply.getObject().getHeader().getMtime()) {
|
if (meta.getMtime() != reply.getObject().getHeader().getMtime()) {
|
||||||
@@ -38,11 +37,7 @@ public class RemoteObjectServiceClient {
|
|||||||
throw new NotImplementedException();
|
throw new NotImplementedException();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Uni.createFrom().item(new Object(
|
return Uni.createFrom().item(reply.getObject().getContent().toByteArray());
|
||||||
reply.getObject().getHeader().getNamespace(),
|
|
||||||
reply.getObject().getHeader().getName(),
|
|
||||||
reply.getObject().getContent().toByteArray()
|
|
||||||
));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,14 +49,14 @@ public class RemoteObjectServiceClient {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Boolean notifyUpdate(String namespace, String name, long prevMtime) {
|
public Boolean notifyUpdate(String name, long prevMtime) {
|
||||||
return remoteHostManager.withClient(client -> {
|
return remoteHostManager.withClient(client -> {
|
||||||
var metaOpt = objectIndexService.getMeta(namespace, name);
|
var metaOpt = objectIndexService.getMeta(name);
|
||||||
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||||
var meta = metaOpt.get();
|
var meta = metaOpt.get();
|
||||||
|
|
||||||
var req = IndexUpdatePush.newBuilder().setSelfname(selfname
|
var req = IndexUpdatePush.newBuilder().setSelfname(selfname).setName(name)
|
||||||
).setNamespace(namespace).setName(name).setAssumeUnique(meta.getAssumeUnique())
|
.setAssumeUnique(meta.getAssumeUnique())
|
||||||
.setMtime(meta.getMtime()).setPrevMtime(prevMtime).build();
|
.setMtime(meta.getMtime()).setPrevMtime(prevMtime).build();
|
||||||
client.indexUpdate(req);
|
client.indexUpdate(req);
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
|||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
@@ -31,18 +30,18 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
|||||||
@Blocking
|
@Blocking
|
||||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||||
Log.info("<-- getObject: " + request.getName());
|
Log.info("<-- getObject: " + request.getName());
|
||||||
var metaOpt = objectIndexService.getMeta(request.getNamespace(), request.getName());
|
var metaOpt = objectIndexService.getMeta(request.getName());
|
||||||
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
var meta = metaOpt.get();
|
var meta = metaOpt.get();
|
||||||
Optional<Pair<Long, Object>> read = meta.runReadLocked(() -> {
|
Optional<Pair<Long, byte[]>> read = meta.runReadLocked(() -> {
|
||||||
if (objectPersistentStore.existsObject(request.getNamespace(), request.getName()).await().indefinitely())
|
if (objectPersistentStore.existsObject(request.getName()).await().indefinitely())
|
||||||
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getNamespace(), request.getName()).await().indefinitely()));
|
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()).await().indefinitely()));
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
});
|
});
|
||||||
if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
var obj = read.get().getRight();
|
var obj = read.get().getRight();
|
||||||
var header = ObjectHeader.newBuilder().setName(obj.getName()).setNamespace(obj.getNamespace()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build();
|
var header = ObjectHeader.newBuilder().setName(request.getName()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||||
var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj.getData())).build();
|
var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj)).build();
|
||||||
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,7 +51,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
|||||||
Log.info("<-- getIndex: ");
|
Log.info("<-- getIndex: ");
|
||||||
var builder = GetIndexReply.newBuilder();
|
var builder = GetIndexReply.newBuilder();
|
||||||
objectIndexService.forAllRead((name, meta) -> {
|
objectIndexService.forAllRead((name, meta) -> {
|
||||||
var entry = ObjectHeader.newBuilder().setNamespace(name.getLeft()).setName(name.getRight()).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build();
|
var entry = ObjectHeader.newBuilder().setName(name).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||||
builder.addObjects(entry);
|
builder.addObjects(entry);
|
||||||
});
|
});
|
||||||
return Uni.createFrom().item(builder.build());
|
return Uni.createFrom().item(builder.build());
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ public class SyncHandler {
|
|||||||
JObjectManager jObjectManager;
|
JObjectManager jObjectManager;
|
||||||
|
|
||||||
public Uni<IndexUpdateReply> handleRemoteUpdate(IndexUpdatePush request) {
|
public Uni<IndexUpdateReply> handleRemoteUpdate(IndexUpdatePush request) {
|
||||||
var metaOpt = objectIndexService.getOrCreateMeta(request.getNamespace(), request.getName(), request.getAssumeUnique());
|
var metaOpt = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique());
|
||||||
metaOpt.runWriteLocked(() -> {
|
metaOpt.runWriteLocked(() -> {
|
||||||
if (metaOpt.getMtime() == request.getMtime()) {
|
if (metaOpt.getMtime() == request.getMtime()) {
|
||||||
metaOpt._remoteCopies.add(request.getSelfname());
|
metaOpt._remoteCopies.add(request.getSelfname());
|
||||||
@@ -43,11 +43,11 @@ public class SyncHandler {
|
|||||||
metaOpt._remoteCopies.add(request.getSelfname());
|
metaOpt._remoteCopies.add(request.getSelfname());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
objectPersistentStore.deleteObject(request.getNamespace(), request.getName()).await().indefinitely();
|
objectPersistentStore.deleteObject(request.getName()).await().indefinitely();
|
||||||
} catch (Exception ignored) {
|
} catch (Exception ignored) {
|
||||||
}
|
}
|
||||||
|
|
||||||
jObjectManager.invalidateJObject(metaOpt.getNamespace(), metaOpt.getName());
|
jObjectManager.invalidateJObject(metaOpt.getName());
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -39,8 +38,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Multi<String> findObjects(String namespace, String prefix) {
|
public Multi<String> findObjects(String prefix) {
|
||||||
Path nsRoot = Paths.get(root, namespace);
|
Path nsRoot = Paths.get(root);
|
||||||
|
|
||||||
if (!nsRoot.toFile().isDirectory())
|
if (!nsRoot.toFile().isDirectory())
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
@@ -53,8 +52,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Boolean> existsObject(String namespace, String name) {
|
public Uni<Boolean> existsObject(String name) {
|
||||||
Path obj = Paths.get(root, namespace, name);
|
Path obj = Paths.get(root, name);
|
||||||
|
|
||||||
if (!obj.toFile().isFile())
|
if (!obj.toFile().isFile())
|
||||||
return Uni.createFrom().item(false);
|
return Uni.createFrom().item(false);
|
||||||
@@ -64,31 +63,31 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Object> readObject(String namespace, String name) {
|
public Uni<byte[]> readObject(String name) {
|
||||||
var file = Path.of(root, namespace, name);
|
var file = Path.of(root, name);
|
||||||
|
|
||||||
if (!file.toFile().exists())
|
if (!file.toFile().exists())
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
|
||||||
return vertx.fileSystem().readFile(file.toString()).map(r -> new Object(namespace, name, r.getBytes()));
|
return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Void> writeObject(String namespace, Object object) {
|
public Uni<Void> writeObject(String name, byte[] data) {
|
||||||
var file = Path.of(root, namespace, object.getName());
|
var file = Path.of(root, name);
|
||||||
|
|
||||||
if (!Paths.get(root, namespace).toFile().isDirectory()
|
if (!Paths.get(root).toFile().isDirectory()
|
||||||
&& !Paths.get(root, namespace).toFile().mkdirs())
|
&& !Paths.get(root).toFile().mkdirs())
|
||||||
throw new StatusRuntimeException(Status.INTERNAL);
|
throw new StatusRuntimeException(Status.INTERNAL);
|
||||||
|
|
||||||
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(object.getData()));
|
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Uni<Void> deleteObject(String namespace, String name) {
|
public Uni<Void> deleteObject(String name) {
|
||||||
var file = Path.of(root, namespace, name);
|
var file = Path.of(root, name);
|
||||||
|
|
||||||
if (!file.toFile().exists())
|
if (!file.toFile().exists())
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
|
||||||
import io.smallrye.mutiny.Multi;
|
import io.smallrye.mutiny.Multi;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
|
|
||||||
@@ -8,14 +7,14 @@ import javax.annotation.Nonnull;
|
|||||||
|
|
||||||
public interface ObjectPersistentStore {
|
public interface ObjectPersistentStore {
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Multi<String> findObjects(String namespace, String prefix);
|
Multi<String> findObjects(String prefix);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Uni<Boolean> existsObject(String namespace, String name);
|
Uni<Boolean> existsObject(String name);
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Uni<Object> readObject(String namespace, String name);
|
Uni<byte[]> readObject(String name);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Uni<Void> writeObject(String namespace, Object object);
|
Uni<Void> writeObject(String name, byte[] data);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Uni<Void> deleteObject(String namespace, String name);
|
Uni<Void> deleteObject(String name);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ service DhfsFilesGrpc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message FindFilesRequest {
|
message FindFilesRequest {
|
||||||
string namespace = 1;
|
|
||||||
string prefix = 2;
|
string prefix = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -26,7 +25,6 @@ message FindFilesReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message ReadFileRequest {
|
message ReadFileRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -35,7 +33,6 @@ message ReadFileReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message WriteFileRequest {
|
message WriteFileRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
bytes data = 10;
|
bytes data = 10;
|
||||||
}
|
}
|
||||||
@@ -44,7 +41,6 @@ message WriteFileReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message DeleteFileRequest {
|
message DeleteFileRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,13 +11,9 @@ service DhfsObjectGrpc {
|
|||||||
rpc ReadObject (ReadObjectRequest) returns (ReadObjectReply) {}
|
rpc ReadObject (ReadObjectRequest) returns (ReadObjectReply) {}
|
||||||
rpc WriteObject (WriteObjectRequest) returns (WriteObjectReply) {}
|
rpc WriteObject (WriteObjectRequest) returns (WriteObjectReply) {}
|
||||||
rpc DeleteObject (DeleteObjectRequest) returns (DeleteObjectReply) {}
|
rpc DeleteObject (DeleteObjectRequest) returns (DeleteObjectReply) {}
|
||||||
|
|
||||||
rpc CreateNamespace (CreateNamespaceRequest) returns (CreateNamespaceReply) {}
|
|
||||||
rpc DeleteNamespace (DeleteNamespaceRequest) returns (DeleteNamespaceReply) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message FindObjectsRequest {
|
message FindObjectsRequest {
|
||||||
string namespace = 1;
|
|
||||||
string prefix = 2;
|
string prefix = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -29,7 +25,6 @@ message FindObjectsReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message ReadObjectRequest {
|
message ReadObjectRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,7 +33,6 @@ message ReadObjectReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message WriteObjectRequest {
|
message WriteObjectRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
bytes data = 10;
|
bytes data = 10;
|
||||||
}
|
}
|
||||||
@@ -47,24 +41,8 @@ message WriteObjectReply {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message DeleteObjectRequest {
|
message DeleteObjectRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteObjectReply {
|
message DeleteObjectReply {
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeleteNamespaceRequest {
|
|
||||||
string namespace = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message DeleteNamespaceReply {
|
|
||||||
}
|
|
||||||
|
|
||||||
message CreateNamespaceRequest {
|
|
||||||
string namespace = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message CreateNamespaceReply {
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ service DhfsObjectSyncGrpc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message ObjectHeader {
|
message ObjectHeader {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
bool assumeUnique = 3;
|
bool assumeUnique = 3;
|
||||||
uint64 mtime = 4;
|
uint64 mtime = 4;
|
||||||
@@ -25,7 +24,6 @@ message ApiObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message GetObjectRequest {
|
message GetObjectRequest {
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -44,7 +42,6 @@ message GetIndexReply {
|
|||||||
message IndexUpdatePush {
|
message IndexUpdatePush {
|
||||||
string selfname = 10;
|
string selfname = 10;
|
||||||
|
|
||||||
string namespace = 1;
|
|
||||||
string name = 2;
|
string name = 2;
|
||||||
bool assumeUnique = 3;
|
bool assumeUnique = 3;
|
||||||
uint64 mtime = 4;
|
uint64 mtime = 4;
|
||||||
|
|||||||
@@ -20,8 +20,11 @@ public class TestDataCleaner {
|
|||||||
String tempDirectoryIdx;
|
String tempDirectoryIdx;
|
||||||
|
|
||||||
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
|
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
|
||||||
purgeDirectory(Path.of(tempDirectory).toFile());
|
try {
|
||||||
purgeDirectory(Path.of(tempDirectoryIdx).toFile());
|
purgeDirectory(Path.of(tempDirectory).toFile());
|
||||||
|
purgeDirectory(Path.of(tempDirectoryIdx).toFile());
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
|
void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
|
||||||
|
|||||||
@@ -47,12 +47,10 @@ public class DhfsFileServiceSimpleTest {
|
|||||||
|
|
||||||
// FIXME: dhfs_files
|
// FIXME: dhfs_files
|
||||||
|
|
||||||
objectRepository.createNamespace("dhfs_files");
|
jObjectRepository.writeJObject(c1);
|
||||||
|
jObjectRepository.writeJObject(c2);
|
||||||
jObjectRepository.writeJObject("dhfs_files", c1);
|
jObjectRepository.writeJObject(c3);
|
||||||
jObjectRepository.writeJObject("dhfs_files", c2);
|
jObjectRepository.writeJObject(f);
|
||||||
jObjectRepository.writeJObject("dhfs_files", c3);
|
|
||||||
jObjectRepository.writeJObject("dhfs_files", f);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
String all = "1234567891011";
|
String all = "1234567891011";
|
||||||
|
|||||||
@@ -24,15 +24,12 @@ class DhfsObjectGrpcServiceTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void writeReadTest() {
|
void writeReadTest() {
|
||||||
dhfsObjectGrpc.createNamespace(
|
|
||||||
CreateNamespaceRequest.newBuilder().setNamespace("testns").build())
|
|
||||||
.await().atMost(Duration.ofSeconds(5));
|
|
||||||
dhfsObjectGrpc.writeObject(
|
dhfsObjectGrpc.writeObject(
|
||||||
WriteObjectRequest.newBuilder().setNamespace("testns").setName("cool_file")
|
WriteObjectRequest.newBuilder().setName("cool_file")
|
||||||
.setData(ByteString.copyFrom("Hello world".getBytes())).build())
|
.setData(ByteString.copyFrom("Hello world".getBytes())).build())
|
||||||
.await().atMost(Duration.ofSeconds(5));
|
.await().atMost(Duration.ofSeconds(5));
|
||||||
var read = dhfsObjectGrpc.readObject(
|
var read = dhfsObjectGrpc.readObject(
|
||||||
ReadObjectRequest.newBuilder().setNamespace("testns").setName("cool_file").build())
|
ReadObjectRequest.newBuilder().setName("cool_file").build())
|
||||||
.await().atMost(Duration.ofSeconds(5));
|
.await().atMost(Duration.ofSeconds(5));
|
||||||
Assertions.assertArrayEquals(read.getData().toByteArray(), "Hello world".getBytes());
|
Assertions.assertArrayEquals(read.getData().toByteArray(), "Hello world".getBytes());
|
||||||
// var found = dhfsObjectGrpc.findObjects(FindObjectsRequest.newBuilder().setNamespace("testns").build())
|
// var found = dhfsObjectGrpc.findObjects(FindObjectsRequest.newBuilder().setNamespace("testns").build())
|
||||||
|
|||||||
Reference in New Issue
Block a user