From 71080dc45c9d77b2377072ca132abc40c7d7d8a1 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 16 Jun 2024 18:34:14 +0200 Subject: [PATCH] something? --- .../usatiuk/dhfs/storage/fuse/DhfsFuse.java | 48 +++++++------- .../objects/api/DhfsObjectGrpcService.java | 2 +- .../objects/jrepository/JObjectManager.java | 1 + .../jrepository/JObjectManagerImpl.java | 6 ++ .../jrepository/JObjectRepositoryImpl.java | 3 +- .../objects/repository/ObjectRepository.java | 3 +- .../SimplePersistentObjectRepository.java | 3 +- .../DistributedObjectRepository.java | 50 +++++++++++++-- .../repository/distributed/HostInfo.java | 11 ---- .../repository/distributed/ObjectIndex.java | 9 ++- .../distributed/ObjectIndexService.java | 24 ++++++- .../repository/distributed/ObjectMeta.java | 22 +++++-- .../distributed/RemoteHostManager.java | 55 ++++++++++++++++ .../RemoteObjectServiceClient.java | 57 ++++++++++++++++- .../RemoteObjectServiceServer.java | 62 ++++++++++++++++++- .../repository/distributed/SyncHandler.java | 58 +++++++++++++++++ server/src/main/proto/dhfs_files.proto | 2 +- server/src/main/proto/dhfs_objects.proto | 2 +- server/src/main/proto/dhfs_objects_sync.proto | 56 +++++++++++++++++ .../src/main/resources/application.properties | 11 +++- 20 files changed, 420 insertions(+), 65 deletions(-) delete mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java create mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java create mode 100644 server/src/main/proto/dhfs_objects_sync.proto diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java index 9cdf420f..9c828587 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/fuse/DhfsFuse.java @@ -67,36 +67,34 @@ public class DhfsFuse extends FuseStubFS { @Override public int getattr(String path, FileStat stat) { - Optional found; try { - found = fileService.getDirEntry(path).await().indefinitely(); + Optional found = fileService.getDirEntry(path).await().indefinitely(); + if (found.isEmpty()) { + return -ErrorCodes.ENOENT(); + } + if (found.get() instanceof File f) { + stat.st_mode.set(S_IFREG | f.getMode()); + stat.st_nlink.set(1); + stat.st_size.set(fileService.size(f).await().indefinitely()); + } else if (found.get() instanceof Directory d) { + stat.st_mode.set(S_IFDIR | d.getMode()); + stat.st_nlink.set(2); + } + var foundDent = (FsNode) found.get(); + + var ctime = System.currentTimeMillis(); + stat.st_atim.tv_sec.set(ctime / 1000); + stat.st_atim.tv_nsec.set((ctime % 1000) * 1000); + + // FIXME: Race? + stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000); + stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000); + stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000); + stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000); } catch (Exception e) { Log.error("When accessing " + path, e); return -ErrorCodes.ENOENT(); } - if (found.isEmpty()) { - return -ErrorCodes.ENOENT(); - } - if (found.get() instanceof File f) { - stat.st_mode.set(S_IFREG | f.getMode()); - stat.st_nlink.set(1); - stat.st_size.set(fileService.size(f).await().indefinitely()); - } else if (found.get() instanceof Directory d) { - stat.st_mode.set(S_IFDIR | d.getMode()); - stat.st_nlink.set(2); - } - var foundDent = (FsNode) found.get(); - - var ctime = System.currentTimeMillis(); - stat.st_atim.tv_sec.set(ctime / 1000); - stat.st_atim.tv_nsec.set((ctime % 1000) * 1000); - - // FIXME: Race? - stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000); - stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000); - stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000); - stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000); - return 0; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java index 29b7bb22..e881968f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/api/DhfsObjectGrpcService.java @@ -33,7 +33,7 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc { @RunOnVirtualThread public Uni writeObject(WriteObjectRequest request) { objectRepository.writeObject(request.getNamespace(), - new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray())); + new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()), false); return Uni.createFrom().item(WriteObjectReply.newBuilder().build()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java index 21dceb6a..c4abc4c6 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManager.java @@ -7,4 +7,5 @@ import java.util.Optional; public interface JObjectManager { Uni> get(String namespace, String key, Class clazz); Uni put(String namespace, T object); + void invalidateJObject(String namespace, String name); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index 5d30b563..62556d37 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -93,4 +93,10 @@ public class JObjectManagerImpl implements JObjectManager { return Uni.createFrom().voidItem(); } + @Override + public void invalidateJObject(String namespace, String name) { + synchronized (_map) { + _map.remove(name); + } + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRepositoryImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRepositoryImpl.java index 615cae4d..b1a5246f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRepositoryImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectRepositoryImpl.java @@ -10,7 +10,6 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.SerializationUtils; import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; import java.util.Optional; @ApplicationScoped @@ -50,6 +49,6 @@ public class JObjectRepositoryImpl implements JObjectRepository { new Namespace(namespace), object.getName(), SerializationUtils.serialize(object)); - objectRepository.writeObject(namespace, obj); + objectRepository.writeObject(namespace, obj, object.assumeUnique()); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java index 09baa1c9..aa27c48f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/ObjectRepository.java @@ -5,7 +5,6 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import javax.annotation.Nonnull; -import java.nio.ByteBuffer; public interface ObjectRepository { @Nonnull @@ -16,7 +15,7 @@ public interface ObjectRepository { @Nonnull Object readObject(String namespace, String name); @Nonnull - void writeObject(String namespace, Object object); + void writeObject(String namespace, Object object, Boolean canIgnoreConflict); @Nonnull void deleteObject(String namespace, String name); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java index 7c3556fe..81425c8e 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/SimplePersistentObjectRepository.java @@ -2,7 +2,6 @@ package com.usatiuk.dhfs.storage.objects.repository; import com.usatiuk.dhfs.storage.objects.data.Object; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; -import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import jakarta.inject.Inject; @@ -35,7 +34,7 @@ public class SimplePersistentObjectRepository implements ObjectRepository { @Nonnull @Override - public void writeObject(String namespace, Object object) { + public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) { objectPersistentStore.writeObject(namespace, object).await().indefinitely(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java index 12434fb4..9fd68f6f 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java @@ -1,8 +1,10 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; import com.usatiuk.dhfs.storage.objects.data.Object; import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; +import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Multi; @@ -13,12 +15,16 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; +import org.eclipse.microprofile.config.inject.ConfigProperty; import javax.annotation.Nonnull; import java.io.IOException; +import java.util.Optional; @ApplicationScoped public class DistributedObjectRepository implements ObjectRepository { + @ConfigProperty(name = "dhfs.objects.distributed.selfname") + String selfname; @Inject Vertx vertx; @@ -31,7 +37,27 @@ public class DistributedObjectRepository implements ObjectRepository { @Inject RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + SyncHandler syncHandler; + void init(@Observes @Priority(400) StartupEvent event) throws IOException { + try { + Log.info("Starting sync"); + var got = remoteObjectServiceClient.getIndex(); + for (var h : got) { + var prevMtime = objectIndexService.exists(h.getNamespace(), h.getName()) + ? objectIndexService.getMeta(h.getNamespace(), h.getName()).get().getMtime() + : 0; + syncHandler.handleRemoteUpdate( + IndexUpdatePush.newBuilder().setSelfname(selfname + ).setNamespace(h.getNamespace()).setName(h.getName()).setAssumeUnique(h.getAssumeUnique()) + .setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely(); + } + Log.info("Sync complete"); + } catch (Exception e) { + Log.error("Error when fetching remote index:"); + Log.error(e); + } } void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException { @@ -60,8 +86,13 @@ public class DistributedObjectRepository implements ObjectRepository { var info = infoOpt.get(); - if (objectPersistentStore.existsObject(namespace, name).await().indefinitely()) - return objectPersistentStore.readObject(namespace, name).await().indefinitely(); + Optional read = info.runReadLocked(() -> { + if (objectPersistentStore.existsObject(namespace, name).await().indefinitely()) + return Optional.of(objectPersistentStore.readObject(namespace, name).await().indefinitely()); + return Optional.empty(); + }); + if (read.isPresent()) return read.get(); + // Race? return info.runWriteLocked(() -> { return remoteObjectServiceClient.getObject(namespace, name).map(got -> { @@ -73,13 +104,22 @@ public class DistributedObjectRepository implements ObjectRepository { @Nonnull @Override - public void writeObject(String namespace, Object object) { - var info = objectIndexService.getOrCreateMeta(namespace, object.getName()); + public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) { + var info = objectIndexService.getOrCreateMeta(namespace, object.getName(), canIgnoreConflict); info.runWriteLocked(() -> { objectPersistentStore.writeObject(namespace, object).await().indefinitely(); + var prevMtime = info.getMtime(); info.setMtime(System.currentTimeMillis()); - remoteObjectServiceClient.notifyUpdate(namespace, object.getName()).await().indefinitely(); + try { + Log.warn("Updating object " + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime); + remoteObjectServiceClient.notifyUpdate(namespace, object.getName(), prevMtime); + Log.warn("Updating object complete" + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime); + } catch (Exception e) { + Log.error("Error when notifying remote update:"); + Log.error(e); + Log.error(e.getCause()); + } return null; }); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java deleted file mode 100644 index 640a7626..00000000 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/HostInfo.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.usatiuk.dhfs.storage.objects.repository.distributed; - -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class HostInfo { - String name; - String ip; -} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndex.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndex.java index 4904a480..e1d834cd 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndex.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndex.java @@ -55,12 +55,15 @@ public class ObjectIndex implements Serializable { }); } - public ObjectMeta getOrCreate(String namespace, String name) { + public ObjectMeta getOrCreate(String namespace, String name, boolean assumeUnique) { return runWriteLocked(() -> { if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) { - return _objectMetaMap.get(new ImmutablePair<>(namespace, name)); + var got = _objectMetaMap.get(new ImmutablePair<>(namespace, name)); + if (got.getAssumeUnique() != assumeUnique) + throw new IllegalArgumentException("assumeUnique mismatch for " + namespace + "/" + name); + return got; } else { - var newObjectMeta = new ObjectMeta(namespace, name); + var newObjectMeta = new ObjectMeta(namespace, name, assumeUnique); _objectMetaMap.put(new ImmutablePair<>(namespace, name), newObjectMeta); return newObjectMeta; } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java index e992d3d1..45483c40 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java @@ -1,13 +1,18 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply; import com.usatiuk.dhfs.storage.DeserializationHelper; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.SerializationUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.eclipse.microprofile.config.inject.ConfigProperty; import java.io.IOException; @@ -46,7 +51,22 @@ public class ObjectIndexService { return _index.get(namespace, name); } - public ObjectMeta getOrCreateMeta(String namespace, String name) { - return _index.getOrCreate(namespace, name); + public ObjectMeta getOrCreateMeta(String namespace, String name, boolean assumeUnique) { + return _index.getOrCreate(namespace, name, assumeUnique); + } + + @FunctionalInterface + public interface ForAllFn { + void apply(ImmutablePair name, ObjectMeta meta); + } + + public void forAllRead(ForAllFn fn) { + _index.runReadLocked(() -> { + // FIXME: + for (var entry : _index._objectMetaMap.entrySet()) { + fn.apply(entry.getKey(), entry.getValue()); + } + return null; + }); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java index 6671bf48..65966fe0 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java @@ -1,7 +1,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; import lombok.Getter; -import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; @@ -11,9 +10,10 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ObjectMeta implements Serializable { - public ObjectMeta(String namespace, String name) { + public ObjectMeta(String namespace, String name, Boolean assumeUnique) { this._namespace = namespace; this._name = name; + this._assumeUnique = assumeUnique; } private final ReadWriteLock _lock = new ReentrantReadWriteLock(); @@ -23,12 +23,24 @@ public class ObjectMeta implements Serializable { @Getter final String _name; - @Getter - @Setter long _mtime; @Getter - final List _remoteCopies = new ArrayList<>(); + final Boolean _assumeUnique; + + //FIXME: + final List _remoteCopies = new ArrayList<>(); + + public void setMtime(long mtime) { + runWriteLocked(() -> { + _mtime = mtime; + return null; + }); + } + + public long getMtime() { + return runReadLocked(() -> _mtime); + } public R runReadLocked(Callable fn) { _lock.readLock().lock(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index 5d4cfdcd..2c232225 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -1,7 +1,62 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc; +import io.grpc.netty.NettyChannelBuilder; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.io.IOException; +import java.util.HashMap; @ApplicationScoped public class RemoteHostManager { + @ConfigProperty(name = "dhfs.objects.distributed.selfname") + String selfname; + + @ConfigProperty(name = "dhfs.objects.distributed.friend") + String remoteHostName; + @ConfigProperty(name = "dhfs.objects.distributed.friendAddr") + String remoteHostAddr; + @ConfigProperty(name = "dhfs.objects.distributed.friendPort") + String remoteHostPort; + + @Getter + @AllArgsConstructor + private static class HostInfo { + String _addr; + Integer _port; + } + + final HashMap _remoteHosts = new HashMap<>(); + + void init(@Observes @Priority(350) StartupEvent event) throws IOException { + _remoteHosts.put(remoteHostName, new HostInfo(remoteHostAddr, Integer.valueOf(remoteHostPort))); + } + + void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException { + } + + @FunctionalInterface + public interface ClientFunction { + R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); + } + + public R withClient(ClientFunction fn) { + var hostInfo = _remoteHosts.get(remoteHostName); + var channel = NettyChannelBuilder.forAddress(hostInfo.getAddr(), hostInfo.getPort()) + .usePlaintext().build(); + var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE) + .withMaxInboundMessageSize(Integer.MAX_VALUE); + try { + return fn.apply(client); + } finally { + channel.shutdownNow(); + } + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index 28a96d72..d47597ae 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -1,20 +1,71 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.GetIndexRequest; +import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest; +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; +import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; +import com.usatiuk.dhfs.storage.objects.data.Namespace; import com.usatiuk.dhfs.storage.objects.data.Object; +import io.quarkus.logging.Log; import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import org.apache.commons.lang3.NotImplementedException; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.List; @ApplicationScoped public class RemoteObjectServiceClient { + @ConfigProperty(name = "dhfs.objects.distributed.selfname") + String selfname; + @Inject ObjectIndexService objectIndexService; + @Inject + RemoteHostManager remoteHostManager; + public Uni getObject(String namespace, String name) { - return Uni.createFrom().item(() -> null); + return remoteHostManager.withClient(client -> { + var req = GetObjectRequest.newBuilder().setNamespace(namespace).setName(name).build(); + var reply = client.getObject(req); + var metaOpt = objectIndexService.getMeta(namespace, name); + if (metaOpt.isEmpty()) throw new RuntimeException("Oops!"); + var meta = metaOpt.get(); + if (meta.getMtime() != reply.getObject().getHeader().getMtime()) { + if (!meta.getAssumeUnique() && (meta.getAssumeUnique() != reply.getObject().getHeader().getAssumeUnique())) { + Log.error("Conflict!"); + throw new NotImplementedException(); + } + } + return Uni.createFrom().item(new Object( + new Namespace(reply.getObject().getHeader().getNamespace()), + reply.getObject().getHeader().getName(), + reply.getObject().getContent().toByteArray() + )); + }); } - public Uni notifyUpdate(String namespace, String name) { - return Uni.createFrom().item(true); + public List getIndex() { + return remoteHostManager.withClient(client -> { + var req = GetIndexRequest.newBuilder().build(); + var reply = client.getIndex(req); + return reply.getObjectsList(); + }); + } + + public Boolean notifyUpdate(String namespace, String name, long prevMtime) { + return remoteHostManager.withClient(client -> { + var metaOpt = objectIndexService.getMeta(namespace, name); + if (metaOpt.isEmpty()) throw new RuntimeException("Oops!"); + var meta = metaOpt.get(); + + var req = IndexUpdatePush.newBuilder().setSelfname(selfname + ).setNamespace(namespace).setName(name).setAssumeUnique(meta.getAssumeUnique()) + .setMtime(meta.getMtime()).setPrevMtime(prevMtime).build(); + client.indexUpdate(req); + return true; + }); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index 54aacc8f..61e3c3b3 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -1,7 +1,67 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.repository.distributed.*; +import com.usatiuk.dhfs.storage.objects.data.Object; +import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.quarkus.grpc.GrpcService; +import io.quarkus.logging.Log; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Uni; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import java.util.Optional; + +// Note: RunOnVirtualThread hangs somehow @GrpcService -public class RemoteObjectServiceServer { +public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { + @Inject + ObjectPersistentStore objectPersistentStore; + + @Inject + ObjectIndexService objectIndexService; + + @Inject + SyncHandler syncHandler; + + @Override + @Blocking + public Uni getObject(GetObjectRequest request) { + Log.info("<-- getObject: " + request.getName()); + var metaOpt = objectIndexService.getMeta(request.getNamespace(), request.getName()); + if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); + var meta = metaOpt.get(); + Optional> read = meta.runReadLocked(() -> { + if (objectPersistentStore.existsObject(request.getNamespace(), request.getName()).await().indefinitely()) + return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getNamespace(), request.getName()).await().indefinitely())); + return Optional.empty(); + }); + if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); + var obj = read.get().getRight(); + var header = ObjectHeader.newBuilder().setName(obj.getName()).setNamespace(obj.getNamespace().getName()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build(); + var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj.getData())).build(); + return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build()); + } + + @Override + @Blocking + public Uni getIndex(GetIndexRequest request) { + Log.info("<-- getIndex: "); + var builder = GetIndexReply.newBuilder(); + objectIndexService.forAllRead((name, meta) -> { + var entry = ObjectHeader.newBuilder().setNamespace(name.getLeft()).setName(name.getRight()).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build(); + builder.addObjects(entry); + }); + return Uni.createFrom().item(builder.build()); + } + + @Override + @Blocking + public Uni indexUpdate(IndexUpdatePush request) { + Log.info("<-- indexUpdate: " + request.getName() + " from: " + String.valueOf(request.getPrevMtime()) + " to: " + String.valueOf(request.getMtime())); + return syncHandler.handleRemoteUpdate(request); + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java new file mode 100644 index 00000000..dab9f648 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -0,0 +1,58 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; +import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply; +import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager; +import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; +import io.quarkus.logging.Log; +import io.smallrye.mutiny.Uni; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.NotImplementedException; + +@ApplicationScoped +public class SyncHandler { + @Inject + ObjectPersistentStore objectPersistentStore; + + @Inject + ObjectIndexService objectIndexService; + + @Inject + JObjectManager jObjectManager; + + public Uni handleRemoteUpdate(IndexUpdatePush request) { + var metaOpt = objectIndexService.getOrCreateMeta(request.getNamespace(), request.getName(), request.getAssumeUnique()); + metaOpt.runWriteLocked(() -> { + if (metaOpt.getMtime() == request.getMtime()) { + metaOpt._remoteCopies.add(request.getSelfname()); + return null; + } + + if (metaOpt.getMtime() != request.getPrevMtime()) { + if (!metaOpt.getAssumeUnique() + || (metaOpt.getAssumeUnique() != request.getAssumeUnique())) { + Log.error("Conflict!"); + throw new NotImplementedException(); + } + } + + metaOpt.setMtime(request.getMtime()); + + metaOpt._remoteCopies.clear(); + metaOpt._remoteCopies.add(request.getSelfname()); + + try { + objectPersistentStore.deleteObject(request.getNamespace(), request.getName()).await().indefinitely(); + } catch (Exception ignored) { + } + + jObjectManager.invalidateJObject(metaOpt.getNamespace(), metaOpt.getName()); + + return null; + }); + + return Uni.createFrom().item(IndexUpdateReply.newBuilder().build()); + } + +} diff --git a/server/src/main/proto/dhfs_files.proto b/server/src/main/proto/dhfs_files.proto index d326c1bb..38bf033c 100644 --- a/server/src/main/proto/dhfs_files.proto +++ b/server/src/main/proto/dhfs_files.proto @@ -4,7 +4,7 @@ option java_multiple_files = true; option java_package = "com.usatiuk.dhfs.storage.files.api"; option java_outer_classname = "DhfsFilesApi"; -package hello; +package dhfs.files; service DhfsFilesGrpc { rpc FindFiles (FindFilesRequest) returns (FindFilesReply) {} diff --git a/server/src/main/proto/dhfs_objects.proto b/server/src/main/proto/dhfs_objects.proto index a6e422b8..f36c2d90 100644 --- a/server/src/main/proto/dhfs_objects.proto +++ b/server/src/main/proto/dhfs_objects.proto @@ -4,7 +4,7 @@ option java_multiple_files = true; option java_package = "com.usatiuk.dhfs.storage.objects.api"; option java_outer_classname = "DhfsObjectsApi"; -package hello; +package dhfs.objects; service DhfsObjectGrpc { rpc FindObjects (FindObjectsRequest) returns (FindObjectsReply) {} diff --git a/server/src/main/proto/dhfs_objects_sync.proto b/server/src/main/proto/dhfs_objects_sync.proto new file mode 100644 index 00000000..87305415 --- /dev/null +++ b/server/src/main/proto/dhfs_objects_sync.proto @@ -0,0 +1,56 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "com.usatiuk.dhfs.objects.repository.distributed"; +option java_outer_classname = "DhfsObjectSyncApi"; + +package dhfs.objects.sync; + +service DhfsObjectSyncGrpc { + rpc GetObject (GetObjectRequest) returns (GetObjectReply) {} + rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {} + rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {} +} + +message ObjectHeader { + string namespace = 1; + string name = 2; + bool assumeUnique = 3; + uint64 mtime = 4; +} + +message ApiObject { + ObjectHeader header = 1; + bytes content = 2; +} + +message GetObjectRequest { + string namespace = 1; + string name = 2; +} + +message GetObjectReply { + ApiObject object = 1; +} + +message GetIndexRequest { + +} + +message GetIndexReply { + repeated ObjectHeader objects = 1; +} + +message IndexUpdatePush { + string selfname = 10; + + string namespace = 1; + string name = 2; + bool assumeUnique = 3; + uint64 mtime = 4; + uint64 prevMtime = 5; +} + +message IndexUpdateReply { + +} \ No newline at end of file diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index 1f8834b1..b16e53a6 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -1,4 +1,13 @@ quarkus.grpc.server.use-separate-server=false dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d -dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root \ No newline at end of file +dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root + +grpc.client.greeting-service.max-inbound-message-size=9155241000 +grpc.client.greeting-service.package-max-inbound-message-size=9155241000 +grpc.client.greeting-service.server.max-inbound-message-size=9155241000 + +dhfs.objects.distributed.selfname=thing1 +dhfs.objects.distributed.friend=localhost +dhfs.objects.distributed.friendAddr=asdf +dhfs.objects.distributed.friendPort=1234 \ No newline at end of file