Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-29 04:57:48 +01:00
Compare commits
6 Commits
66d2c26551 ... 25e5acaabb
- 25e5acaabb
- 3d6bb55950
- da04398d26
- 830cde74a8
- e16556ce67
- 0e90c10efc
@@ -27,10 +27,6 @@ public abstract class SerializationHelper {
        return deserialize(new ByteArrayInputStream(objectData));
    }

    public static <T> T deserialize(final ByteString objectData) {
        return deserialize(objectData.newInput());
    }

    public static <T extends Serializable> ByteString serialize(final T obj) {
        return UnsafeByteOperations.unsafeWrap(SerializationUtils.serialize(obj));
    }
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.files.conflicts.NoOpConflictResolver;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
+import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import lombok.Getter;
import net.openhft.hashing.LongTupleHashFunction;
@@ -19,21 +20,37 @@ public class ChunkData extends JObjectData {
    @Serial
    private static final long serialVersionUID = 1;

-   final String _hash;
-   final ByteString _bytes;
+   final ChunkDataP _data;

    public ChunkData(ByteString bytes) {
        super();
-       this._bytes = bytes;
-       // TODO: There might be (most definitely) a copy there
-       this._hash = Arrays.stream(LongTupleHashFunction.xx128().hashBytes(_bytes.asReadOnlyByteBuffer()))
-               .mapToObj(Long::toHexString).collect(Collectors.joining());
+       _data = ChunkDataP.newBuilder()
+               .setData(bytes)
+               // TODO: There might be (most definitely) a copy there
+               .setName(Arrays.stream(LongTupleHashFunction.xx128().hashBytes(bytes.asReadOnlyByteBuffer()))
+                       .mapToObj(Long::toHexString).collect(Collectors.joining()))
+               .build();
    }

    public ChunkData(ByteString bytes, String name) {
        super();
-       this._bytes = bytes;
-       this._hash = name;
+       _data = ChunkDataP.newBuilder()
+               .setData(bytes)
+               .setName(name)
+               .build();
    }
+
+   public ChunkData(ChunkDataP chunkDataP) {
+       super();
+       _data = chunkDataP;
+   }
+
+   public String getHash() {
+       return _data.getName();
+   }
+
+   public ByteString getBytes() {
+       return _data.getData();
+   }

    public static String getNameFromHash(String hash) {
@@ -45,17 +62,17 @@ public class ChunkData extends JObjectData {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ChunkData chunkData = (ChunkData) o;
-       return Objects.equals(_hash, chunkData._hash);
+       return Objects.equals(_data.getName(), chunkData.getData().getName());
    }

    @Override
    public int hashCode() {
-       return Objects.hashCode(_hash);
+       return Objects.hashCode(_data.getName());
    }

    @Override
    public String getName() {
-       return getNameFromHash(_hash);
+       return getNameFromHash(_data.getName());
    }

    @Override
@@ -75,6 +92,6 @@ public class ChunkData extends JObjectData {

    @Override
    public long estimateSize() {
-       return _bytes.size();
+       return _data.getData().size();
    }
}
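Note: after this change, a chunk's object name is the hex-encoded xxHash128 of its bytes, so chunks are content-addressed. A minimal sketch of that naming scheme (the ChunkNaming class is illustrative, not part of the diff; it assumes the protobuf-java and zero-allocation-hashing libraries already used above):

    import com.google.protobuf.ByteString;
    import net.openhft.hashing.LongTupleHashFunction;

    import java.util.Arrays;
    import java.util.stream.Collectors;

    class ChunkNaming {
        // xxHash128 yields two 64-bit words; hex-encode and concatenate them,
        // exactly as ChunkData's constructor does above.
        static String nameFor(ByteString bytes) {
            return Arrays.stream(LongTupleHashFunction.xx128().hashBytes(bytes.asReadOnlyByteBuffer()))
                    .mapToObj(Long::toHexString)
                    .collect(Collectors.joining());
        }
    }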
@@ -0,0 +1,19 @@
+package com.usatiuk.dhfs.files.objects;
+
+import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class ChunkDataSerializer implements ProtoSerializer<ChunkDataP, ChunkData>, ProtoDeserializer<ChunkDataP, ChunkData> {
+    @Override
+    public ChunkData deserialize(ChunkDataP message) {
+        return new ChunkData(message);
+    }
+
+    @Override
+    public ChunkDataP serialize(ChunkData object) {
+        return object.getData();
+    }
+}
@@ -0,0 +1,22 @@
+package com.usatiuk.dhfs.files.objects;
+
+import com.usatiuk.dhfs.objects.persistence.ChunkInfoP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class ChunkInfoSerializer implements ProtoSerializer<ChunkInfoP, ChunkInfo>, ProtoDeserializer<ChunkInfoP, ChunkInfo> {
+    @Override
+    public ChunkInfo deserialize(ChunkInfoP message) {
+        return new ChunkInfo(message.getName(), message.getSize());
+    }
+
+    @Override
+    public ChunkInfoP serialize(ChunkInfo object) {
+        return ChunkInfoP.newBuilder()
+                .setName(object.getHash())
+                .setSize(object.getSize())
+                .build();
+    }
+}
@@ -0,0 +1,37 @@
+package com.usatiuk.dhfs.files.objects;
+
+import com.usatiuk.dhfs.objects.persistence.DirectoryP;
+import com.usatiuk.dhfs.objects.persistence.FsNodeP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@ApplicationScoped
+public class DirectorySerializer implements ProtoSerializer<DirectoryP, Directory>, ProtoDeserializer<DirectoryP, Directory> {
+    @Override
+    public Directory deserialize(DirectoryP message) {
+        var ret = new Directory(UUID.fromString(message.getFsNode().getUuid()));
+        ret.setMtime(message.getFsNode().getMtime());
+        ret.setCtime(message.getFsNode().getCtime());
+        ret.setMode(message.getFsNode().getMode());
+        ret.getChildren().putAll(message.getChildrenMap().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> UUID.fromString(e.getValue()))));
+        return ret;
+    }
+
+    @Override
+    public DirectoryP serialize(Directory object) {
+        return DirectoryP.newBuilder()
+                .setFsNode(FsNodeP.newBuilder()
+                        .setCtime(object.getCtime())
+                        .setMtime(object.getMtime())
+                        .setMode(object.getMode())
+                        .setUuid(object.getUuid().toString())
+                        .build())
+                .putAllChildren(object.getChildren().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())))
+                .build();
+    }
+}
@@ -23,6 +23,8 @@ public class File extends FsNode {

    public File(UUID uuid, long mode, UUID parent, boolean symlink) {
        super(uuid, mode);
+       if (parent == null)
+           throw new IllegalArgumentException("Parent UUID cannot be null");
        _parent = parent;
        _symlink = symlink;
    }
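Together with the updated test near the end of this diff, this makes the no-parent case explicit: callers now pass a zero-UUID sentinel instead of null. Hypothetical call site:

    UUID fuuid = UUID.randomUUID();
    // Sentinel parent, mirroring the updated DhfsFileServiceSimpleTestImpl test below.
    File f = new File(fuuid, 777, new UUID(0, 0), false);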
@@ -0,0 +1,38 @@
+package com.usatiuk.dhfs.files.objects;
+
+import com.usatiuk.dhfs.objects.persistence.FileP;
+import com.usatiuk.dhfs.objects.persistence.FsNodeP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import io.quarkus.logging.Log;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.util.UUID;
+
+@ApplicationScoped
+public class FileSerializer implements ProtoSerializer<FileP, File>, ProtoDeserializer<FileP, File> {
+    @Override
+    public File deserialize(FileP message) {
+        var ret = new File(UUID.fromString(message.getFsNode().getUuid()), message.getFsNode().getMode(), UUID.fromString(message.getParent()), message.getSymlink());
+        ret.setMtime(message.getFsNode().getMtime());
+        ret.setCtime(message.getFsNode().getCtime());
+        ret.getChunks().putAll(message.getChunksMap());
+        return ret;
+    }
+
+    @Override
+    public FileP serialize(File object) {
+        var ret = FileP.newBuilder()
+                .setFsNode(FsNodeP.newBuilder()
+                        .setCtime(object.getCtime())
+                        .setMtime(object.getMtime())
+                        .setMode(object.getMode())
+                        .setUuid(object.getUuid().toString())
+                        .build())
+                .putAllChunks(object.getChunks())
+                .setSymlink(object.isSymlink())
+                .setParent(object.getParent().toString())
+                .build();
+        return ret;
+    }
+}
@@ -21,6 +21,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
    protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
        _resolver = resolver;
        _metaPart = new ObjectMetadata(name, false, obj.getClass());
+       _metaPart.getHaveLocalCopy().set(true);
        _dataPart.set(obj);
        _metaPart.bumpVersion(selfUuid);
        Log.trace("new JObject: " + getName());
@@ -77,6 +78,10 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
        return _metaPart;
    }

+   public boolean hasLocalCopyMd() {
+       return _metaPart.getHaveLocalCopy().get();
+   }

    public Class<? extends ConflictResolver> getConflictResolver() {
        if (_dataPart.get() == null) throw new NotImplementedException("Data part not found!");
        return _dataPart.get().getConflictResolver();
@@ -116,6 +121,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
        var res = _resolver.resolveDataRemote(this);
        _metaPart.narrowClass(res.getClass());
        _dataPart.set(res);
+       _metaPart.getHaveLocalCopy().set(true);
        hydrateRefs();
        verifyRefs();
    } // _dataPart.get() == null
@@ -158,6 +164,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
            throw new IllegalStateException("Expected external resolution only for classes with pushResolution " + getName());
        _metaPart.narrowClass(data.getClass());
        _dataPart.set(data);
+       _metaPart.getHaveLocalCopy().set(true);
        if (!_metaPart.isLocked())
            _metaPart.lock();
        hydrateRefs();
@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;

-import com.google.protobuf.ByteString;
-import com.usatiuk.dhfs.SerializationHelper;
+import com.usatiuk.dhfs.objects.persistence.BlobP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -30,6 +30,8 @@ public class JObjectManagerImpl implements JObjectManager {
    JObjectResolver jObjectResolver;
    @Inject
    PersistentRemoteHostsService persistentRemoteHostsService;
+   @Inject
+   ProtoSerializerService protoSerializerService;
    private Thread _refCleanupThread;

    @Startup
@@ -78,7 +80,7 @@ public class JObjectManagerImpl implements JObjectManager {
            if (inMap != null) return Optional.of(inMap);
        }

-       ByteString readMd;
+       BlobP readMd;
        try {
            readMd = objectPersistentStore.readObject("meta_" + name);
        } catch (StatusRuntimeException ex) {
@@ -86,7 +88,7 @@ public class JObjectManagerImpl implements JObjectManager {
                return Optional.empty();
            throw ex;
        }
-       var meta = SerializationHelper.deserialize(readMd);
+       var meta = protoSerializerService.deserialize(readMd);
        if (!(meta instanceof ObjectMetadata))
            throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Unexpected metadata type for " + name));
@@ -164,7 +164,7 @@ public class JObjectRefProcessor {
            got.get().tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);

            Log.trace("Deleting " + m.getName());
-           m.delete();
+           m.markDeleted();

            Stream<String> refs = Stream.empty();
@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;

import com.google.common.collect.Streams;
-import com.usatiuk.dhfs.SerializationHelper;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
@@ -40,6 +40,8 @@ public class JObjectResolver {
    @Inject
    PersistentRemoteHostsService persistentRemoteHostsService;
+   @Inject
+   ProtoSerializerService protoSerializerService;
    @Inject
    JObjectRefProcessor jObjectRefProcessor;
    @ConfigProperty(name = "dhfs.objects.ref_verification")
    boolean refVerification;
@@ -53,8 +55,12 @@ public class JObjectResolver {
    }

    public boolean hasLocalCopy(JObject<?> self) {
+       if (!self.isDeleted() && refVerification) {
+           if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObject(self.getName())))
+               throw new IllegalStateException("hasLocalCopy mismatch for " + self.getName());
+       }
        // FIXME: Read/write lock assert?
-       return objectPersistentStore.existsObject(self.getName());
+       return self.hasLocalCopyMd();
    }

    public void backupRefs(JObject<?> self) {
@@ -129,7 +135,7 @@ public class JObjectResolver {
            self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);

            Log.trace("Quick delete of: " + self.getName());
-           self.getMeta().delete();
+           self.getMeta().markDeleted();

            Stream<String> refs = Stream.empty();
@@ -152,7 +158,7 @@ public class JObjectResolver {
        // jObject.assertRWLock();
        // FIXME: No way to assert read lock?
        if (objectPersistentStore.existsObject(jObject.getName()))
-           return Optional.of(SerializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName())));
+           return Optional.of(protoSerializerService.deserialize(objectPersistentStore.readObject(jObject.getName())));
        return Optional.empty();
    }
@@ -160,13 +166,14 @@ public class JObjectResolver {
        var obj = remoteObjectServiceClient.getObject(jObject);
        jObjectWriteback.markDirty(jObject);
        invalidationQueueService.pushInvalidationToAll(jObject.getName());
-       return SerializationHelper.deserialize(obj);
+       return protoSerializerService.deserialize(obj);
    }

    public void removeLocal(JObject<?> jObject, String name) {
        jObject.assertRWLock();
        try {
            Log.trace("Invalidating " + name);
+           jObject.getMeta().getHaveLocalCopy().set(false);
            jObjectWriteback.remove(jObject);
            objectPersistentStore.deleteObject(name);
        } catch (StatusRuntimeException sx) {
@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;

-import com.usatiuk.dhfs.SerializationHelper;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
@@ -49,6 +49,8 @@ public class JObjectWriteback {
    ObjectPersistentStore objectPersistentStore;
    @Inject
    JObjectSizeEstimator jObjectSizeEstimator;
+   @Inject
+   ProtoSerializerService protoSerializerService;
    @ConfigProperty(name = "dhfs.objects.writeback.limit")
    long sizeLimit;
    @ConfigProperty(name = "dhfs.objects.writeback.watermark-high")
@@ -164,9 +166,9 @@ public class JObjectWriteback {
            objectPersistentStore.deleteObject(m.getName());
            return;
        }
-       objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
+       objectPersistentStore.writeObject("meta_" + m.getName(), protoSerializerService.serializeToBlobP(m));
        if (data != null)
-           objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
+           objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToBlobP(data));
    }

    public void remove(JObject<?> object) {
@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;

-import com.usatiuk.dhfs.SerializationHelper;
+import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.repository.ObjectChangelog;
import com.usatiuk.dhfs.objects.repository.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
@@ -40,6 +40,8 @@ public class ObjectMetadata implements Serializable {
    private Set<String> _savedRefs = Collections.emptySet();
    @Getter
    private boolean _locked = false;
+   @Getter
+   private AtomicBoolean _haveLocalCopy = new AtomicBoolean(false);
    private transient AtomicBoolean _written = new AtomicBoolean(true);

    public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
@@ -82,7 +84,7 @@ public class ObjectMetadata implements Serializable {
        _seen.set(true);
    }

-   public void delete() {
+   public void markDeleted() {
        _deleted.set(true);
    }
@@ -139,6 +141,10 @@ public class ObjectMetadata implements Serializable {
        return _referrers.stream().toList();
    }

+   protected Collection<String> getReferrersMutable() {
+       return _referrers;
+   }

    public boolean isDeletionCandidate() {
        return !isLocked() && !isReferred();
    }
@@ -176,18 +182,18 @@ public class ObjectMetadata implements Serializable {
        return headerBuilder.build();
    }

-   public ObjectHeader toRpcHeader(JObjectData data) {
+   public ObjectHeader toRpcHeader(JObjectDataP data) {
        var headerBuilder = ObjectHeader.newBuilder().setName(getName());
        headerBuilder.setChangelog(toRpcChangelog());

        if (data != null && data.getClass().isAnnotationPresent(PushResolution.class))
-           headerBuilder.setPushedData(SerializationHelper.serialize(data));
+           headerBuilder.setPushedData(data);

        return headerBuilder.build();
    }

    public int metaHash() {
-       return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs);
+       return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs, _haveLocalCopy);
    }

    // Not really a hash
@@ -0,0 +1,61 @@
+package com.usatiuk.dhfs.objects.jrepository;
+
+import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@ApplicationScoped
+public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP, ObjectMetadata>, ProtoDeserializer<ObjectMetadataP, ObjectMetadata> {
+    @Override
+    public ObjectMetadataP serialize(ObjectMetadata object) {
+        return ObjectMetadataP.newBuilder()
+                .setName(object.getName())
+                .putAllRemoteCopies(object.getRemoteCopies().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
+                .setKnownClass(object.getKnownClass().getName())
+                .setSeen(object.isSeen())
+                .setDeleted(object.isDeleted())
+                .addAllConfirmedDeletes(() -> object.getConfirmedDeletes().stream().map(e -> e.toString()).iterator())
+                .addAllReferrers(object.getReferrers())
+                .putAllChangelog(object.getChangelog().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
+                .addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
+                .setLocked(object.isLocked())
+                .setHaveLocalCopy(object.getHaveLocalCopy().get())
+                .build();
+    }
+
+    @Override
+    public ObjectMetadata deserialize(ObjectMetadataP message) {
+        try {
+            var obj = new ObjectMetadata(message.getName(), true,
+                    (Class<? extends JObjectData>) Class.forName(message.getKnownClass(), true, ObjectMetadata.class.getClassLoader()));
+            if (!JObjectData.class.isAssignableFrom(obj.getKnownClass()))
+                throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Class not inherited from JObjectData " + message.getKnownClass()));
+
+            obj.getRemoteCopies().putAll(message.getRemoteCopiesMap().entrySet().stream().collect(Collectors.toMap(e -> UUID.fromString(e.getKey()), Map.Entry::getValue)));
+            if (message.getSeen()) obj.markSeen();
+            if (message.getDeleted()) obj.markDeleted();
+            message.getConfirmedDeletesList().stream().map(UUID::fromString).forEach(o -> obj.getConfirmedDeletes().add(o));
+            obj.getReferrersMutable().addAll(message.getReferrersList());
+            obj.getChangelog().putAll(message.getChangelogMap().entrySet().stream().collect(Collectors.toMap(e -> UUID.fromString(e.getKey()), Map.Entry::getValue)));
+            if (message.getSavedRefsCount() > 0)
+                obj.setSavedRefs(new LinkedHashSet<>(message.getSavedRefsList()));
+            if (message.getLocked())
+                obj.lock();
+            if (message.getHaveLocalCopy())
+                obj.getHaveLocalCopy().set(true);
+
+            return obj;
+        } catch (ClassNotFoundException cx) {
+            throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find class " + message.getKnownClass()));
+        }
+    }
+}
@@ -0,0 +1,21 @@
+package com.usatiuk.dhfs.objects.protoserializer;
+
+import com.usatiuk.dhfs.objects.persistence.BlobP;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class BlobDeserializer implements ProtoDeserializer<BlobP, Object> {
+    @Inject
+    ProtoSerializerService protoSerializerService;
+
+    @Override
+    public Object deserialize(BlobP message) {
+        return switch (message.getDtypeCase()) {
+            case METADATA -> protoSerializerService.deserialize(message.getMetadata());
+            case DATA -> protoSerializerService.deserialize(message.getData());
+            case DTYPE_NOT_SET ->
+                    throw new IllegalStateException("Malformed protobuf message " + message.getDtypeCase());
+        };
+    }
+}
@@ -0,0 +1,25 @@
+package com.usatiuk.dhfs.objects.protoserializer;
+
+import com.usatiuk.dhfs.objects.jrepository.JObjectData;
+import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class JObjectDataDeserializer implements ProtoDeserializer<JObjectDataP, JObjectData> {
+    @Inject
+    ProtoSerializerService protoSerializerService;
+
+    @Override
+    public JObjectData deserialize(JObjectDataP message) {
+        return switch (message.getObjCase()) {
+            case FILE -> protoSerializerService.deserialize(message.getFile());
+            case DIRECTORY -> protoSerializerService.deserialize(message.getDirectory());
+            case CHUNKINFO -> protoSerializerService.deserialize(message.getChunkInfo());
+            case CHUNKDATA -> protoSerializerService.deserialize(message.getChunkData());
+            case PEERDIRECTORY -> protoSerializerService.deserialize(message.getPeerDirectory());
+            case PERSISTENTPEERINFO -> protoSerializerService.deserialize(message.getPersistentPeerInfo());
+            case OBJ_NOT_SET -> throw new IllegalStateException("Type not set when deserializing");
+        };
+    }
+}
@@ -0,0 +1,7 @@
+package com.usatiuk.dhfs.objects.protoserializer;
+
+import com.google.protobuf.Message;
+
+public interface ProtoDeserializer<M extends Message, O> {
+    O deserialize(M message);
+}
@@ -0,0 +1,7 @@
+package com.usatiuk.dhfs.objects.protoserializer;
+
+import com.google.protobuf.Message;
+
+public interface ProtoSerializer<M extends Message, O> {
+    M serialize(O object);
+}
@@ -0,0 +1,119 @@
+package com.usatiuk.dhfs.objects.protoserializer;
+
+import com.google.protobuf.Message;
+import com.usatiuk.dhfs.objects.persistence.*;
+import io.quarkus.arc.ClientProxy;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.HashMap;
+import java.util.Optional;
+
+@ApplicationScoped
+public class ProtoSerializerService {
+
+    @FunctionalInterface
+    public interface SerializationFn<M extends Message, O> {
+        M apply(O obj);
+    }
+
+    @FunctionalInterface
+    public interface DeserializationFn<M extends Message, O> {
+        O apply(M message);
+    }
+
+    private final HashMap<Class<?>, SerializationFn<? extends Message, ?>> _serializers = new HashMap<>();
+    private final HashMap<Class<? extends Message>, DeserializationFn<? extends Message, ?>> _deserializers = new HashMap<>();
+
+    // Needed as otherwise they are removed
+    @Inject
+    Instance<ProtoSerializer<?, ?>> _protoSerializers;
+
+    @Inject
+    Instance<ProtoDeserializer<?, ?>> _protoDeserializers;
+
+    @PostConstruct
+    void init() {
+        for (var s : _protoSerializers) {
+            var args = ((ParameterizedType) ClientProxy.unwrap(s).getClass()
+                    .getGenericInterfaces()[0]).getActualTypeArguments(); //FIXME:
+            Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
+            Class<?> objClass = (Class<?>) args[1];
+
+            if (_serializers.containsKey(objClass))
+                throw new IllegalStateException("Already registered serializer for: " + objClass);
+
+            _serializers.put(objClass, obj -> ((ProtoSerializer) s).serialize(obj));
+        }
+
+        for (var s : _protoDeserializers) {
+            var args = ((ParameterizedType) ClientProxy.unwrap(s).getClass()
+                    .getGenericInterfaces()[0]).getActualTypeArguments(); //FIXME:
+            Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
+            Class<?> objClass = (Class<?>) args[1];
+
+            if (_deserializers.containsKey(messageClass))
+                throw new IllegalStateException("Already registered deserializer: " + messageClass);
+
+            _deserializers.put(messageClass, msg -> ((ProtoDeserializer) s).deserialize(msg));
+        }
+    }
+
+    public <M extends Message, O> M serialize(O object) {
+        if (!_serializers.containsKey(object.getClass()))
+            throw new IllegalStateException("Serializer not registered: " + object.getClass());
+        return ((SerializationFn<M, O>) _serializers.get(object.getClass())).apply(object);
+    }
+
+    // FIXME: This is annoying
+    public <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
+        var ser = serialize(object);
+        if (ser instanceof FileP) {
+            return Optional.of(JObjectDataP.newBuilder().setFile((FileP) ser).build());
+        } else if (ser instanceof DirectoryP) {
+            return Optional.of(JObjectDataP.newBuilder().setDirectory((DirectoryP) ser).build());
+        } else if (ser instanceof ChunkInfoP) {
+            return Optional.of(JObjectDataP.newBuilder().setChunkInfo((ChunkInfoP) ser).build());
+        } else if (ser instanceof ChunkDataP) {
+            return Optional.of(JObjectDataP.newBuilder().setChunkData((ChunkDataP) ser).build());
+        } else if (ser instanceof PeerDirectoryP) {
+            return Optional.of(JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) ser).build());
+        } else if (ser instanceof PersistentPeerInfoP) {
+            return Optional.of(JObjectDataP.newBuilder().setPersistentPeerInfo((PersistentPeerInfoP) ser).build());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    // FIXME: This is annoying
+    public <O> JObjectDataP serializeToJObjectDataP(O object) {
+        if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
+
+        return serializeToJObjectDataPInternal(object).orElseThrow(() -> new IllegalStateException("Unknown JObjectDataP type: " + object.getClass()));
+    }
+
+    public <O> BlobP serializeToBlobP(O object) {
+        if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
+
+        var jobjd = serializeToJObjectDataPInternal(object);
+        if (jobjd.isPresent()) {
+            return BlobP.newBuilder().setData(jobjd.get()).build();
+        }
+
+        var ser = serialize(object);
+        if (ser instanceof ObjectMetadataP) {
+            return BlobP.newBuilder().setMetadata((ObjectMetadataP) ser).build();
+        } else {
+            throw new IllegalStateException("Unknown BlobP type: " + ser.getClass());
+        }
+    }
+
+    public <M extends Message, O> O deserialize(M message) {
+        if (!_deserializers.containsKey(message.getClass()))
+            throw new IllegalStateException("Deserializer not registered: " + message.getClass());
+        return ((DeserializationFn<M, O>) _deserializers.get(message.getClass())).apply(message);
+    }
+}
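The registry dispatches purely on runtime classes: serializers are keyed by the domain class and deserializers by the protobuf message class, both discovered via CDI at startup. An illustrative round-trip (the ChunkRoundTrip bean is hypothetical; ChunkData and ChunkDataP come from this diff):

    import com.usatiuk.dhfs.files.objects.ChunkData;
    import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.inject.Inject;

    @ApplicationScoped
    public class ChunkRoundTrip {
        @Inject
        ProtoSerializerService protoSerializerService;

        ChunkData roundTrip(ChunkData chunk) {
            // Looked up by ChunkData.class, handled by ChunkDataSerializer above.
            ChunkDataP msg = protoSerializerService.serialize(chunk);
            // Looked up by ChunkDataP.class, handled by the same bean.
            return protoSerializerService.deserialize(msg);
        }
    }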
@@ -1,10 +1,11 @@
package com.usatiuk.dhfs.objects.repository;

import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
+import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -33,15 +34,17 @@ public class RemoteObjectServiceClient {
    SyncHandler syncHandler;
    @Inject
    InvalidationQueueService invalidationQueueService;
+   @Inject
+   ProtoSerializerService protoSerializerService;

-   public Pair<ObjectHeader, ByteString> getSpecificObject(UUID host, String name) {
+   public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
        return rpcClientFactory.withObjSyncClient(host, client -> {
            var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
            return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
        });
    }

-   public ByteString getObject(JObject<?> jObject) {
+   public JObjectDataP getObject(JObject<?> jObject) {
        jObject.assertRWLock();

        var targets = jObject.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (md, d) -> {
@@ -109,7 +112,10 @@ public class RemoteObjectServiceClient {
            (m, d) -> {
                if (m.getKnownClass().isAnnotationPresent(PushResolution.class) && d == null)
                    Log.warn("Object " + m.getName() + " is marked as PushResolution but no resolution found");
-               return m.toRpcHeader(d);
+               if (m.getKnownClass().isAnnotationPresent(PushResolution.class))
+                   return m.toRpcHeader(protoSerializerService.serializeToJObjectDataP(d));
+               else
+                   return m.toRpcHeader(null);
            });
    obj.markSeen();
    builder.setHeader(header);
@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.objects.repository;

-import com.google.protobuf.ByteString;
-import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
+import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -40,6 +40,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
    @Inject
    InvalidationQueueService invalidationQueueService;

+   @Inject
+   ProtoSerializerService protoSerializerService;
+
    @Override
    @Blocking
    public Uni<GetObjectReply> getObject(GetObjectRequest request) {
@@ -51,13 +54,13 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {

        var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));

-       Pair<ObjectHeader, ByteString> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
+       Pair<ObjectHeader, JObjectDataP> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
            if (data == null) {
                Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid());
                throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally"));
            }
            data.extractRefs().forEach(ref -> jObjectManager.get(ref).ifPresent(JObject::markSeen));
-           return Pair.of(meta.toRpcHeader(), SerializationHelper.serialize(data));
+           return Pair.of(meta.toRpcHeader(), protoSerializerService.serializeToJObjectDataP(data));
        });
        obj.markSeen();
        var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();
@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -32,6 +33,8 @@ public class SyncHandler {
    Instance<ConflictResolver> conflictResolvers;
    @Inject
    PersistentRemoteHostsService persistentRemoteHostsService;
+   @Inject
+   ProtoSerializerService protoSerializerService;

    public void doInitialResync(UUID host) {
        Log.info("Doing initial resync for " + host);
@@ -104,12 +107,12 @@ public class SyncHandler {
            md.getChangelog().putAll(receivedMap);
            md.getChangelog().putIfAbsent(persistentRemoteHostsService.getSelfUuid(), 0L);
            if (header.hasPushedData())
-               found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
+               found.externalResolution(protoSerializerService.deserialize(header.getPushedData()));
            return false;
        } else if (data == null && header.hasPushedData()) {
            found.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
            if (found.getData() == null)
-               found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
+               found.externalResolution(protoSerializerService.deserialize(header.getPushedData()));
        }

        assert Objects.equals(receivedTotalVer, md.getOurVersion());
@@ -127,10 +130,10 @@ public class SyncHandler {
        ObjectHeader theirsHeader;
        if (header.hasPushedData()) {
            theirsHeader = header;
-           theirsData = SerializationHelper.deserialize(header.getPushedData());
+           theirsData = protoSerializerService.deserialize(header.getPushedData());
        } else {
            var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
-           theirsData = SerializationHelper.deserialize(got.getRight());
+           theirsData = protoSerializerService.deserialize(got.getRight());
            theirsHeader = got.getLeft();
        }
@@ -0,0 +1,24 @@
+package com.usatiuk.dhfs.objects.repository.peersync;
+
+import com.usatiuk.dhfs.objects.persistence.PeerDirectoryP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.util.Objects;
+import java.util.UUID;
+
+@ApplicationScoped
+public class PeerDirectorySerializer implements ProtoSerializer<PeerDirectoryP, PeerDirectory>, ProtoDeserializer<PeerDirectoryP, PeerDirectory> {
+    @Override
+    public PeerDirectory deserialize(PeerDirectoryP message) {
+        var ret = new PeerDirectory();
+        message.getPeersList().stream().map(UUID::fromString).forEach(ret.getPeers()::add);
+        return ret;
+    }
+
+    @Override
+    public PeerDirectoryP serialize(PeerDirectory object) {
+        return PeerDirectoryP.newBuilder().addAllPeers(() -> object.getPeers().stream().map(Objects::toString).iterator()).build();
+    }
+}
@@ -0,0 +1,39 @@
+package com.usatiuk.dhfs.objects.repository.peersync;
+
+import com.google.protobuf.UnsafeByteOperations;
+import com.usatiuk.dhfs.objects.persistence.PersistentPeerInfoP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
+import com.usatiuk.dhfs.objects.repository.CertificateTools;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.CertificateException;
+import java.util.UUID;
+
+@ApplicationScoped
+public class PersistentPeerInfoSerializer implements ProtoSerializer<PersistentPeerInfoP, PersistentPeerInfo>, ProtoDeserializer<PersistentPeerInfoP, PersistentPeerInfo> {
+    @Override
+    public PersistentPeerInfo deserialize(PersistentPeerInfoP message) {
+        try {
+            return new PersistentPeerInfo(
+                    UUID.fromString(message.getUuid()),
+                    CertificateTools.certFromBytes(message.getCert().toByteArray())
+            );
+        } catch (CertificateException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public PersistentPeerInfoP serialize(PersistentPeerInfo object) {
+        try {
+            return PersistentPeerInfoP.newBuilder()
+                    .setUuid(object.getUuid().toString())
+                    .setCert(UnsafeByteOperations.unsafeWrap(object.getCertificate().getEncoded()))
+                    .build();
+        } catch (CertificateEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects.repository.persistence;

-import com.google.protobuf.ByteString;
-import com.google.protobuf.UnsafeByteOperations;
+import com.usatiuk.dhfs.objects.persistence.BlobP;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -13,6 +12,8 @@ import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.annotation.Nonnull;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
@@ -65,12 +66,12 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {

    @Nonnull
    @Override
-   public ByteString readObject(String name) {
+   public BlobP readObject(String name) {
        var file = Path.of(root, name);

-       try {
-           return UnsafeByteOperations.unsafeWrap(Files.readAllBytes(file));
-       } catch (NoSuchFileException fx) {
+       try (var fs = new FileInputStream(file.toFile())) {
+           return BlobP.parseFrom(fs);
+       } catch (FileNotFoundException | NoSuchFileException fx) {
            throw new StatusRuntimeException(Status.NOT_FOUND);
        } catch (IOException e) {
            Log.error("Error reading file " + file, e);
@@ -79,14 +80,12 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
    }

    @Override
-   public void writeObject(String name, ByteString data) {
+   public void writeObject(String name, BlobP data) {
        var file = Path.of(root, name);

        try {
-           try (var fc = new FileOutputStream(file.toFile(), false);
-                var ch = fc.getChannel().truncate(0)) {
-               if (ch.write(data.asReadOnlyByteBuffer()) != data.size())
-                   throw new StatusRuntimeException(Status.INTERNAL.withDescription("Could not write all bytes to file"));
+           try (var fc = new FileOutputStream(file.toFile(), false)) {
+               data.writeTo(fc);
            }
        } catch (IOException e) {
            Log.error("Error writing file " + file, e);
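With readObject and writeObject now speaking BlobP directly, persistence is a plain protobuf stream round-trip. A minimal sketch under the same schema (the path is a hypothetical location, and an enclosing method that throws IOException is assumed):

    var blob = BlobP.newBuilder()
            .setMetadata(ObjectMetadataP.newBuilder().setName("example").build())
            .build();
    var file = Path.of("/tmp/dhfs-demo", "meta_example"); // hypothetical location
    try (var out = new FileOutputStream(file.toFile())) {
        blob.writeTo(out);              // same call writeObject() uses above
    }
    try (var in = new FileInputStream(file.toFile())) {
        var back = BlobP.parseFrom(in); // same call readObject() uses above
        assert back.hasMetadata();
    }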
@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.persistence;

import com.google.protobuf.ByteString;
+import com.usatiuk.dhfs.objects.persistence.BlobP;

import javax.annotation.Nonnull;
import java.util.List;
@@ -13,9 +14,9 @@ public interface ObjectPersistentStore {
    Boolean existsObject(String name);

    @Nonnull
-   ByteString readObject(String name);
+   BlobP readObject(String name);

-   void writeObject(String name, ByteString data);
+   void writeObject(String name, BlobP data);

    void deleteObject(String name);
}
server/src/main/proto/dhfs_objects_serial.proto (new file, 77 lines)
@@ -0,0 +1,77 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "com.usatiuk.dhfs.objects.persistence";
+option java_outer_classname = "DhfsObjectPersistence";
+
+package dhfs.objects.persistence;
+
+message ObjectMetadataP {
+    string name = 1;
+    map<string, int64> remoteCopies = 2;
+    string knownClass = 3;
+    bool seen = 4;
+    bool deleted = 5;
+    repeated string confirmedDeletes = 6;
+    repeated string referrers = 7;
+    map<string, int64> changelog = 8;
+    repeated string savedRefs = 9;
+    bool locked = 10;
+    bool haveLocalCopy = 11;
+}
+
+message FsNodeP {
+    string uuid = 1;
+    int64 mode = 2;
+    int64 ctime = 3;
+    int64 mtime = 4;
+}
+
+message FileP {
+    FsNodeP fsNode = 1;
+    map<int64, string> chunks = 2;
+    string parent = 3;
+    bool symlink = 4;
+}
+
+message DirectoryP {
+    FsNodeP fsNode = 1;
+    map<string, string> children = 2;
+}
+
+message ChunkInfoP {
+    string name = 1;
+    int32 size = 2;
+}
+
+message ChunkDataP {
+    string name = 1;
+    bytes data = 2;
+}
+
+message PeerDirectoryP {
+    repeated string peers = 1;
+}
+
+message PersistentPeerInfoP {
+    string uuid = 1;
+    bytes cert = 2;
+}
+
+message JObjectDataP {
+    oneof obj {
+        FileP file = 2;
+        DirectoryP directory = 3;
+        ChunkInfoP chunkInfo = 4;
+        ChunkDataP chunkData = 5;
+        PeerDirectoryP peerDirectory = 6;
+        PersistentPeerInfoP persistentPeerInfo = 7;
+    }
+}
+
+message BlobP {
+    oneof dtype {
+        ObjectMetadataP metadata = 1;
+        JObjectDataP data = 2;
+    }
+}
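In FileP, the chunks map keys are byte offsets into the file and the values are chunk names (content hashes), matching how the test below populates File.getChunks(). An illustrative message built from the generated classes (the chunk-name literal is a placeholder):

    FileP fp = FileP.newBuilder()
            .setFsNode(FsNodeP.newBuilder().setUuid(UUID.randomUUID().toString()).setMode(0644).build())
            .putChunks(0L, "aabbccddeeff") // offset 0 -> chunk hash
            .setParent(new UUID(0, 0).toString())
            .setSymlink(false)
            .build();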
@@ -1,5 +1,7 @@
syntax = "proto3";

+import "dhfs_objects_serial.proto";
+
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.objects.repository";
option java_outer_classname = "DhfsObjectSyncApi";
@@ -35,12 +37,12 @@ message ObjectChangelog {
message ObjectHeader {
    string name = 2;
    ObjectChangelog changelog = 5;
-   optional bytes pushedData = 6;
+   optional dhfs.objects.persistence.JObjectDataP pushedData = 6;
}

message ApiObject {
    ObjectHeader header = 1;
-   bytes content = 2;
+   dhfs.objects.persistence.JObjectDataP content = 2;
}

message GetObjectRequest {
@@ -12,7 +12,7 @@ dhfs.objects.reconnect_interval=5s
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
-dhfs.files.target_chunk_size=4194304
+dhfs.files.target_chunk_size=524288
# Writes strictly smaller than this will try to merge with blocks nearby
dhfs.files.write_merge_threshold=0.8
# If a merge would result in a block of greater size than this, stop merging
@@ -64,7 +64,7 @@ public class DhfsFileServiceSimpleTestImpl {
        ChunkInfo c2i = new ChunkInfo(c2.getHash(), c2.getBytes().size());
        ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
        ChunkInfo c3i = new ChunkInfo(c3.getHash(), c3.getBytes().size());
-       File f = new File(fuuid, 777, null, false);
+       File f = new File(fuuid, 777, new UUID(0, 0), false);
        f.getChunks().put(0L, c1.getHash());
        f.getChunks().put((long) c1.getBytes().size(), c2.getHash());
        f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getHash());
@@ -0,0 +1,30 @@
+package com.usatiuk.dhfs.persistence;
+
+import com.usatiuk.dhfs.objects.persistence.BlobP;
+import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
+import com.usatiuk.dhfs.objects.persistence.PeerDirectoryP;
+import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
+import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory;
+import io.quarkus.test.junit.QuarkusTest;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+@QuarkusTest
+public class ProtoSerializationTest {
+
+    @Inject
+    ProtoSerializerService protoSerializerService;
+
+    @Test
+    void SerializeDeserializePeerDirectory() {
+        var pd = new PeerDirectory();
+        pd.getPeers().add(UUID.randomUUID());
+        var ser = BlobP.newBuilder().setData(JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) protoSerializerService.serialize(pd)).build()).build();
+        var deser = (PeerDirectory) protoSerializerService.deserialize(ser);
+        Assertions.assertIterableEquals(pd.getPeers(), deser.getPeers());
+    }
+
+}