mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
more cleanup
This commit is contained in:
@@ -42,10 +42,10 @@ One line in the `extra-opts` file corresponds to one option passed to the JVM wh
|
|||||||
|
|
||||||
Some extra possible configuration options are:
|
Some extra possible configuration options are:
|
||||||
|
|
||||||
- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`.
|
- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default, it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`.
|
||||||
- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for garbage collection and resynchronized after being reconnected. The default is 43200 (30 days); if set to `-1`, this feature is disabled.
|
- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for garbage collection and resynchronized after being reconnected. The default is 43200 (30 days); if set to `-1`, this feature is disabled.
|
||||||
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`.
|
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`.
|
||||||
|
|
||||||
On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100GB.
|
On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100 GB.
|
||||||
|
|
||||||
Then, a web interface will be available at `localhost:8080`, which can be used to connect with other peers.
|
Then, a web interface will be available at `localhost:8080`, which can be used to connect with other peers.
|
||||||
|
|||||||
@@ -254,7 +254,7 @@ public class DhfsFileService {
|
|||||||
* @param to the new name
|
* @param to the new name
|
||||||
* @return true if the rename was successful, false otherwise
|
* @return true if the rename was successful, false otherwise
|
||||||
*/
|
*/
|
||||||
public Boolean rename(String from, String to) {
|
public boolean rename(String from, String to) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var node = getDirEntry(from);
|
var node = getDirEntry(from);
|
||||||
JKleppmannTreeNodeMeta meta = node.meta();
|
JKleppmannTreeNodeMeta meta = node.meta();
|
||||||
@@ -275,7 +275,7 @@ public class DhfsFileService {
|
|||||||
* @param mode the new mode
|
* @param mode the new mode
|
||||||
* @return true if the mode was changed successfully, false otherwise
|
* @return true if the mode was changed successfully, false otherwise
|
||||||
*/
|
*/
|
||||||
public Boolean chmod(JObjectKey uuid, long mode) {
|
public boolean chmod(JObjectKey uuid, long mode) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||||
|
|
||||||
@@ -420,7 +420,7 @@ public class DhfsFileService {
|
|||||||
* @param data the data to write
|
* @param data the data to write
|
||||||
* @return the number of bytes written
|
* @return the number of bytes written
|
||||||
*/
|
*/
|
||||||
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
|
public long write(JObjectKey fileUuid, long offset, ByteString data) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
if (offset < 0)
|
if (offset < 0)
|
||||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
|
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
|
||||||
@@ -529,7 +529,7 @@ public class DhfsFileService {
|
|||||||
* @param length the new length of the file
|
* @param length the new length of the file
|
||||||
* @return true if the truncate was successful, false otherwise
|
* @return true if the truncate was successful, false otherwise
|
||||||
*/
|
*/
|
||||||
public Boolean truncate(JObjectKey fileUuid, long length) {
|
public boolean truncate(JObjectKey fileUuid, long length) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
if (length < 0)
|
if (length < 0)
|
||||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
|
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
|
||||||
@@ -721,7 +721,7 @@ public class DhfsFileService {
|
|||||||
* @param mtimeMs the modification time in milliseconds
|
* @param mtimeMs the modification time in milliseconds
|
||||||
* @return true if the times were set successfully, false otherwise
|
* @return true if the times were set successfully, false otherwise
|
||||||
*/
|
*/
|
||||||
public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
|
public boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||||
|
|
||||||
@@ -775,7 +775,7 @@ public class DhfsFileService {
|
|||||||
* @param data the data to write
|
* @param data the data to write
|
||||||
* @return the number of bytes written
|
* @return the number of bytes written
|
||||||
*/
|
*/
|
||||||
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
|
public long write(JObjectKey fileUuid, long offset, byte[] data) {
|
||||||
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
|
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -267,7 +267,7 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
try {
|
try {
|
||||||
var fileKey = getFromHandle(fi.fh.get());
|
var fileKey = getFromHandle(fi.fh.get());
|
||||||
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
|
||||||
return written.intValue();
|
return Math.toIntExact(written);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Log.error("When writing " + path, e);
|
Log.error("When writing " + path, e);
|
||||||
return -ErrorCodes.EIO();
|
return -ErrorCodes.EIO();
|
||||||
|
|||||||
@@ -1,13 +1,32 @@
|
|||||||
package com.usatiuk.dhfs.rpc;
|
package com.usatiuk.dhfs.rpc;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.ProtoSerializer;
|
||||||
|
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
|
||||||
|
import com.usatiuk.dhfs.invalidation.Op;
|
||||||
|
import com.usatiuk.dhfs.invalidation.OpHandlerService;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
|
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
||||||
|
import com.usatiuk.dhfs.remoteobj.*;
|
||||||
import com.usatiuk.dhfs.repository.*;
|
import com.usatiuk.dhfs.repository.*;
|
||||||
|
import com.usatiuk.dhfs.syncmap.DtoMapperService;
|
||||||
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
|
import com.usatiuk.objects.transaction.TransactionHandle;
|
||||||
|
import com.usatiuk.objects.transaction.TransactionManager;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.grpc.GrpcService;
|
import io.quarkus.grpc.GrpcService;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.security.identity.SecurityIdentity;
|
import io.quarkus.security.identity.SecurityIdentity;
|
||||||
import io.smallrye.common.annotation.Blocking;
|
import io.smallrye.common.annotation.Blocking;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.annotation.security.RolesAllowed;
|
import jakarta.annotation.security.RolesAllowed;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
// Note: RunOnVirtualThread hangs somehow
|
// Note: RunOnVirtualThread hangs somehow
|
||||||
@GrpcService
|
@GrpcService
|
||||||
@@ -15,8 +34,120 @@ import jakarta.inject.Inject;
|
|||||||
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||||
@Inject
|
@Inject
|
||||||
SecurityIdentity identity;
|
SecurityIdentity identity;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
RemoteObjectServiceServerImpl remoteObjectServiceServerImpl;
|
TransactionManager txm;
|
||||||
|
@Inject
|
||||||
|
Transaction curTx;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ProtoSerializer<OpP, Op> opProtoSerializer;
|
||||||
|
@Inject
|
||||||
|
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||||
|
@Inject
|
||||||
|
RemoteTransaction remoteTx;
|
||||||
|
@Inject
|
||||||
|
OpHandlerService opHandlerService;
|
||||||
|
@Inject
|
||||||
|
DtoMapperService dtoMapperService;
|
||||||
|
@Inject
|
||||||
|
AutosyncProcessor autosyncProcessor;
|
||||||
|
|
||||||
|
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
|
||||||
|
Log.info("<-- getObject: " + request.getName() + " from " + from);
|
||||||
|
|
||||||
|
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
|
||||||
|
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
if (meta != null && !meta.seen())
|
||||||
|
curTx.put(meta.withSeen(true));
|
||||||
|
if (obj != null)
|
||||||
|
for (var ref : obj.collectRefsTo()) {
|
||||||
|
var refMeta = remoteTx.getMeta(ref).orElse(null);
|
||||||
|
if (refMeta != null && !refMeta.seen())
|
||||||
|
curTx.put(refMeta.withSeen(true));
|
||||||
|
}
|
||||||
|
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
|
||||||
|
});
|
||||||
|
|
||||||
|
if ((got.getValue() != null) && (got.getKey() == null)) {
|
||||||
|
Log.error("Inconsistent state for object meta: " + request.getName());
|
||||||
|
throw new StatusRuntimeException(Status.INTERNAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (got.getValue() == null) {
|
||||||
|
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from);
|
||||||
|
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
|
||||||
|
return Uni.createFrom().item(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
|
||||||
|
Log.infov("<-- canDelete: {0} from {1}", request, from);
|
||||||
|
|
||||||
|
var builder = CanDeleteReply.newBuilder();
|
||||||
|
|
||||||
|
txm.run(() -> {
|
||||||
|
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||||
|
|
||||||
|
if (obj == null) {
|
||||||
|
builder.setDeletionCandidate(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
||||||
|
|
||||||
|
if (!builder.getDeletionCandidate()) {
|
||||||
|
for (var r : obj.refsFrom()) {
|
||||||
|
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
|
||||||
|
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!builder.getDeletionCandidate()) {
|
||||||
|
Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return Uni.createFrom().item(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
|
||||||
|
if (request.getMsgCount() == 0) {
|
||||||
|
Log.infov("<-- opPush: empty from {0}", from);
|
||||||
|
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
|
var handles = new ArrayList<TransactionHandle>();
|
||||||
|
try {
|
||||||
|
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
|
||||||
|
for (var op : ops) {
|
||||||
|
Log.infov("<-- opPush: {0} from {1}", op, from);
|
||||||
|
var handle = txm.run(() -> {
|
||||||
|
opHandlerService.handleOp(from, op);
|
||||||
|
});
|
||||||
|
handles.add(handle);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.error("Error handling ops", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return Uni.createFrom().emitter(e -> {
|
||||||
|
var counter = new AtomicLong(handles.size());
|
||||||
|
for (var handle : handles) {
|
||||||
|
handle.onFlush(() -> {
|
||||||
|
if (counter.decrementAndGet() == 0) {
|
||||||
|
e.complete(OpPushReply.getDefaultInstance());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Uni<PingReply> ping(PeerId from, PingRequest request) {
|
||||||
|
return Uni.createFrom().item(PingReply.getDefaultInstance());
|
||||||
|
}
|
||||||
|
|
||||||
private PeerId getIdentity() {
|
private PeerId getIdentity() {
|
||||||
return PeerId.of(identity.getPrincipal().getName().substring(3));
|
return PeerId.of(identity.getPrincipal().getName().substring(3));
|
||||||
@@ -25,24 +156,24 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
|||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||||
return remoteObjectServiceServerImpl.getObject(getIdentity(), request);
|
return getObject(getIdentity(), request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
|
||||||
return remoteObjectServiceServerImpl.canDelete(getIdentity(), request);
|
return canDelete(getIdentity(), request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<OpPushReply> opPush(OpPushRequest request) {
|
public Uni<OpPushReply> opPush(OpPushRequest request) {
|
||||||
return remoteObjectServiceServerImpl.opPush(getIdentity(), request);
|
return opPush(getIdentity(), request);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Blocking
|
@Blocking
|
||||||
public Uni<PingReply> ping(PingRequest request) {
|
public Uni<PingReply> ping(PingRequest request) {
|
||||||
return remoteObjectServiceServerImpl.ping(getIdentity(), request);
|
return ping(getIdentity(), request);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,146 +0,0 @@
|
|||||||
package com.usatiuk.dhfs.rpc;
|
|
||||||
|
|
||||||
import com.usatiuk.dhfs.ProtoSerializer;
|
|
||||||
import com.usatiuk.dhfs.autosync.AutosyncProcessor;
|
|
||||||
import com.usatiuk.dhfs.invalidation.Op;
|
|
||||||
import com.usatiuk.dhfs.invalidation.OpHandlerService;
|
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
|
||||||
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
|
||||||
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
|
||||||
import com.usatiuk.dhfs.remoteobj.*;
|
|
||||||
import com.usatiuk.dhfs.repository.*;
|
|
||||||
import com.usatiuk.dhfs.syncmap.DtoMapperService;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
|
||||||
import com.usatiuk.objects.transaction.TransactionHandle;
|
|
||||||
import com.usatiuk.objects.transaction.TransactionManager;
|
|
||||||
import io.grpc.Status;
|
|
||||||
import io.grpc.StatusRuntimeException;
|
|
||||||
import io.quarkus.logging.Log;
|
|
||||||
import io.smallrye.mutiny.Uni;
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
// Note: RunOnVirtualThread hangs somehow
|
|
||||||
@ApplicationScoped
|
|
||||||
public class RemoteObjectServiceServerImpl {
|
|
||||||
@Inject
|
|
||||||
TransactionManager txm;
|
|
||||||
@Inject
|
|
||||||
ReachablePeerManager reachablePeerManager;
|
|
||||||
@Inject
|
|
||||||
Transaction curTx;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
ProtoSerializer<OpP, Op> opProtoSerializer;
|
|
||||||
@Inject
|
|
||||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
|
||||||
@Inject
|
|
||||||
RemoteTransaction remoteTx;
|
|
||||||
@Inject
|
|
||||||
OpHandlerService opHandlerService;
|
|
||||||
@Inject
|
|
||||||
DtoMapperService dtoMapperService;
|
|
||||||
@Inject
|
|
||||||
AutosyncProcessor autosyncProcessor;
|
|
||||||
|
|
||||||
public Uni<GetObjectReply> getObject(PeerId from, GetObjectRequest request) {
|
|
||||||
Log.info("<-- getObject: " + request.getName() + " from " + from);
|
|
||||||
|
|
||||||
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
|
|
||||||
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
if (meta != null && !meta.seen())
|
|
||||||
curTx.put(meta.withSeen(true));
|
|
||||||
if (obj != null)
|
|
||||||
for (var ref : obj.collectRefsTo()) {
|
|
||||||
var refMeta = remoteTx.getMeta(ref).orElse(null);
|
|
||||||
if (refMeta != null && !refMeta.seen())
|
|
||||||
curTx.put(refMeta.withSeen(true));
|
|
||||||
}
|
|
||||||
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
|
|
||||||
});
|
|
||||||
|
|
||||||
if ((got.getValue() != null) && (got.getKey() == null)) {
|
|
||||||
Log.error("Inconsistent state for object meta: " + request.getName());
|
|
||||||
throw new StatusRuntimeException(Status.INTERNAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (got.getValue() == null) {
|
|
||||||
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + from);
|
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
|
||||||
}
|
|
||||||
|
|
||||||
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
|
|
||||||
return Uni.createFrom().item(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Uni<CanDeleteReply> canDelete(PeerId from, CanDeleteRequest request) {
|
|
||||||
Log.infov("<-- canDelete: {0} from {1}", request, from);
|
|
||||||
|
|
||||||
var builder = CanDeleteReply.newBuilder();
|
|
||||||
|
|
||||||
txm.run(() -> {
|
|
||||||
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
|
||||||
|
|
||||||
if (obj == null) {
|
|
||||||
builder.setDeletionCandidate(true);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
|
||||||
|
|
||||||
if (!builder.getDeletionCandidate()) {
|
|
||||||
for (var r : obj.refsFrom()) {
|
|
||||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
|
|
||||||
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!builder.getDeletionCandidate()) {
|
|
||||||
Log.infov("Not deletion candidate: {0}, {1} (asked from {2})", obj, builder, from);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return Uni.createFrom().item(builder.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
|
|
||||||
if (request.getMsgCount() == 0) {
|
|
||||||
Log.infov("<-- opPush: empty from {0}", from);
|
|
||||||
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
|
|
||||||
}
|
|
||||||
|
|
||||||
var handles = new ArrayList<TransactionHandle>();
|
|
||||||
try {
|
|
||||||
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
|
|
||||||
for (var op : ops) {
|
|
||||||
Log.infov("<-- opPush: {0} from {1}", op, from);
|
|
||||||
var handle = txm.run(() -> {
|
|
||||||
opHandlerService.handleOp(from, op);
|
|
||||||
});
|
|
||||||
handles.add(handle);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
Log.error("Error handling ops", e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
return Uni.createFrom().emitter(e -> {
|
|
||||||
var counter = new AtomicLong(handles.size());
|
|
||||||
for (var handle : handles) {
|
|
||||||
handle.onFlush(() -> {
|
|
||||||
if (counter.decrementAndGet() == 0) {
|
|
||||||
e.complete(OpPushReply.getDefaultInstance());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public Uni<PingReply> ping(PeerId from, PingRequest request) {
|
|
||||||
return Uni.createFrom().item(PingReply.getDefaultInstance());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user