From de5338a8134d2af7592fb78ef18bc405b644648a Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Wed, 14 May 2025 16:16:27 +0200 Subject: [PATCH] more cleanup --- README.md | 4 +- .../dhfsfs/service/DhfsFileService.java | 12 +- .../java/com/usatiuk/dhfsfuse/DhfsFuse.java | 2 +- .../dhfs/rpc/RemoteObjectServiceServer.java | 141 ++++++++++++++++- .../rpc/RemoteObjectServiceServerImpl.java | 146 ------------------ 5 files changed, 145 insertions(+), 160 deletions(-) delete mode 100644 dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java diff --git a/README.md b/README.md index b5cca726..55caef32 100644 --- a/README.md +++ b/README.md @@ -42,10 +42,10 @@ One line in the `extra-opts` file corresponds to one option passed to the JVM wh Some extra possible configuration options are: -- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`. +- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default, it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`. - `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for gabgage collection and resynchronized after being reconnected. The default is 43200 (30 days), if set to `-1`, this feature is disabled. - `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`. -On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100GB. +On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100 GB. Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers. diff --git a/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileService.java b/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileService.java index 2dae0c37..ef72dac4 100644 --- a/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileService.java +++ b/dhfs-parent/dhfs-fs/src/main/java/com/usatiuk/dhfsfs/service/DhfsFileService.java @@ -254,7 +254,7 @@ public class DhfsFileService { * @param to the new name * @return true if the rename was successful, false otherwise */ - public Boolean rename(String from, String to) { + public boolean rename(String from, String to) { return jObjectTxManager.executeTx(() -> { var node = getDirEntry(from); JKleppmannTreeNodeMeta meta = node.meta(); @@ -275,7 +275,7 @@ public class DhfsFileService { * @param mode the new mode * @return true if the mode was changed successfully, false otherwise */ - public Boolean chmod(JObjectKey uuid, long mode) { + public boolean chmod(JObjectKey uuid, long mode) { return jObjectTxManager.executeTx(() -> { var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND)); @@ -420,7 +420,7 @@ public class DhfsFileService { * @param data the data to write * @return the number of bytes written */ - public Long write(JObjectKey fileUuid, long offset, ByteString data) { + public long write(JObjectKey fileUuid, long offset, ByteString data) { return jObjectTxManager.executeTx(() -> { if (offset < 0) throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset)); @@ -529,7 +529,7 @@ public class DhfsFileService { * @param length the new length of the file * @return true if the truncate was successful, false otherwise */ - public Boolean truncate(JObjectKey fileUuid, long length) { + public boolean truncate(JObjectKey fileUuid, long length) { return jObjectTxManager.executeTx(() -> { if (length < 0) throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length)); @@ -721,7 +721,7 @@ public class DhfsFileService { * @param mtimeMs the modification time in milliseconds * @return true if the times were set successfully, false otherwise */ - public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) { + public boolean setTimes(JObjectKey fileUuid, long mtimeMs) { return jObjectTxManager.executeTx(() -> { var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND)); @@ -775,7 +775,7 @@ public class DhfsFileService { * @param data the data to write * @return the number of bytes written */ - public Long write(JObjectKey fileUuid, long offset, byte[] data) { + public long write(JObjectKey fileUuid, long offset, byte[] data) { return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data)); } diff --git a/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java b/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java index 423daa0b..eb02b04a 100644 --- a/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java +++ b/dhfs-parent/dhfs-fuse/src/main/java/com/usatiuk/dhfsfuse/DhfsFuse.java @@ -267,7 +267,7 @@ public class DhfsFuse extends FuseStubFS { try { var fileKey = getFromHandle(fi.fh.get()); var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer)); - return written.intValue(); + return Math.toIntExact(written); } catch (Exception e) { Log.error("When writing " + path, e); return -ErrorCodes.EIO(); diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServer.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServer.java index 695de70e..b91ba444 100644 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServer.java +++ b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServer.java @@ -1,13 +1,32 @@ package com.usatiuk.dhfs.rpc; +import com.usatiuk.dhfs.ProtoSerializer; +import com.usatiuk.dhfs.autosync.AutosyncProcessor; +import com.usatiuk.dhfs.invalidation.Op; +import com.usatiuk.dhfs.invalidation.OpHandlerService; import com.usatiuk.dhfs.peersync.PeerId; +import com.usatiuk.dhfs.peersync.ReachablePeerManager; +import com.usatiuk.dhfs.persistence.JObjectKeyP; +import com.usatiuk.dhfs.remoteobj.*; import com.usatiuk.dhfs.repository.*; +import com.usatiuk.dhfs.syncmap.DtoMapperService; +import com.usatiuk.objects.JObjectKey; +import com.usatiuk.objects.transaction.Transaction; +import com.usatiuk.objects.transaction.TransactionHandle; +import com.usatiuk.objects.transaction.TransactionManager; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.quarkus.grpc.GrpcService; +import io.quarkus.logging.Log; import io.quarkus.security.identity.SecurityIdentity; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicLong; // Note: RunOnVirtualThread hangs somehow @GrpcService @@ -15,8 +34,120 @@ import jakarta.inject.Inject; public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Inject SecurityIdentity identity; + @Inject - RemoteObjectServiceServerImpl remoteObjectServiceServerImpl; + TransactionManager txm; + @Inject + Transaction curTx; + + @Inject + ProtoSerializer opProtoSerializer; + @Inject + ProtoSerializer receivedObjectProtoSerializer; + @Inject + RemoteTransaction remoteTx; + @Inject + OpHandlerService opHandlerService; + @Inject + DtoMapperService dtoMapperService; + @Inject + AutosyncProcessor autosyncProcessor; + + public Uni getObject(PeerId from, GetObjectRequest request) { + Log.info("<-- getObject: " + request.getName() + " from " + from); + + Pair got = txm.run(() -> { + var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null); + var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null); + if (meta != null && !meta.seen()) + curTx.put(meta.withSeen(true)); + if (obj != null) + for (var ref : obj.collectRefsTo()) { + var refMeta = remoteTx.getMeta(ref).orElse(null); + if (refMeta != null && !refMeta.seen()) + curTx.put(refMeta.withSeen(true)); + } + return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass())); + }); + + if ((got.getValue() != null) && (got.getKey() == null)) { + Log.error("Inconsistent state for object meta: " + request.getName()); + throw new StatusRuntimeException(Status.INTERNAL); + } + + if (got.getValue() == null) { + Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + + var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight())); + return Uni.createFrom().item(serialized); + } + + public Uni canDelete(PeerId from, CanDeleteRequest request) { + Log.infov("<-- canDelete: {0} from {1}", request, from); + + var builder = CanDeleteReply.newBuilder(); + + txm.run(() -> { + var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null); + + if (obj == null) { + builder.setDeletionCandidate(true); + return; + } + + builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty()); + + if (!builder.getDeletionCandidate()) { + for (var r : obj.refsFrom()) { + builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build()); + curTx.onCommit(() -> autosyncProcessor.add(r.obj())); + } + } + + if (!builder.getDeletionCandidate()) { + Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from); + } + }); + return Uni.createFrom().item(builder.build()); + } + + public Uni opPush(PeerId from, OpPushRequest request) { + if (request.getMsgCount() == 0) { + Log.infov("<-- opPush: empty from {0}", from); + return Uni.createFrom().item(OpPushReply.getDefaultInstance()); + } + + var handles = new ArrayList(); + try { + var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); + for (var op : ops) { + Log.infov("<-- opPush: {0} from {1}", op, from); + var handle = txm.run(() -> { + opHandlerService.handleOp(from, op); + }); + handles.add(handle); + } + } catch (Exception e) { + Log.error("Error handling ops", e); + throw e; + } + return Uni.createFrom().emitter(e -> { + var counter = new AtomicLong(handles.size()); + for (var handle : handles) { + handle.onFlush(() -> { + if (counter.decrementAndGet() == 0) { + e.complete(OpPushReply.getDefaultInstance()); + } + }); + } + }); + } + + public Uni ping(PeerId from, PingRequest request) { + return Uni.createFrom().item(PingReply.getDefaultInstance()); + } private PeerId getIdentity() { return PeerId.of(identity.getPrincipal().getName().substring(3)); @@ -25,24 +156,24 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni getObject(GetObjectRequest request) { - return remoteObjectServiceServerImpl.getObject(getIdentity(), request); + return getObject(getIdentity(), request); } @Override @Blocking public Uni canDelete(CanDeleteRequest request) { - return remoteObjectServiceServerImpl.canDelete(getIdentity(), request); + return canDelete(getIdentity(), request); } @Override @Blocking public Uni opPush(OpPushRequest request) { - return remoteObjectServiceServerImpl.opPush(getIdentity(), request); + return opPush(getIdentity(), request); } @Override @Blocking public Uni ping(PingRequest request) { - return remoteObjectServiceServerImpl.ping(getIdentity(), request); + return ping(getIdentity(), request); } } diff --git a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java b/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java deleted file mode 100644 index d0fe8f41..00000000 --- a/dhfs-parent/sync-base/src/main/java/com/usatiuk/dhfs/rpc/RemoteObjectServiceServerImpl.java +++ /dev/null @@ -1,146 +0,0 @@ -package com.usatiuk.dhfs.rpc; - -import com.usatiuk.dhfs.ProtoSerializer; -import com.usatiuk.dhfs.autosync.AutosyncProcessor; -import com.usatiuk.dhfs.invalidation.Op; -import com.usatiuk.dhfs.invalidation.OpHandlerService; -import com.usatiuk.dhfs.peersync.PeerId; -import com.usatiuk.dhfs.peersync.ReachablePeerManager; -import com.usatiuk.dhfs.persistence.JObjectKeyP; -import com.usatiuk.dhfs.remoteobj.*; -import com.usatiuk.dhfs.repository.*; -import com.usatiuk.dhfs.syncmap.DtoMapperService; -import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.transaction.Transaction; -import com.usatiuk.objects.transaction.TransactionHandle; -import com.usatiuk.objects.transaction.TransactionManager; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import io.quarkus.logging.Log; -import io.smallrye.mutiny.Uni; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; - -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicLong; - -// Note: RunOnVirtualThread hangs somehow -@ApplicationScoped -public class RemoteObjectServiceServerImpl { - @Inject - TransactionManager txm; - @Inject - ReachablePeerManager reachablePeerManager; - @Inject - Transaction curTx; - - @Inject - ProtoSerializer opProtoSerializer; - @Inject - ProtoSerializer receivedObjectProtoSerializer; - @Inject - RemoteTransaction remoteTx; - @Inject - OpHandlerService opHandlerService; - @Inject - DtoMapperService dtoMapperService; - @Inject - AutosyncProcessor autosyncProcessor; - - public Uni getObject(PeerId from, GetObjectRequest request) { - Log.info("<-- getObject: " + request.getName() + " from " + from); - - Pair got = txm.run(() -> { - var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null); - var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null); - if (meta != null && !meta.seen()) - curTx.put(meta.withSeen(true)); - if (obj != null) - for (var ref : obj.collectRefsTo()) { - var refMeta = remoteTx.getMeta(ref).orElse(null); - if (refMeta != null && !refMeta.seen()) - curTx.put(refMeta.withSeen(true)); - } - return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass())); - }); - - if ((got.getValue() != null) && (got.getKey() == null)) { - Log.error("Inconsistent state for object meta: " + request.getName()); - throw new StatusRuntimeException(Status.INTERNAL); - } - - if (got.getValue() == null) { - Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from); - throw new StatusRuntimeException(Status.NOT_FOUND); - } - - var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight())); - return Uni.createFrom().item(serialized); - } - - public Uni canDelete(PeerId from, CanDeleteRequest request) { - Log.infov("<-- canDelete: {0} from {1}", request, from); - - var builder = CanDeleteReply.newBuilder(); - - txm.run(() -> { - var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null); - - if (obj == null) { - builder.setDeletionCandidate(true); - return; - } - - builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty()); - - if (!builder.getDeletionCandidate()) { - for (var r : obj.refsFrom()) { - builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build()); - curTx.onCommit(() -> autosyncProcessor.add(r.obj())); - } - } - - if (!builder.getDeletionCandidate()) { - Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from); - } - }); - return Uni.createFrom().item(builder.build()); - } - - public Uni opPush(PeerId from, OpPushRequest request) { - if (request.getMsgCount() == 0) { - Log.infov("<-- opPush: empty from {0}", from); - return Uni.createFrom().item(OpPushReply.getDefaultInstance()); - } - - var handles = new ArrayList(); - try { - var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); - for (var op : ops) { - Log.infov("<-- opPush: {0} from {1}", op, from); - var handle = txm.run(() -> { - opHandlerService.handleOp(from, op); - }); - handles.add(handle); - } - } catch (Exception e) { - Log.error("Error handling ops", e); - throw e; - } - return Uni.createFrom().emitter(e -> { - var counter = new AtomicLong(handles.size()); - for (var handle : handles) { - handle.onFlush(() -> { - if (counter.decrementAndGet() == 0) { - e.complete(OpPushReply.getDefaultInstance()); - } - }); - } - }); - } - - public Uni ping(PeerId from, PingRequest request) { - return Uni.createFrom().item(PingReply.getDefaultInstance()); - } -}