mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
use local store only in jobjectmanager
This commit is contained in:
@@ -91,6 +91,7 @@ public class JObject<T extends JObjectData> implements Serializable {
|
||||
var ver = _metaPart.getOurVersion();
|
||||
VoidFn invalidateFn = () -> {
|
||||
_dataPart.set(null);
|
||||
_resolver.removeLocal(_metaPart.getName());
|
||||
};
|
||||
var ret = fn.apply(_metaPart, () -> _resolver.bumpVersionSelf(this), invalidateFn);
|
||||
if (!Objects.equals(ver, _metaPart.getOurVersion()))
|
||||
|
||||
@@ -9,7 +9,6 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.Getter;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.SoftReference;
|
||||
|
||||
@@ -4,6 +4,9 @@ import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
@@ -32,6 +35,18 @@ public class JObjectResolver {
|
||||
return DeserializationHelper.deserialize(obj);
|
||||
}
|
||||
|
||||
public void removeLocal(String name) {
|
||||
try {
|
||||
Log.info("Deleting " + name);
|
||||
objectPersistentStore.deleteObject(name);
|
||||
} catch (StatusRuntimeException sx) {
|
||||
if (sx.getStatus() != Status.NOT_FOUND)
|
||||
Log.info("Couldn't delete object from persistent store: ", sx);
|
||||
} catch (Exception e) {
|
||||
Log.info("Couldn't delete object from persistent store: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void notifyWrite(JObject<?> self) {
|
||||
objectPersistentStore.writeObject("meta_" + self.getName(), self.runReadLocked((m) -> SerializationUtils.serialize(m)));
|
||||
if (self.isResolved()) {
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.grpc.GrpcService;
|
||||
@@ -21,9 +20,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
|
||||
@Inject
|
||||
SyncHandler syncHandler;
|
||||
|
||||
@@ -45,7 +41,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
|
||||
Pair<ObjectHeader, byte[]> read = obj.runReadLocked((meta, data) -> {
|
||||
byte[] bytes = objectPersistentStore.readObject(request.getName());
|
||||
return Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data));
|
||||
});
|
||||
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build();
|
||||
|
||||
@@ -6,9 +6,6 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
@@ -28,9 +25,6 @@ public class SyncHandler {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
@@ -119,15 +113,6 @@ public class SyncHandler {
|
||||
// md.getBestVersion() > md.getTotalVersion() should also work
|
||||
if (receivedTotalVer > md.getOurVersion()) {
|
||||
invalidate.apply();
|
||||
try {
|
||||
Log.info("Deleting " + request.getHeader().getName() + " as per invalidation from " + request.getSelfname());
|
||||
objectPersistentStore.deleteObject(request.getHeader().getName());
|
||||
} catch (StatusRuntimeException sx) {
|
||||
if (sx.getStatus() != Status.NOT_FOUND)
|
||||
Log.info("Couldn't delete object from persistent store: ", sx);
|
||||
} catch (Exception e) {
|
||||
Log.info("Couldn't delete object from persistent store: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
md.getChangelog().clear();
|
||||
|
||||
Reference in New Issue
Block a user