mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
less copies 2
This commit is contained in:
@@ -1,13 +1,17 @@
|
||||
package com.usatiuk.dhfs.storage;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
public abstract class DeserializationHelper {
|
||||
public abstract class SerializationHelper {
|
||||
|
||||
// Taken from SerializationUtils
|
||||
public static <T> T deserialize(final InputStream inputStream) {
|
||||
@@ -22,4 +26,12 @@ public abstract class DeserializationHelper {
|
||||
public static <T> T deserialize(final byte[] objectData) {
|
||||
return deserialize(new ByteArrayInputStream(objectData));
|
||||
}
|
||||
|
||||
public static <T> T deserialize(final ByteString objectData) {
|
||||
return deserialize(objectData.newInput());
|
||||
}
|
||||
|
||||
public static <T extends Serializable> ByteString serialize(final T obj) {
|
||||
return UnsafeByteOperations.unsafeWrap(SerializationUtils.serialize(obj));
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.usatiuk.dhfs.storage.files.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||
@@ -45,7 +45,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
|
||||
var oursHeader = oursData.runReadLocked(ObjectMetadata::toRpcHeader);
|
||||
var theirsHeader = theirsData.getLeft();
|
||||
|
||||
var theirsDir = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
|
||||
var theirsDir = (Directory) SerializationHelper.deserialize(theirsData.getRight());
|
||||
if (!theirsDir.getClass().equals(Directory.class)) {
|
||||
Log.error("Object type mismatch!");
|
||||
throw new NotImplementedException();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
@@ -127,14 +128,14 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
}
|
||||
|
||||
byte[] readMd;
|
||||
ByteString readMd;
|
||||
try {
|
||||
readMd = objectPersistentStore.readObject("meta_" + name);
|
||||
} catch (StatusRuntimeException ex) {
|
||||
if (!ex.getStatus().equals(Status.NOT_FOUND)) throw ex;
|
||||
return Optional.empty();
|
||||
}
|
||||
var meta = DeserializationHelper.deserialize(readMd);
|
||||
var meta = SerializationHelper.deserialize(readMd);
|
||||
if (!(meta instanceof ObjectMetadata))
|
||||
throw new NotImplementedException("Unexpected metadata type for " + name);
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
@@ -30,11 +30,11 @@ public class JObjectResolver {
|
||||
|
||||
public <T extends JObjectData> T resolveData(JObject<T> jObject) {
|
||||
if (objectPersistentStore.existsObject(jObject.getName()))
|
||||
return DeserializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName()));
|
||||
return SerializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName()));
|
||||
|
||||
var obj = remoteObjectServiceClient.getObject(jObject);
|
||||
objectPersistentStore.writeObject(jObject.getName(), obj);
|
||||
return DeserializationHelper.deserialize(obj);
|
||||
return SerializationHelper.deserialize(obj);
|
||||
}
|
||||
|
||||
public void removeLocal(JObject<?> jObject, String name) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -8,7 +9,6 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -87,10 +87,10 @@ public class JObjectWriteback {
|
||||
|
||||
private void flushOne(JObject<?> obj) {
|
||||
obj.runReadLocked((m) -> {
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationUtils.serialize(m));
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
|
||||
if (obj.isResolved())
|
||||
obj.runReadLocked((m2, d) -> {
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationUtils.serialize(d));
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(d));
|
||||
return null;
|
||||
});
|
||||
jObjectManager.onWriteback(m.getName());
|
||||
@@ -100,10 +100,10 @@ public class JObjectWriteback {
|
||||
|
||||
private void flushOneImmediate(JObject<?> obj) {
|
||||
obj.runWriteLockedMeta((m, a, b) -> {
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationUtils.serialize(m));
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
|
||||
if (obj.isResolved())
|
||||
obj.runWriteLocked((m2, d, bump) -> {
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationUtils.serialize(d));
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(d));
|
||||
return null;
|
||||
});
|
||||
jObjectManager.onWriteback(m.getName());
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
@@ -32,7 +32,7 @@ public class PersistentRemoteHostsService {
|
||||
Log.info("Initializing with root " + dataRoot);
|
||||
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
|
||||
Log.info("Reading hosts");
|
||||
_persistentData = DeserializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
@@ -24,14 +25,14 @@ public class RemoteObjectServiceClient {
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
public Pair<ObjectHeader, byte[]> getSpecificObject(String host, String name) {
|
||||
public Pair<ObjectHeader, ByteString> getSpecificObject(String host, String name) {
|
||||
return remoteHostManager.withClient(host, client -> {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build());
|
||||
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent().toByteArray());
|
||||
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
|
||||
});
|
||||
}
|
||||
|
||||
public byte[] getObject(JObject<?> jObject) {
|
||||
public ByteString getObject(JObject<?> jObject) {
|
||||
// Assert lock?
|
||||
var targets = jObject.runReadLocked(d -> {
|
||||
var bestVer = d.getBestVersion();
|
||||
@@ -57,7 +58,7 @@ public class RemoteObjectServiceClient {
|
||||
Log.error("Race when trying to fetch");
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
return reply.getObject().getContent().toByteArray();
|
||||
return reply.getObject().getContent();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
@@ -10,7 +11,6 @@ import io.quarkus.logging.Log;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
@@ -40,8 +40,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
|
||||
Pair<ObjectHeader, byte[]> read = obj.runReadLocked((meta, data) -> Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data)));
|
||||
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build();
|
||||
Pair<ObjectHeader, ByteString> read = obj.runReadLocked((meta, data) -> Pair.of(meta.toRpcHeader(), SerializationHelper.serialize(data)));
|
||||
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();
|
||||
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -11,11 +13,11 @@ import jakarta.enterprise.event.Observes;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@@ -62,14 +64,14 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public byte[] readObject(String name) {
|
||||
public ByteString readObject(String name) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!file.toFile().exists())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
try {
|
||||
return Files.readAllBytes(file);
|
||||
return UnsafeByteOperations.unsafeWrap(Files.readAllBytes(file));
|
||||
} catch (IOException e) {
|
||||
Log.error("Error reading file " + file, e);
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
@@ -78,7 +80,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public void writeObject(String name, byte[] data) {
|
||||
public void writeObject(String name, ByteString data) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory()
|
||||
@@ -86,7 +88,11 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
|
||||
try {
|
||||
Files.write(file, data, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
|
||||
file.toFile().createNewFile();
|
||||
try (var fc = new FileOutputStream(file.toFile())) {
|
||||
if (fc.getChannel().write(data.asReadOnlyByteBuffer()) != data.size())
|
||||
throw new StatusRuntimeException(Status.INTERNAL.withDescription("Could not write all bytes to file"));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing file " + file, e);
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.persistence;
|
||||
|
||||
import io.smallrye.mutiny.Multi;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
@@ -13,9 +12,9 @@ public interface ObjectPersistentStore {
|
||||
Boolean existsObject(String name);
|
||||
|
||||
@Nonnull
|
||||
byte[] readObject(String name);
|
||||
ByteString readObject(String name);
|
||||
@Nonnull
|
||||
void writeObject(String name, byte[] data);
|
||||
void writeObject(String name, ByteString data);
|
||||
@Nonnull
|
||||
void deleteObject(String name);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user