6 Commits

Author SHA1 Message Date
25e5acaabb include haveLocalCopy in metaHash
Some checks failed
Server / build-dhfs (push) Failing after 34m53s
Server / build-webui (push) Successful in 2m48s
Server / publish-docker (push) Has been skipped
Server / publish-run-wrapper (push) Has been skipped
2024-07-21 13:21:18 +02:00
3d6bb55950 set haveLocalCopy in externalResolution 2024-07-21 13:19:00 +02:00
da04398d26 use ChunkDataP in ChunkData directly 2024-07-21 12:59:08 +02:00
830cde74a8 make chunks 512k by default 2024-07-21 12:37:01 +02:00
e16556ce67 save hasLocalCopy in metadata 2024-07-21 12:33:01 +02:00
0e90c10efc use protobuf for storage 2024-07-21 12:11:28 +02:00
31 changed files with 642 additions and 63 deletions

View File

@@ -27,10 +27,6 @@ public abstract class SerializationHelper {
return deserialize(new ByteArrayInputStream(objectData));
}
public static <T> T deserialize(final ByteString objectData) {
return deserialize(objectData.newInput());
}
public static <T extends Serializable> ByteString serialize(final T obj) {
return UnsafeByteOperations.unsafeWrap(SerializationUtils.serialize(obj));
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.files.conflicts.NoOpConflictResolver;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import lombok.Getter;
import net.openhft.hashing.LongTupleHashFunction;
@@ -19,21 +20,37 @@ public class ChunkData extends JObjectData {
@Serial
private static final long serialVersionUID = 1;
final String _hash;
final ByteString _bytes;
final ChunkDataP _data;
public ChunkData(ByteString bytes) {
super();
this._bytes = bytes;
// TODO: There might be (most definitely) a copy there
this._hash = Arrays.stream(LongTupleHashFunction.xx128().hashBytes(_bytes.asReadOnlyByteBuffer()))
.mapToObj(Long::toHexString).collect(Collectors.joining());
_data = ChunkDataP.newBuilder()
.setData(bytes)
// TODO: There might be (most definitely) a copy there
.setName(Arrays.stream(LongTupleHashFunction.xx128().hashBytes(bytes.asReadOnlyByteBuffer()))
.mapToObj(Long::toHexString).collect(Collectors.joining()))
.build();
}
public ChunkData(ByteString bytes, String name) {
super();
this._bytes = bytes;
this._hash = name;
_data = ChunkDataP.newBuilder()
.setData(bytes)
.setName(name)
.build();
}
public ChunkData(ChunkDataP chunkDataP) {
super();
_data = chunkDataP;
}
public String getHash() {
return _data.getName();
}
public ByteString getBytes() {
return _data.getData();
}
public static String getNameFromHash(String hash) {
@@ -45,17 +62,17 @@ public class ChunkData extends JObjectData {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkData chunkData = (ChunkData) o;
return Objects.equals(_hash, chunkData._hash);
return Objects.equals(_data.getName(), chunkData.getData().getName());
}
@Override
public int hashCode() {
return Objects.hashCode(_hash);
return Objects.hashCode(_data.getName());
}
@Override
public String getName() {
return getNameFromHash(_hash);
return getNameFromHash(_data.getName());
}
@Override
@@ -75,6 +92,6 @@ public class ChunkData extends JObjectData {
@Override
public long estimateSize() {
return _bytes.size();
return _data.getData().size();
}
}

View File

@@ -0,0 +1,19 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class ChunkDataSerializer implements ProtoSerializer<ChunkDataP, ChunkData>, ProtoDeserializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(message);
}
@Override
public ChunkDataP serialize(ChunkData object) {
return object.getData();
}
}

View File

@@ -0,0 +1,22 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.persistence.ChunkInfoP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class ChunkInfoSerializer implements ProtoSerializer<ChunkInfoP, ChunkInfo>, ProtoDeserializer<ChunkInfoP, ChunkInfo> {
@Override
public ChunkInfo deserialize(ChunkInfoP message) {
return new ChunkInfo(message.getName(), message.getSize());
}
@Override
public ChunkInfoP serialize(ChunkInfo object) {
return ChunkInfoP.newBuilder()
.setName(object.getHash())
.setSize(object.getSize())
.build();
}
}

View File

@@ -0,0 +1,37 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.persistence.DirectoryP;
import com.usatiuk.dhfs.objects.persistence.FsNodeP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@ApplicationScoped
public class DirectorySerializer implements ProtoSerializer<DirectoryP, Directory>, ProtoDeserializer<DirectoryP, Directory> {
@Override
public Directory deserialize(DirectoryP message) {
var ret = new Directory(UUID.fromString(message.getFsNode().getUuid()));
ret.setMtime(message.getFsNode().getMtime());
ret.setCtime(message.getFsNode().getCtime());
ret.setMode(message.getFsNode().getMode());
ret.getChildren().putAll(message.getChildrenMap().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> UUID.fromString(e.getValue()))));
return ret;
}
@Override
public DirectoryP serialize(Directory object) {
return DirectoryP.newBuilder()
.setFsNode(FsNodeP.newBuilder()
.setCtime(object.getCtime())
.setMtime(object.getMtime())
.setMode(object.getMode())
.setUuid(object.getUuid().toString())
.build())
.putAllChildren(object.getChildren().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())))
.build();
}
}

