mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
JObject writeback
This commit is contained in:
@@ -472,7 +472,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
@Override
|
||||
public Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs) {
|
||||
var fileOpt = jObjectManager.get(fileUuid, File.class);
|
||||
var fileOpt = jObjectManager.get(fileUuid, FsNode.class);
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return false;
|
||||
|
||||
@@ -7,7 +7,6 @@ import org.apache.commons.lang3.NotImplementedException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class JObject<T extends JObjectData> implements Serializable {
|
||||
@@ -28,7 +27,7 @@ public class JObject<T extends JObjectData> implements Serializable {
|
||||
return runReadLocked(ObjectMetadata::getName);
|
||||
}
|
||||
|
||||
protected final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||
protected final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||
private final ObjectMetadata _metaPart;
|
||||
private final JObjectResolver _resolver;
|
||||
private final AtomicReference<T> _dataPart = new AtomicReference<>();
|
||||
@@ -91,7 +90,7 @@ public class JObject<T extends JObjectData> implements Serializable {
|
||||
var ver = _metaPart.getOurVersion();
|
||||
VoidFn invalidateFn = () -> {
|
||||
_dataPart.set(null);
|
||||
_resolver.removeLocal(_metaPart.getName());
|
||||
_resolver.removeLocal(this, _metaPart.getName());
|
||||
};
|
||||
var ret = fn.apply(_metaPart, () -> _resolver.bumpVersionSelf(this), invalidateFn);
|
||||
if (!Objects.equals(ver, _metaPart.getOurVersion()))
|
||||
@@ -140,4 +139,9 @@ public class JObject<T extends JObjectData> implements Serializable {
|
||||
_lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void assertRWLock() {
|
||||
if (!_lock.isWriteLockedByCurrentThread())
|
||||
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
private void cleanup() {
|
||||
NamedSoftReference cur;
|
||||
while ((cur = (NamedSoftReference) _refQueue.poll()) != null) {
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null))
|
||||
_map.remove(cur._key);
|
||||
}
|
||||
@@ -46,7 +46,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
private JObject<?> getFromMap(String key) {
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
if (_map.containsKey(key)) {
|
||||
var ref = _map.get(key).get();
|
||||
if (ref != null) {
|
||||
@@ -60,7 +60,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
@Override
|
||||
public Optional<JObject<?>> get(String name) {
|
||||
cleanup();
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(name);
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
}
|
||||
@@ -76,7 +76,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
if (!(meta instanceof ObjectMetadata))
|
||||
throw new NotImplementedException("Unexpected metadata type for " + name);
|
||||
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(name);
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
JObject<?> newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
|
||||
@@ -108,7 +108,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
public <D extends JObjectData> JObject<D> put(D object) {
|
||||
cleanup();
|
||||
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(object.getName());
|
||||
if (inMap != null) {
|
||||
inMap.runReadLocked((m, d) -> {
|
||||
@@ -139,7 +139,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
return got.get();
|
||||
}
|
||||
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(md.getName());
|
||||
if (inMap != null) {
|
||||
return inMap;
|
||||
@@ -164,7 +164,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
return (JObject<D>) got.get();
|
||||
}
|
||||
|
||||
synchronized (_map) {
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(object.getName());
|
||||
if (inMap != null) {
|
||||
var ok = inMap.runReadLocked((m) -> {
|
||||
|
||||
@@ -9,7 +9,6 @@ import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -23,6 +22,9 @@ public class JObjectResolver {
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
|
||||
@Inject
|
||||
JObjectWriteback jObjectWriteback;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
@@ -35,9 +37,11 @@ public class JObjectResolver {
|
||||
return DeserializationHelper.deserialize(obj);
|
||||
}
|
||||
|
||||
public void removeLocal(String name) {
|
||||
public void removeLocal(JObject<?> jObject, String name) {
|
||||
jObject.assertRWLock();
|
||||
try {
|
||||
Log.info("Deleting " + name);
|
||||
jObjectWriteback.remove(name);
|
||||
objectPersistentStore.deleteObject(name);
|
||||
} catch (StatusRuntimeException sx) {
|
||||
if (sx.getStatus() != Status.NOT_FOUND)
|
||||
@@ -48,9 +52,8 @@ public class JObjectResolver {
|
||||
}
|
||||
|
||||
public void notifyWrite(JObject<?> self) {
|
||||
objectPersistentStore.writeObject("meta_" + self.getName(), self.runReadLocked((m) -> SerializationUtils.serialize(m)));
|
||||
jObjectWriteback.markDirty(self.getName(), self);
|
||||
if (self.isResolved()) {
|
||||
objectPersistentStore.writeObject(self.getName(), self.runReadLocked((m, d) -> SerializationUtils.serialize(d)));
|
||||
// FIXME:?
|
||||
invalidationQueueService.pushInvalidationToAll(self.getName());
|
||||
}
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.SerializationUtils;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectWriteback {
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
|
||||
private final LinkedHashMap<String, JObject<?>> _objects = new LinkedHashMap<>();
|
||||
|
||||
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||
@RunOnVirtualThread
|
||||
public void write() {
|
||||
while (true) {
|
||||
JObject<?> obj;
|
||||
synchronized (this) {
|
||||
var entry = _objects.pollFirstEntry();
|
||||
if (entry == null) break;
|
||||
obj = entry.getValue();
|
||||
}
|
||||
obj.runReadLocked((m) -> {
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationUtils.serialize(m));
|
||||
if (obj.isResolved())
|
||||
obj.runReadLocked((m2, d) -> {
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationUtils.serialize(d));
|
||||
return null;
|
||||
});
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void remove(String name) {
|
||||
synchronized (this) {
|
||||
_objects.remove(name);
|
||||
}
|
||||
}
|
||||
|
||||
public void markDirty(String name, JObject<?> object) {
|
||||
synchronized (this) {
|
||||
_objects.put(name, object);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -40,9 +40,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
|
||||
Pair<ObjectHeader, byte[]> read = obj.runReadLocked((meta, data) -> {
|
||||
return Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data));
|
||||
});
|
||||
Pair<ObjectHeader, byte[]> read = obj.runReadLocked((meta, data) -> Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data)));
|
||||
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build();
|
||||
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user