diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java index 7cf8f1f3..822dcdc6 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java @@ -472,7 +472,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs) { - var fileOpt = jObjectManager.get(fileUuid, File.class); + var fileOpt = jObjectManager.get(fileUuid, FsNode.class); if (fileOpt.isEmpty()) { Log.error("File not found when trying to read: " + fileUuid); return false; diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java index 5542af69..361efb25 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObject.java @@ -7,7 +7,6 @@ import org.apache.commons.lang3.NotImplementedException; import java.io.Serializable; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class JObject implements Serializable { @@ -28,7 +27,7 @@ public class JObject implements Serializable { return runReadLocked(ObjectMetadata::getName); } - protected final ReadWriteLock _lock = new ReentrantReadWriteLock(); + protected final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); private final ObjectMetadata _metaPart; private final JObjectResolver _resolver; private final AtomicReference _dataPart = new AtomicReference<>(); @@ -91,7 +90,7 @@ public class JObject implements Serializable { var ver = _metaPart.getOurVersion(); VoidFn invalidateFn = () -> { _dataPart.set(null); - _resolver.removeLocal(_metaPart.getName()); + _resolver.removeLocal(this, _metaPart.getName()); }; var ret = fn.apply(_metaPart, () -> _resolver.bumpVersionSelf(this), invalidateFn); if (!Objects.equals(ver, _metaPart.getOurVersion())) @@ -140,4 +139,9 @@ public class JObject implements Serializable { _lock.writeLock().unlock(); } } + + public void assertRWLock() { + if (!_lock.isWriteLockedByCurrentThread()) + throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName()); + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java index ef75016c..a465c8c9 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectManagerImpl.java @@ -38,7 +38,7 @@ public class JObjectManagerImpl implements JObjectManager { private void cleanup() { NamedSoftReference cur; while ((cur = (NamedSoftReference) _refQueue.poll()) != null) { - synchronized (_map) { + synchronized (this) { if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null)) _map.remove(cur._key); } @@ -46,7 +46,7 @@ public class JObjectManagerImpl implements JObjectManager { } private JObject getFromMap(String key) { - synchronized (_map) { + synchronized (this) { if (_map.containsKey(key)) { var ref = _map.get(key).get(); if (ref != null) { @@ -60,7 +60,7 @@ public class JObjectManagerImpl implements JObjectManager { @Override public Optional> get(String name) { cleanup(); - synchronized (_map) { + synchronized (this) { var inMap = getFromMap(name); if (inMap != null) return Optional.of(inMap); } @@ -76,7 +76,7 @@ public class JObjectManagerImpl implements JObjectManager { if (!(meta instanceof ObjectMetadata)) throw new NotImplementedException("Unexpected metadata type for " + name); - synchronized (_map) { + synchronized (this) { var inMap = getFromMap(name); if (inMap != null) return Optional.of(inMap); JObject newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta); @@ -108,7 +108,7 @@ public class JObjectManagerImpl implements JObjectManager { public JObject put(D object) { cleanup(); - synchronized (_map) { + synchronized (this) { var inMap = getFromMap(object.getName()); if (inMap != null) { inMap.runReadLocked((m, d) -> { @@ -139,7 +139,7 @@ public class JObjectManagerImpl implements JObjectManager { return got.get(); } - synchronized (_map) { + synchronized (this) { var inMap = getFromMap(md.getName()); if (inMap != null) { return inMap; @@ -164,7 +164,7 @@ public class JObjectManagerImpl implements JObjectManager { return (JObject) got.get(); } - synchronized (_map) { + synchronized (this) { var inMap = getFromMap(object.getName()); if (inMap != null) { var ok = inMap.runReadLocked((m) -> { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java index f26b5f25..95eb21ea 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectResolver.java @@ -9,7 +9,6 @@ import io.grpc.StatusRuntimeException; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.apache.commons.lang3.SerializationUtils; import org.eclipse.microprofile.config.inject.ConfigProperty; @ApplicationScoped @@ -23,6 +22,9 @@ public class JObjectResolver { @Inject InvalidationQueueService invalidationQueueService; + @Inject + JObjectWriteback jObjectWriteback; + @ConfigProperty(name = "dhfs.objects.distributed.selfname") String selfname; @@ -35,9 +37,11 @@ public class JObjectResolver { return DeserializationHelper.deserialize(obj); } - public void removeLocal(String name) { + public void removeLocal(JObject jObject, String name) { + jObject.assertRWLock(); try { Log.info("Deleting " + name); + jObjectWriteback.remove(name); objectPersistentStore.deleteObject(name); } catch (StatusRuntimeException sx) { if (sx.getStatus() != Status.NOT_FOUND) @@ -48,9 +52,8 @@ public class JObjectResolver { } public void notifyWrite(JObject self) { - objectPersistentStore.writeObject("meta_" + self.getName(), self.runReadLocked((m) -> SerializationUtils.serialize(m))); + jObjectWriteback.markDirty(self.getName(), self); if (self.isResolved()) { - objectPersistentStore.writeObject(self.getName(), self.runReadLocked((m, d) -> SerializationUtils.serialize(d))); // FIXME:? invalidationQueueService.pushInvalidationToAll(self.getName()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java new file mode 100644 index 00000000..15142ff4 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/jrepository/JObjectWriteback.java @@ -0,0 +1,54 @@ +package com.usatiuk.dhfs.storage.objects.jrepository; + +import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; +import io.smallrye.common.annotation.RunOnVirtualThread; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.SerializationUtils; + +import java.util.LinkedHashMap; + +@ApplicationScoped +public class JObjectWriteback { + + @Inject + ObjectPersistentStore objectPersistentStore; + + private final LinkedHashMap> _objects = new LinkedHashMap<>(); + + @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @RunOnVirtualThread + public void write() { + while (true) { + JObject obj; + synchronized (this) { + var entry = _objects.pollFirstEntry(); + if (entry == null) break; + obj = entry.getValue(); + } + obj.runReadLocked((m) -> { + objectPersistentStore.writeObject("meta_" + m.getName(), SerializationUtils.serialize(m)); + if (obj.isResolved()) + obj.runReadLocked((m2, d) -> { + objectPersistentStore.writeObject(m.getName(), SerializationUtils.serialize(d)); + return null; + }); + return null; + }); + } + } + + public void remove(String name) { + synchronized (this) { + _objects.remove(name); + } + } + + public void markDirty(String name, JObject object) { + synchronized (this) { + _objects.put(name, object); + } + } +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index 26bce24a..d52de36b 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -40,9 +40,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); - Pair read = obj.runReadLocked((meta, data) -> { - return Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data)); - }); + Pair read = obj.runReadLocked((meta, data) -> Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data))); var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build(); return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build()); }