mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
simplify serialization, for now
This commit is contained in:
@@ -4,9 +4,7 @@ import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.JDataRemote;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
|
||||
|
||||
//@ProtoMirror(ChunkDataP.class)
|
||||
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote {
|
||||
@Override
|
||||
public int estimateSize() {
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
@ApplicationScoped
|
||||
public class ChunkDataSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
|
||||
@Override
|
||||
public ChunkData deserialize(ChunkDataP message) {
|
||||
return new ChunkData(JObjectKey.of(message.getName()), message.getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChunkDataP serialize(ChunkData object) {
|
||||
return ChunkDataP.newBuilder()
|
||||
.setName(object.key().toString())
|
||||
.setData(object.data())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
|
||||
import org.pcollections.TreePMap;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
package com.usatiuk.dhfs.files.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.persistence.FileP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.pcollections.TreePMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileSerializer implements ProtoSerializer<FileP, File> {
|
||||
@Override
|
||||
public File deserialize(FileP message) {
|
||||
TreePMap<Long, JObjectKey> chunks = TreePMap.empty();
|
||||
for (var chunk : message.getChunksList()) {
|
||||
chunks = chunks.plus(chunk.getStart(), JObjectKey.of(chunk.getId()));
|
||||
}
|
||||
var ret = new File(JObjectKey.of(message.getUuid()),
|
||||
message.getMode(),
|
||||
message.getCtime(),
|
||||
message.getMtime(),
|
||||
chunks,
|
||||
message.getSymlink(),
|
||||
message.getSize()
|
||||
);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileP serialize(File object) {
|
||||
var builder = FileP.newBuilder()
|
||||
.setUuid(object.key().toString())
|
||||
.setMode(object.mode())
|
||||
.setCtime(object.cTime())
|
||||
.setMtime(object.mTime())
|
||||
.setSymlink(object.symlink())
|
||||
.setSize(object.size());
|
||||
object.chunks().forEach((s, i) -> {
|
||||
builder.addChunksBuilder()
|
||||
.setStart(s)
|
||||
.setId(i.toString());
|
||||
});
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.persistence.RemoteObjectP;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@ProtoMirror(RemoteObjectP.class)
|
||||
public interface JDataRemote extends Serializable {
|
||||
JObjectKey key();
|
||||
|
||||
|
||||
@@ -2,5 +2,5 @@ package com.usatiuk.dhfs.objects;
|
||||
|
||||
import org.pcollections.PMap;
|
||||
|
||||
public record ReceivedObject(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemote data) {
|
||||
public record ReceivedObject(PMap<PeerId, Long> changelog, JDataRemote data) {
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.persistence.JDataRemoteP;
|
||||
import com.usatiuk.dhfs.utils.SerializationHelper;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
public class TemporaryJDataRemoteSerializer implements ProtoSerializer<JDataRemoteP, JDataRemote> {
|
||||
@Override
|
||||
public JDataRemote deserialize(JDataRemoteP message) {
|
||||
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public JDataRemoteP serialize(JDataRemote object) {
|
||||
return JDataRemoteP.newBuilder()
|
||||
.setSerializedData(SerializationHelper.serialize(object))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.repository.OpP;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||
import com.usatiuk.dhfs.utils.SerializationHelper;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
@Singleton
|
||||
public class TemporaryOpSerializer implements ProtoSerializer<OpP, Op> {
|
||||
@Override
|
||||
public Op deserialize(OpP message) {
|
||||
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpP serialize(Op object) {
|
||||
return OpP.newBuilder()
|
||||
.setSerializedData(SerializationHelper.serialize(object))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,5 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaP;
|
||||
import com.usatiuk.kleppmanntree.NodeMeta;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaDirectoryP;
|
||||
|
||||
//@ProtoMirror(JKleppmannTreeNodeMetaDirectoryP.class)
|
||||
public class JKleppmannTreeNodeMetaDirectory extends JKleppmannTreeNodeMeta {
|
||||
public JKleppmannTreeNodeMetaDirectory(String name) {
|
||||
super(name);
|
||||
|
||||
@@ -2,11 +2,9 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.persistence.JKleppmannTreeNodeMetaFileP;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
|
||||
public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
|
||||
private final JObjectKey _fileIno;
|
||||
|
||||
|
||||
@@ -136,10 +136,7 @@ public class PeerManager {
|
||||
private boolean pingCheck(PeerInfo host, PeerAddress address) {
|
||||
try {
|
||||
return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> {
|
||||
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build());
|
||||
if (!UUID.fromString(ret.getSelfUuid()).equals(host.id().id())) {
|
||||
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host.id());
|
||||
}
|
||||
c.ping(PingRequest.getDefaultInstance());
|
||||
return true;
|
||||
});
|
||||
} catch (Exception ignored) {
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.JDataRemote;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.ReceivedObject;
|
||||
import com.usatiuk.dhfs.objects.persistence.RemoteObjectP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.pcollections.HashTreePMap;
|
||||
import org.pcollections.PMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class ReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
|
||||
@Inject
|
||||
ProtoSerializer<RemoteObjectP, JDataRemote> remoteObjectSerializer;
|
||||
|
||||
@Override
|
||||
public ReceivedObject deserialize(GetObjectReply message) {
|
||||
PMap<PeerId, Long> changelog = HashTreePMap.empty();
|
||||
for (var entry : message.getHeader().getChangelog().getEntriesList()) {
|
||||
changelog = changelog.plus(PeerId.of(entry.getHost()), entry.getVersion());
|
||||
}
|
||||
return new ReceivedObject(
|
||||
JObjectKey.of(message.getHeader().getName()),
|
||||
changelog,
|
||||
remoteObjectSerializer.deserialize(message.getContent())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetObjectReply serialize(ReceivedObject object) {
|
||||
var builder = GetObjectReply.newBuilder();
|
||||
var headerBuilder = builder.getHeaderBuilder();
|
||||
headerBuilder.setName(object.key().toString());
|
||||
var changelogBuilder = headerBuilder.getChangelogBuilder();
|
||||
object.changelog().forEach((peer, version) -> {
|
||||
changelogBuilder.addEntriesBuilder()
|
||||
.setHost(peer.toString())
|
||||
.setVersion(version);
|
||||
});
|
||||
builder.setContent(remoteObjectSerializer.serialize(object.data()));
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
@@ -41,11 +42,8 @@ public class RemoteObjectServiceClient {
|
||||
SyncHandler syncHandler;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
// @Inject
|
||||
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
|
||||
@Inject
|
||||
ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
|
||||
|
||||
ProtoSerializer<OpP, Op> opProtoSerializer;
|
||||
@Inject
|
||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||
|
||||
@@ -76,7 +74,7 @@ public class RemoteObjectServiceClient {
|
||||
Log.info("Downloading object " + key + " from " + targets);
|
||||
|
||||
rpcClientFactory.withObjSyncClient(targets, (peer, client) -> {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setName(key.toString()).build());
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());
|
||||
|
||||
var deserialized = receivedObjectProtoSerializer.deserialize(reply);
|
||||
|
||||
@@ -152,13 +150,13 @@ public class RemoteObjectServiceClient {
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey object, Collection<JObjectKey> ourReferrers) {
|
||||
Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
|
||||
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey objKey, Collection<JObjectKey> ourReferrers) {
|
||||
Log.trace("Asking canDelete for " + objKey + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
|
||||
try {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
|
||||
var req = CanDeleteRequest.newBuilder().setName(object.toString());
|
||||
var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
|
||||
for (var ref : ourReferrers) {
|
||||
req.addOurReferrers(ref.toString());
|
||||
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.toString()).build());
|
||||
}
|
||||
return Pair.of(h, rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build())));
|
||||
}).toList()).stream().map(f -> {
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler;
|
||||
@@ -37,10 +38,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
SecurityIdentity identity;
|
||||
// @Inject
|
||||
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
|
||||
@Inject
|
||||
ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
|
||||
ProtoSerializer<OpP, Op> opProtoSerializer;
|
||||
@Inject
|
||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||
@Inject
|
||||
@@ -53,10 +52,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||
Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
|
||||
|
||||
|
||||
Pair<RemoteObjectMeta, JDataRemote> got = txm.run(() -> {
|
||||
var meta = remoteTx.getMeta(JObjectKey.of(request.getName())).orElse(null);
|
||||
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null);
|
||||
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
|
||||
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||
if (meta != null && !meta.seen())
|
||||
curTx.put(meta.withSeen(true));
|
||||
if (obj != null)
|
||||
@@ -78,7 +76,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
}
|
||||
|
||||
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().key(), got.getKey().changelog(), got.getValue()));
|
||||
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getValue()));
|
||||
return Uni.createFrom().item(serialized);
|
||||
// // Does @Blocking break this?
|
||||
// return Uni.createFrom().emitter(emitter -> {
|
||||
@@ -124,10 +122,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
Log.info("<-- canDelete: " + request.getName() + " from " + peerId);
|
||||
|
||||
var builder = CanDeleteReply.newBuilder();
|
||||
builder.setObjName(request.getName());
|
||||
|
||||
txm.run(() -> {
|
||||
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName())).orElse(null);
|
||||
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName().getName())).orElse(null);
|
||||
|
||||
if (obj == null) {
|
||||
builder.setDeletionCandidate(true);
|
||||
@@ -138,7 +135,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
if (!builder.getDeletionCandidate())
|
||||
for (var r : obj.refsFrom())
|
||||
builder.addReferrers(r.toString());
|
||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.toString()).build());
|
||||
|
||||
// if (!ret.getDeletionCandidate())
|
||||
// for (var rr : request.getOurReferrersList())
|
||||
@@ -181,8 +178,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<PingReply> ping(PingRequest request) {
|
||||
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
|
||||
return Uni.createFrom().item(PingReply.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build());
|
||||
return Uni.createFrom().item(PingReply.getDefaultInstance());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.JDataRemote;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.ReceivedObject;
|
||||
import com.usatiuk.dhfs.objects.persistence.JDataRemoteP;
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
|
||||
import com.usatiuk.dhfs.objects.persistence.PeerIdP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.pcollections.HashTreePMap;
|
||||
import org.pcollections.PMap;
|
||||
|
||||
@Singleton
|
||||
public class TemporaryReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
|
||||
@Inject
|
||||
ProtoSerializer<JDataRemoteP, JDataRemote> remoteObjectSerializer;
|
||||
|
||||
@Override
|
||||
public ReceivedObject deserialize(GetObjectReply message) {
|
||||
PMap<PeerId, Long> changelog = HashTreePMap.empty();
|
||||
for (var entry : message.getChangelog().getEntriesList()) {
|
||||
changelog = changelog.plus(PeerId.of(entry.getKey().getId().getName()), entry.getValue());
|
||||
}
|
||||
var data = remoteObjectSerializer.deserialize(message.getPushedData());
|
||||
return new ReceivedObject(changelog, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetObjectReply serialize(ReceivedObject object) {
|
||||
var builder = GetObjectReply.newBuilder();
|
||||
var changelogBuilder = builder.getChangelogBuilder();
|
||||
object.changelog().forEach((peer, version) -> {
|
||||
changelogBuilder.addEntriesBuilder()
|
||||
.setKey(PeerIdP.newBuilder().setId(JObjectKeyP.newBuilder().setName(peer.id().toString()).build()).build())
|
||||
.setValue(version);
|
||||
});
|
||||
builder.setPushedData(remoteObjectSerializer.serialize(object.data()));
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@@ -9,10 +9,6 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
|
||||
public IndexUpdateOp(RemoteObjectMeta object) {
|
||||
this(object.key(), object.changelog());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> getEscapedRefs() {
|
||||
return List.of(key);
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
package com.usatiuk.dhfs.objects.repository.invalidation;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.repository.IndexUpdateOpP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import org.pcollections.HashTreePMap;
|
||||
import org.pcollections.PMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class IndexUpdateOpSerializer implements ProtoSerializer<IndexUpdateOpP, IndexUpdateOp> {
|
||||
|
||||
@Override
|
||||
public IndexUpdateOp deserialize(IndexUpdateOpP message) {
|
||||
PMap<PeerId, Long> map = HashTreePMap.empty();
|
||||
for (var entry : message.getHeader().getChangelog().getEntriesList()) {
|
||||
map = map.plus(PeerId.of(entry.getHost()), entry.getVersion());
|
||||
}
|
||||
return new IndexUpdateOp(JObjectKey.of(message.getHeader().getName()), map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexUpdateOpP serialize(IndexUpdateOp object) {
|
||||
var builder = IndexUpdateOpP.newBuilder();
|
||||
var headerBuilder = builder.getHeaderBuilder();
|
||||
headerBuilder.setName(object.key().name());
|
||||
var changelogBuilder = headerBuilder.getChangelogBuilder();
|
||||
for (var entry : object.changelog().entrySet()) {
|
||||
var entryBuilder = changelogBuilder.addEntriesBuilder();
|
||||
entryBuilder.setHost(entry.getKey().id().toString());
|
||||
entryBuilder.setVersion(entry.getValue());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
package com.usatiuk.dhfs.objects.repository.invalidation;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
|
||||
import com.usatiuk.dhfs.objects.repository.JKleppmannTreeOpPTemp;
|
||||
import com.usatiuk.dhfs.utils.SerializationHelper;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JKleppmannTreeOpPTempSerializer implements ProtoSerializer<JKleppmannTreeOpPTemp, JKleppmannTreeOpWrapper> {
|
||||
@Override
|
||||
public JKleppmannTreeOpWrapper deserialize(JKleppmannTreeOpPTemp message) {
|
||||
return SerializationHelper.deserialize(message.getSerialized().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeOpPTemp serialize(JKleppmannTreeOpWrapper object) {
|
||||
return JKleppmannTreeOpPTemp.newBuilder()
|
||||
.setSerialized(SerializationHelper.serialize(object))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -2,11 +2,10 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
|
||||
@ProtoMirror(OpPushPayload.class)
|
||||
public interface Op {
|
||||
public interface Op extends Serializable {
|
||||
Collection<JObjectKey> getEscapedRefs();
|
||||
}
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
|
||||
@ApplicationScoped
|
||||
public class PeerInfoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
|
||||
|
||||
@Override
|
||||
public PeerInfo deserialize(PeerInfoP message) {
|
||||
return new PeerInfo(PeerId.of(message.getUuid()), message.getCert().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PeerInfoP serialize(PeerInfo object) {
|
||||
return PeerInfoP.newBuilder()
|
||||
.setUuid(object.key().toString())
|
||||
.setCert(ByteString.copyFrom(object.cert()))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -6,164 +6,28 @@ option java_outer_classname = "DhfsObjectPersistence";
|
||||
|
||||
package dhfs.objects.persistence;
|
||||
|
||||
message ObjectMetadataP {
|
||||
string name = 1;
|
||||
map<string, int64> remoteCopies = 2;
|
||||
string knownClass = 3;
|
||||
bool seen = 4;
|
||||
bool deleted = 5;
|
||||
repeated string confirmedDeletes = 6;
|
||||
repeated string referrers = 7;
|
||||
map<string, int64> changelog = 8;
|
||||
repeated string savedRefs = 9;
|
||||
bool frozen = 10;
|
||||
bool haveLocalCopy = 11;
|
||||
}
|
||||
|
||||
message FsNodeP {
|
||||
string uuid = 1;
|
||||
int64 mode = 2;
|
||||
int64 ctime = 3;
|
||||
int64 mtime = 4;
|
||||
}
|
||||
|
||||
message FilePChunksEntry {
|
||||
int64 start = 1;
|
||||
string id = 2;
|
||||
}
|
||||
|
||||
message FileP {
|
||||
string uuid = 1;
|
||||
int64 mode = 2;
|
||||
int64 ctime = 3;
|
||||
int64 mtime = 4;
|
||||
repeated FilePChunksEntry chunks = 5;
|
||||
bool symlink = 6;
|
||||
int64 size = 7;
|
||||
}
|
||||
|
||||
message DirectoryP {
|
||||
FsNodeP fsNode = 1;
|
||||
map<string, string> children = 2;
|
||||
}
|
||||
|
||||
message ChunkDataP {
|
||||
string name = 1;
|
||||
bytes data = 2;
|
||||
}
|
||||
|
||||
message PeerDirectoryP {
|
||||
repeated string peers = 1;
|
||||
}
|
||||
|
||||
message PeerInfoP {
|
||||
string uuid = 1;
|
||||
bytes cert = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreeNodeMetaFileP {
|
||||
string name = 1;
|
||||
string fileIno = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreeNodeMetaDirectoryP {
|
||||
message JObjectKeyP {
|
||||
string name = 1;
|
||||
}
|
||||
|
||||
message JKleppmannTreeNodeMetaP {
|
||||
oneof meta {
|
||||
JKleppmannTreeNodeMetaFileP jKleppmannTreeNodeMetaFile = 1;
|
||||
JKleppmannTreeNodeMetaDirectoryP jKleppmannTreeNodeMetaDirectory = 2;
|
||||
message PeerIdP {
|
||||
JObjectKeyP id = 1;
|
||||
}
|
||||
|
||||
message ObjectChangelog {
|
||||
message entries_Entry {
|
||||
PeerIdP key = 1;
|
||||
int64 value = 2;
|
||||
}
|
||||
repeated entries_Entry entries = 1;
|
||||
}
|
||||
|
||||
message JKleppmannTreeOpP {
|
||||
int64 timestamp = 1;
|
||||
string peer = 2;
|
||||
string newParentId = 3;
|
||||
JKleppmannTreeNodeMetaP meta = 4;
|
||||
string child = 5;
|
||||
// TODO: Serialization
|
||||
|
||||
message JDataRemoteP {
|
||||
bytes serializedData = 1;
|
||||
}
|
||||
|
||||
message JKleppmannTreeNodePChildrenEntry {
|
||||
string key = 1;
|
||||
string value = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreeNodeP {
|
||||
optional string parent = 1;
|
||||
string id = 2;
|
||||
repeated JKleppmannTreeNodePChildrenEntry children = 3;
|
||||
optional JKleppmannTreeNodeMetaP meta = 4;
|
||||
optional JKleppmannTreeOpP lastEffectiveOp = 5;
|
||||
}
|
||||
|
||||
message JKleppmannTreePersistentDataPQueueEntry {
|
||||
int64 clock = 1;
|
||||
string uuid = 2;
|
||||
JKleppmannTreeOpP op = 3;
|
||||
}
|
||||
|
||||
message JKleppmannTreePersistentDataPQueue {
|
||||
string node = 1;
|
||||
repeated JKleppmannTreePersistentDataPQueueEntry entries = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreePersistentDataPTimestampEntry {
|
||||
string host = 1;
|
||||
int64 timestamp = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreeOpLogEffectP {
|
||||
optional JKleppmannTreeOpP oldEffectiveMove = 1;
|
||||
optional string oldParent = 2;
|
||||
optional JKleppmannTreeNodeMetaP oldMeta = 3;
|
||||
JKleppmannTreeOpP effectiveOp = 4;
|
||||
string newParentId = 5;
|
||||
JKleppmannTreeNodeMetaP newMeta = 6;
|
||||
string selfId = 7;
|
||||
}
|
||||
|
||||
message JKleppmannTreeOpLogPEntry {
|
||||
int64 clock = 1;
|
||||
string uuid = 2;
|
||||
JKleppmannTreeOpP op = 3;
|
||||
repeated JKleppmannTreeOpLogEffectP effects = 4;
|
||||
}
|
||||
|
||||
message JKleppmannTreePersistentDataP {
|
||||
string treeName = 1;
|
||||
int64 clock = 2;
|
||||
repeated JKleppmannTreePersistentDataPQueue queues = 3;
|
||||
repeated JKleppmannTreePersistentDataPTimestampEntry peerLog = 4;
|
||||
repeated JKleppmannTreeOpLogPEntry opLog = 5;
|
||||
}
|
||||
|
||||
message PeerDirectoryLocalP {
|
||||
repeated string initialOpSyncDonePeers = 1;
|
||||
repeated string initialObjSyncDonePeers = 2;
|
||||
}
|
||||
|
||||
message RemoteObjectP {
|
||||
oneof obj {
|
||||
FileP file = 2;
|
||||
ChunkDataP chunkData = 5;
|
||||
// PeerDirectoryP peerDirectory = 6;
|
||||
PeerInfoP peerInfo = 7;
|
||||
// JKleppmannTreeNodeP jKleppmannTreeNode = 8;
|
||||
// JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
|
||||
// PeerDirectoryLocalP peerDirectoryLocal = 10;
|
||||
}
|
||||
}
|
||||
|
||||
message JObjectDataP {
|
||||
oneof obj {
|
||||
FileP file = 2;
|
||||
ChunkDataP chunkData = 5;
|
||||
// PeerDirectoryP peerDirectory = 6;
|
||||
PeerInfoP peerInfo = 7;
|
||||
// JKleppmannTreeNodeP jKleppmannTreeNode = 8;
|
||||
// JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
|
||||
// PeerDirectoryLocalP peerDirectoryLocal = 10;
|
||||
}
|
||||
message JDataP {
|
||||
bytes serializedData = 1;
|
||||
}
|
||||
@@ -9,83 +9,45 @@ option java_outer_classname = "DhfsObjectSyncApi";
|
||||
package dhfs.objects.sync;
|
||||
|
||||
service DhfsObjectSyncGrpc {
|
||||
rpc OpPush (OpPushRequest) returns (OpPushReply) {}
|
||||
|
||||
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
|
||||
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
|
||||
rpc OpPush (OpPushRequest) returns (OpPushReply) {}
|
||||
|
||||
rpc Ping (PingRequest) returns (PingReply) {}
|
||||
}
|
||||
|
||||
message PingRequest {
|
||||
string selfUuid = 1;
|
||||
}
|
||||
message PingRequest {}
|
||||
|
||||
message PingReply {
|
||||
string selfUuid = 1;
|
||||
}
|
||||
|
||||
message ObjectChangelogEntry {
|
||||
string host = 1;
|
||||
uint64 version = 2;
|
||||
}
|
||||
|
||||
message ObjectChangelog {
|
||||
repeated ObjectChangelogEntry entries = 1;
|
||||
}
|
||||
|
||||
message ObjectHeader {
|
||||
string name = 2;
|
||||
ObjectChangelog changelog = 5;
|
||||
optional dhfs.objects.persistence.RemoteObjectP pushedData = 6;
|
||||
}
|
||||
message PingReply {}
|
||||
|
||||
message GetObjectRequest {
|
||||
string name = 2;
|
||||
dhfs.objects.persistence.JObjectKeyP name = 2;
|
||||
}
|
||||
|
||||
message GetObjectReply {
|
||||
ObjectHeader header = 1;
|
||||
dhfs.objects.persistence.RemoteObjectP content = 2;
|
||||
dhfs.objects.persistence.ObjectChangelog changelog = 5;
|
||||
dhfs.objects.persistence.JDataRemoteP pushedData = 6;
|
||||
}
|
||||
|
||||
message CanDeleteRequest {
|
||||
string name = 2;
|
||||
repeated string ourReferrers = 3;
|
||||
dhfs.objects.persistence.JObjectKeyP name = 2;
|
||||
repeated dhfs.objects.persistence.JObjectKeyP ourReferrers = 3;
|
||||
}
|
||||
|
||||
message CanDeleteReply {
|
||||
string objName = 1;
|
||||
bool deletionCandidate = 2;
|
||||
repeated string referrers = 3;
|
||||
}
|
||||
|
||||
message IndexUpdateOpP {
|
||||
ObjectHeader header = 1;
|
||||
}
|
||||
|
||||
message IndexUpdateReply {}
|
||||
|
||||
message JKleppmannTreePeriodicPushOpP {
|
||||
int64 timestamp = 2;
|
||||
}
|
||||
|
||||
message JKleppmannTreeOpPTemp {
|
||||
bytes serialized = 2;
|
||||
}
|
||||
|
||||
message OpPushPayload {
|
||||
oneof payload {
|
||||
JKleppmannTreeOpPTemp jKleppmannTreeOpWrapper = 1;
|
||||
// dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOpWrapper = 1;
|
||||
// JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2;
|
||||
IndexUpdateOpP indexUpdateOp = 3;
|
||||
}
|
||||
repeated dhfs.objects.persistence.JObjectKeyP referrers = 3;
|
||||
}
|
||||
|
||||
message OpPushRequest {
|
||||
repeated OpPushPayload msg = 2;
|
||||
repeated OpP msg = 2;
|
||||
}
|
||||
|
||||
message OpPushReply {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
message OpP {
|
||||
bytes serializedData = 1;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user