View File

@@ -23,6 +23,8 @@ public class File extends FsNode {
public File(UUID uuid, long mode, UUID parent, boolean symlink) {
super(uuid, mode);
if (parent == null)
throw new IllegalArgumentException("Parent UUID cannot be null");
_parent = parent;
_symlink = symlink;
}

View File

@@ -0,0 +1,38 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.persistence.FileP;
import com.usatiuk.dhfs.objects.persistence.FsNodeP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.UUID;
@ApplicationScoped
public class FileSerializer implements ProtoSerializer<FileP, File>, ProtoDeserializer<FileP, File> {
@Override
public File deserialize(FileP message) {
var ret = new File(UUID.fromString(message.getFsNode().getUuid()), message.getFsNode().getMode(), UUID.fromString(message.getParent()), message.getSymlink());
ret.setMtime(message.getFsNode().getMtime());
ret.setCtime(message.getFsNode().getCtime());
ret.getChunks().putAll(message.getChunksMap());
return ret;
}
@Override
public FileP serialize(File object) {
var ret = FileP.newBuilder()
.setFsNode(FsNodeP.newBuilder()
.setCtime(object.getCtime())
.setMtime(object.getMtime())
.setMode(object.getMode())
.setUuid(object.getUuid().toString())
.build())
.putAllChunks(object.getChunks())
.setSymlink(object.isSymlink())
.setParent(object.getParent().toString())
.build();
return ret;
}
}

View File

@@ -21,6 +21,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, false, obj.getClass());
_metaPart.getHaveLocalCopy().set(true);
_dataPart.set(obj);
_metaPart.bumpVersion(selfUuid);
Log.trace("new JObject: " + getName());
@@ -77,6 +78,10 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
return _metaPart;
}
public boolean hasLocalCopyMd() {
return _metaPart.getHaveLocalCopy().get();
}
public Class<? extends ConflictResolver> getConflictResolver() {
if (_dataPart.get() == null) throw new NotImplementedException("Data part not found!");
return _dataPart.get().getConflictResolver();
@@ -116,6 +121,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
var res = _resolver.resolveDataRemote(this);
_metaPart.narrowClass(res.getClass());
_dataPart.set(res);
_metaPart.getHaveLocalCopy().set(true);
hydrateRefs();
verifyRefs();
} // _dataPart.get() == null
@@ -158,6 +164,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
throw new IllegalStateException("Expected external resolution only for classes with pushResolution " + getName());
_metaPart.narrowClass(data.getClass());
_dataPart.set(data);
_metaPart.getHaveLocalCopy().set(true);
if (!_metaPart.isLocked())
_metaPart.lock();
hydrateRefs();

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -30,6 +30,8 @@ public class JObjectManagerImpl implements JObjectManager {
JObjectResolver jObjectResolver;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
ProtoSerializerService protoSerializerService;
private Thread _refCleanupThread;
@Startup
@@ -78,7 +80,7 @@ public class JObjectManagerImpl implements JObjectManager {
if (inMap != null) return Optional.of(inMap);
}
ByteString readMd;
BlobP readMd;
try {
readMd = objectPersistentStore.readObject("meta_" + name);
} catch (StatusRuntimeException ex) {
@@ -86,7 +88,7 @@ public class JObjectManagerImpl implements JObjectManager {
return Optional.empty();
throw ex;
}
var meta = SerializationHelper.deserialize(readMd);
var meta = protoSerializerService.deserialize(readMd);
if (!(meta instanceof ObjectMetadata))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Unexpected metadata type for " + name));

View File

@@ -164,7 +164,7 @@ public class JObjectRefProcessor {
got.get().tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
Log.trace("Deleting " + m.getName());
m.delete();
m.markDeleted();
Stream<String> refs = Stream.empty();

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
@@ -40,6 +40,8 @@ public class JObjectResolver {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
ProtoSerializerService protoSerializerService;
@Inject
JObjectRefProcessor jObjectRefProcessor;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@@ -53,8 +55,12 @@ public class JObjectResolver {
}
public boolean hasLocalCopy(JObject<?> self) {
if (!self.isDeleted() && refVerification) {
if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObject(self.getName())))
throw new IllegalStateException("hasLocalCopy mismatch for " + self.getName());
}
// FIXME: Read/write lock assert?
return objectPersistentStore.existsObject(self.getName());
return self.hasLocalCopyMd();
}
public void backupRefs(JObject<?> self) {
@@ -129,7 +135,7 @@ public class JObjectResolver {
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
Log.trace("Quick delete of: " + self.getName());
self.getMeta().delete();
self.getMeta().markDeleted();
Stream<String> refs = Stream.empty();
@@ -152,7 +158,7 @@ public class JObjectResolver {
// jObject.assertRWLock();
// FIXME: No way to assert read lock?
if (objectPersistentStore.existsObject(jObject.getName()))
return Optional.of(SerializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName())));
return Optional.of(protoSerializerService.deserialize(objectPersistentStore.readObject(jObject.getName())));
return Optional.empty();
}
@@ -160,13 +166,14 @@ public class JObjectResolver {
var obj = remoteObjectServiceClient.getObject(jObject);
jObjectWriteback.markDirty(jObject);
invalidationQueueService.pushInvalidationToAll(jObject.getName());
return SerializationHelper.deserialize(obj);
return protoSerializerService.deserialize(obj);
}
public void removeLocal(JObject<?> jObject, String name) {
jObject.assertRWLock();
try {
Log.trace("Invalidating " + name);
jObject.getMeta().getHaveLocalCopy().set(false);
jObjectWriteback.remove(jObject);
objectPersistentStore.deleteObject(name);
} catch (StatusRuntimeException sx) {

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
@@ -49,6 +49,8 @@ public class JObjectWriteback {
ObjectPersistentStore objectPersistentStore;
@Inject
JObjectSizeEstimator jObjectSizeEstimator;
@Inject
ProtoSerializerService protoSerializerService;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.writeback.watermark-high")
@@ -164,9 +166,9 @@ public class JObjectWriteback {
objectPersistentStore.deleteObject(m.getName());
return;
}
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
objectPersistentStore.writeObject("meta_" + m.getName(), protoSerializerService.serializeToBlobP(m));
if (data != null)
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToBlobP(data));
}
public void remove(JObject<?> object) {

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.repository.ObjectChangelog;
import com.usatiuk.dhfs.objects.repository.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
@@ -40,6 +40,8 @@ public class ObjectMetadata implements Serializable {
private Set<String> _savedRefs = Collections.emptySet();
@Getter
private boolean _locked = false;
@Getter
private AtomicBoolean _haveLocalCopy = new AtomicBoolean(false);
private transient AtomicBoolean _written = new AtomicBoolean(true);
public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
@@ -82,7 +84,7 @@ public class ObjectMetadata implements Serializable {
_seen.set(true);
}
public void delete() {
public void markDeleted() {
_deleted.set(true);
}
@@ -139,6 +141,10 @@ public class ObjectMetadata implements Serializable {
return _referrers.stream().toList();
}
protected Collection<String> getReferrersMutable() {
return _referrers;
}
public boolean isDeletionCandidate() {
return !isLocked() && !isReferred();
}
@@ -176,18 +182,18 @@ public class ObjectMetadata implements Serializable {
return headerBuilder.build();
}
public ObjectHeader toRpcHeader(JObjectData data) {
public ObjectHeader toRpcHeader(JObjectDataP data) {
var headerBuilder = ObjectHeader.newBuilder().setName(getName());
headerBuilder.setChangelog(toRpcChangelog());
if (data != null && data.getClass().isAnnotationPresent(PushResolution.class))
headerBuilder.setPushedData(SerializationHelper.serialize(data));
headerBuilder.setPushedData(data);
return headerBuilder.build();
}
public int metaHash() {
return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs);
return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs, _haveLocalCopy);
}
// Not really a hash

View File

@@ -0,0 +1,61 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@ApplicationScoped
public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP, ObjectMetadata>, ProtoDeserializer<ObjectMetadataP, ObjectMetadata> {
@Override
public ObjectMetadataP serialize(ObjectMetadata object) {
return ObjectMetadataP.newBuilder()
.setName(object.getName())
.putAllRemoteCopies(object.getRemoteCopies().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
.setKnownClass(object.getKnownClass().getName())
.setSeen(object.isSeen())
.setDeleted(object.isDeleted())
.addAllConfirmedDeletes(() -> object.getConfirmedDeletes().stream().map(e -> e.toString()).iterator())
.addAllReferrers(object.getReferrers())
.putAllChangelog(object.getChangelog().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
.addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
.setLocked(object.isLocked())
.setHaveLocalCopy(object.getHaveLocalCopy().get())
.build();
}
@Override
public ObjectMetadata deserialize(ObjectMetadataP message) {
try {
var obj = new ObjectMetadata(message.getName(), true,
(Class<? extends JObjectData>) Class.forName(message.getKnownClass(), true, ObjectMetadata.class.getClassLoader()));
if (!JObjectData.class.isAssignableFrom(obj.getKnownClass()))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Class not inherited from JObjectData " + message.getKnownClass()));
obj.getRemoteCopies().putAll(message.getRemoteCopiesMap().entrySet().stream().collect(Collectors.toMap(e -> UUID.fromString(e.getKey()), Map.Entry::getValue)));
if (message.getSeen()) obj.markSeen();
if (message.getDeleted()) obj.markDeleted();
message.getConfirmedDeletesList().stream().map(UUID::fromString).forEach(o -> obj.getConfirmedDeletes().add(o));
obj.getReferrersMutable().addAll(message.getReferrersList());
obj.getChangelog().putAll(message.getChangelogMap().entrySet().stream().collect(Collectors.toMap(e -> UUID.fromString(e.getKey()), Map.Entry::getValue)));
if (message.getSavedRefsCount() > 0)
obj.setSavedRefs(new LinkedHashSet<>(message.getSavedRefsList()));
if (message.getLocked())
obj.lock();
if (message.getHaveLocalCopy())
obj.getHaveLocalCopy().set(true);
return obj;
} catch (ClassNotFoundException cx) {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find class " + message.getKnownClass()));
}
}
}

View File

@@ -0,0 +1,21 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class BlobDeserializer implements ProtoDeserializer<BlobP, Object> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public Object deserialize(BlobP message) {
return switch (message.getDtypeCase()) {
case METADATA -> protoSerializerService.deserialize(message.getMetadata());
case DATA -> protoSerializerService.deserialize(message.getData());
case DTYPE_NOT_SET ->
throw new IllegalStateException("Malformed protobuf message " + message.getDtypeCase());
};
}
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JObjectDataDeserializer implements ProtoDeserializer<JObjectDataP, JObjectData> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public JObjectData deserialize(JObjectDataP message) {
return switch (message.getObjCase()) {
case FILE -> protoSerializerService.deserialize(message.getFile());
case DIRECTORY -> protoSerializerService.deserialize(message.getDirectory());
case CHUNKINFO -> protoSerializerService.deserialize(message.getChunkInfo());
case CHUNKDATA -> protoSerializerService.deserialize(message.getChunkData());
case PEERDIRECTORY -> protoSerializerService.deserialize(message.getPeerDirectory());
case PERSISTENTPEERINFO -> protoSerializerService.deserialize(message.getPersistentPeerInfo());
case OBJ_NOT_SET -> throw new IllegalStateException("Type not set when deserializing");
};
}
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.google.protobuf.Message;
public interface ProtoDeserializer<M extends Message, O> {
O deserialize(M message);
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.google.protobuf.Message;
public interface ProtoSerializer<M extends Message, O> {
M serialize(O object);
}

View File

@@ -0,0 +1,119 @@
package com.usatiuk.dhfs.objects.protoserializer;
import com.google.protobuf.Message;
import com.usatiuk.dhfs.objects.persistence.*;
import io.quarkus.arc.ClientProxy;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.lang.reflect.ParameterizedType;
import java.util.HashMap;
import java.util.Optional;
@ApplicationScoped
public class ProtoSerializerService {
@FunctionalInterface
public interface SerializationFn<M extends Message, O> {
M apply(O obj);
}
@FunctionalInterface
public interface DeserializationFn<M extends Message, O> {
O apply(M message);
}
private final HashMap<Class<?>, SerializationFn<? extends Message, ?>> _serializers = new HashMap<>();
private final HashMap<Class<? extends Message>, DeserializationFn<? extends Message, ?>> _deserializers = new HashMap<>();
// Needed as otherwise they are removed
@Inject
Instance<ProtoSerializer<?, ?>> _protoSerializers;
@Inject
Instance<ProtoDeserializer<?, ?>> _protoDeserializers;
@PostConstruct
void init() {
for (var s : _protoSerializers) {
var args = ((ParameterizedType) ClientProxy.unwrap(s).getClass()
.getGenericInterfaces()[0]).getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
if (_serializers.containsKey(objClass))
throw new IllegalStateException("Already registered serializer for: " + objClass);
_serializers.put(objClass, obj -> ((ProtoSerializer) s).serialize(obj));
}
for (var s : _protoDeserializers) {
var args = ((ParameterizedType) ClientProxy.unwrap(s).getClass()
.getGenericInterfaces()[0]).getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
if (_deserializers.containsKey(messageClass))
throw new IllegalStateException("Already registered deserializer: " + messageClass);
_deserializers.put(messageClass, msg -> ((ProtoDeserializer) s).deserialize(msg));
}
}
public <M extends Message, O> M serialize(O object) {
if (!_serializers.containsKey(object.getClass()))
throw new IllegalStateException("Serializer not registered: " + object.getClass());
return ((SerializationFn<M, O>) _serializers.get(object.getClass())).apply(object);
}
// FIXME: This is annoying
public <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
var ser = serialize(object);
if (ser instanceof FileP) {
return Optional.of(JObjectDataP.newBuilder().setFile((FileP) ser).build());
} else if (ser instanceof DirectoryP) {
return Optional.of(JObjectDataP.newBuilder().setDirectory((DirectoryP) ser).build());
} else if (ser instanceof ChunkInfoP) {
return Optional.of(JObjectDataP.newBuilder().setChunkInfo((ChunkInfoP) ser).build());
} else if (ser instanceof ChunkDataP) {
return Optional.of(JObjectDataP.newBuilder().setChunkData((ChunkDataP) ser).build());
} else if (ser instanceof PeerDirectoryP) {
return Optional.of(JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) ser).build());
} else if (ser instanceof PersistentPeerInfoP) {
return Optional.of(JObjectDataP.newBuilder().setPersistentPeerInfo((PersistentPeerInfoP) ser).build());
} else {
return Optional.empty();
}
}
// FIXME: This is annoying
public <O> JObjectDataP serializeToJObjectDataP(O object) {
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
return serializeToJObjectDataPInternal(object).orElseThrow(() -> new IllegalStateException("Unknown JObjectDataP type: " + object.getClass()));
}
public <O> BlobP serializeToBlobP(O object) {
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
var jobjd = serializeToJObjectDataPInternal(object);
if (jobjd.isPresent()) {
return BlobP.newBuilder().setData((JObjectDataP) jobjd.get()).build();
}
var ser = serialize(object);
if (ser instanceof ObjectMetadataP) {
return BlobP.newBuilder().setMetadata((ObjectMetadataP) ser).build();
} else {
throw new IllegalStateException("Unknown BlobP type: " + ser.getClass());
}
}
public <M extends Message, O> O deserialize(M message) {
if (!_deserializers.containsKey(message.getClass()))
throw new IllegalStateException("Deserializer not registered: " + message.getClass());
return ((DeserializationFn<M, O>) _deserializers.get(message.getClass())).apply(message);
}
}

View File

@@ -1,10 +1,11 @@
package com.usatiuk.dhfs.objects.repository;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -33,15 +34,17 @@ public class RemoteObjectServiceClient {
SyncHandler syncHandler;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
ProtoSerializerService protoSerializerService;
public Pair<ObjectHeader, ByteString> getSpecificObject(UUID host, String name) {
public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
return rpcClientFactory.withObjSyncClient(host, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
});
}
public ByteString getObject(JObject<?> jObject) {
public JObjectDataP getObject(JObject<?> jObject) {
jObject.assertRWLock();
var targets = jObject.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (md, d) -> {
@@ -109,7 +112,10 @@ public class RemoteObjectServiceClient {
(m, d) -> {
if (m.getKnownClass().isAnnotationPresent(PushResolution.class) && d == null)
Log.warn("Object " + m.getName() + " is marked as PushResolution but no resolution found");
return m.toRpcHeader(d);
if (m.getKnownClass().isAnnotationPresent(PushResolution.class))
return m.toRpcHeader(protoSerializerService.serializeToJObjectDataP(d));
else
return m.toRpcHeader(null);
});
obj.markSeen();
builder.setHeader(header);

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.objects.repository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -40,6 +40,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
ProtoSerializerService protoSerializerService;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
@@ -51,13 +54,13 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
Pair<ObjectHeader, ByteString> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
Pair<ObjectHeader, JObjectDataP> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
if (data == null) {
Log.info("<-- getObject FAIL: " + request.getName() + " from " + request.getSelfUuid());
throw new StatusRuntimeException(Status.ABORTED.withDescription("Not available locally"));
}
data.extractRefs().forEach(ref -> jObjectManager.get(ref).ifPresent(JObject::markSeen));
return Pair.of(meta.toRpcHeader(), SerializationHelper.serialize(data));
return Pair.of(meta.toRpcHeader(), protoSerializerService.serializeToJObjectDataP(data));
});
obj.markSeen();
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -32,6 +33,8 @@ public class SyncHandler {
Instance<ConflictResolver> conflictResolvers;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
ProtoSerializerService protoSerializerService;
public void doInitialResync(UUID host) {
Log.info("Doing initial resync for " + host);
@@ -104,12 +107,12 @@ public class SyncHandler {
md.getChangelog().putAll(receivedMap);
md.getChangelog().putIfAbsent(persistentRemoteHostsService.getSelfUuid(), 0L);
if (header.hasPushedData())
found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
found.externalResolution(protoSerializerService.deserialize(header.getPushedData()));
return false;
} else if (data == null && header.hasPushedData()) {
found.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
if (found.getData() == null)
found.externalResolution(SerializationHelper.deserialize(header.getPushedData()));
found.externalResolution(protoSerializerService.deserialize(header.getPushedData()));
}
assert Objects.equals(receivedTotalVer, md.getOurVersion());
@@ -127,10 +130,10 @@ public class SyncHandler {
ObjectHeader theirsHeader;
if (header.hasPushedData()) {
theirsHeader = header;
theirsData = SerializationHelper.deserialize(header.getPushedData());
theirsData = protoSerializerService.deserialize(header.getPushedData());
} else {
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
theirsData = SerializationHelper.deserialize(got.getRight());
theirsData = protoSerializerService.deserialize(got.getRight());
theirsHeader = got.getLeft();
}

View File

@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.persistence.PeerDirectoryP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Objects;
import java.util.UUID;
@ApplicationScoped
public class PeerDirectorySerializer implements ProtoSerializer<PeerDirectoryP, PeerDirectory>, ProtoDeserializer<PeerDirectoryP, PeerDirectory> {
@Override
public PeerDirectory deserialize(PeerDirectoryP message) {
var ret = new PeerDirectory();
message.getPeersList().stream().map(UUID::fromString).forEach(ret.getPeers()::add);
return ret;
}
@Override
public PeerDirectoryP serialize(PeerDirectory object) {
return PeerDirectoryP.newBuilder().addAllPeers(() -> object.getPeers().stream().map(Objects::toString).iterator()).build();
}
}

View File

@@ -0,0 +1,39 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.persistence.PersistentPeerInfoP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import com.usatiuk.dhfs.objects.repository.CertificateTools;
import jakarta.enterprise.context.ApplicationScoped;
import java.security.cert.CertificateEncodingException;
import java.security.cert.CertificateException;
import java.util.UUID;
@ApplicationScoped
public class PersistentPeerInfoSerializer implements ProtoSerializer<PersistentPeerInfoP, PersistentPeerInfo>, ProtoDeserializer<PersistentPeerInfoP, PersistentPeerInfo> {
@Override
public PersistentPeerInfo deserialize(PersistentPeerInfoP message) {
try {
return new PersistentPeerInfo(
UUID.fromString(message.getUuid()),
CertificateTools.certFromBytes(message.getCert().toByteArray())
);
} catch (CertificateException e) {
throw new RuntimeException(e);
}
}
@Override
public PersistentPeerInfoP serialize(PersistentPeerInfo object) {
try {
return PersistentPeerInfoP.newBuilder()
.setUuid(object.getUuid().toString())
.setCert(UnsafeByteOperations.unsafeWrap(object.getCertificate().getEncoded()))
.build();
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -13,6 +12,8 @@ import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
@@ -65,12 +66,12 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
@Nonnull
@Override
public ByteString readObject(String name) {
public BlobP readObject(String name) {
var file = Path.of(root, name);
try {
return UnsafeByteOperations.unsafeWrap(Files.readAllBytes(file));
} catch (NoSuchFileException fx) {
try (var fs = new FileInputStream(file.toFile())) {
return BlobP.parseFrom(fs);
} catch (FileNotFoundException | NoSuchFileException fx) {
throw new StatusRuntimeException(Status.NOT_FOUND);
} catch (IOException e) {
Log.error("Error reading file " + file, e);
@@ -79,14 +80,12 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
@Override
public void writeObject(String name, ByteString data) {
public void writeObject(String name, BlobP data) {
var file = Path.of(root, name);
try {
try (var fc = new FileOutputStream(file.toFile(), false);
var ch = fc.getChannel().truncate(0)) {
if (ch.write(data.asReadOnlyByteBuffer()) != data.size())
throw new StatusRuntimeException(Status.INTERNAL.withDescription("Could not write all bytes to file"));
try (var fc = new FileOutputStream(file.toFile(), false)) {
data.writeTo(fc);
}
} catch (IOException e) {
Log.error("Error writing file " + file, e);

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import javax.annotation.Nonnull;
import java.util.List;
@@ -13,9 +14,9 @@ public interface ObjectPersistentStore {
Boolean existsObject(String name);
@Nonnull
ByteString readObject(String name);
BlobP readObject(String name);
void writeObject(String name, ByteString data);
void writeObject(String name, BlobP data);
void deleteObject(String name);
}

View File

@@ -0,0 +1,77 @@
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.objects.persistence";
option java_outer_classname = "DhfsObjectPersistence";
package dhfs.objects.persistence;
message ObjectMetadataP {
string name = 1;
map<string, int64> remoteCopies = 2;
string knownClass = 3;
bool seen = 4;
bool deleted = 5;
repeated string confirmedDeletes = 6;
repeated string referrers = 7;
map<string, int64> changelog = 8;
repeated string savedRefs = 9;
bool locked = 10;
bool haveLocalCopy = 11;
}
message FsNodeP {
string uuid = 1;
int64 mode = 2;
int64 ctime = 3;
int64 mtime = 4;
}
message FileP {
FsNodeP fsNode = 1;
map<int64, string> chunks = 2;
string parent = 3;
bool symlink = 4;
}
message DirectoryP {
FsNodeP fsNode = 1;
map<string, string> children = 2;
}
message ChunkInfoP {
string name = 1;
int32 size = 2;
}
message ChunkDataP {
string name = 1;
bytes data = 2;
}
message PeerDirectoryP {
repeated string peers = 1;
}
message PersistentPeerInfoP {
string uuid = 1;
bytes cert = 2;
}
message JObjectDataP {
oneof obj {
FileP file = 2;
DirectoryP directory = 3;
ChunkInfoP chunkInfo = 4;
ChunkDataP chunkData = 5;
PeerDirectoryP peerDirectory = 6;
PersistentPeerInfoP persistentPeerInfo = 7;
}
}
message BlobP {
oneof dtype {
ObjectMetadataP metadata = 1;
JObjectDataP data = 2;
}
}

View File

@@ -1,5 +1,7 @@
syntax = "proto3";
import "dhfs_objects_serial.proto";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.objects.repository";
option java_outer_classname = "DhfsObjectSyncApi";
@@ -35,12 +37,12 @@ message ObjectChangelog {
message ObjectHeader {
string name = 2;
ObjectChangelog changelog = 5;
optional bytes pushedData = 6;
optional dhfs.objects.persistence.JObjectDataP pushedData = 6;
}
message ApiObject {
ObjectHeader header = 1;
bytes content = 2;
dhfs.objects.persistence.JObjectDataP content = 2;
}
message GetObjectRequest {

View File

@@ -12,7 +12,7 @@ dhfs.objects.reconnect_interval=5s
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.target_chunk_size=4194304
dhfs.files.target_chunk_size=524288
# Writes strictly smaller than this will try to merge with blocks nearby
dhfs.files.write_merge_threshold=0.8
# If a merge would result in a block of greater size than this, stop merging

View File

@@ -64,7 +64,7 @@ public class DhfsFileServiceSimpleTestImpl {
ChunkInfo c2i = new ChunkInfo(c2.getHash(), c2.getBytes().size());
ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
ChunkInfo c3i = new ChunkInfo(c3.getHash(), c3.getBytes().size());
File f = new File(fuuid, 777, null, false);
File f = new File(fuuid, 777, new UUID(0, 0), false);
f.getChunks().put(0L, c1.getHash());
f.getChunks().put((long) c1.getBytes().size(), c2.getHash());
f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getHash());

View File

@@ -0,0 +1,30 @@
package com.usatiuk.dhfs.persistence;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.PeerDirectoryP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
@QuarkusTest
public class ProtoSerializationTest {
@Inject
ProtoSerializerService protoSerializerService;
@Test
void SerializeDeserializePeerDirectory() {
var pd = new PeerDirectory();
pd.getPeers().add(UUID.randomUUID());
var ser = BlobP.newBuilder().setData(JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) protoSerializerService.serialize(pd)).build()).build();
var deser = (PeerDirectory) protoSerializerService.deserialize(ser);
Assertions.assertIterableEquals(pd.getPeers(), deser.getPeers());
}
}