From 23cd3e4345012954954993d4273bd787cfb55d78 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 16 Jun 2024 20:57:22 +0200 Subject: [PATCH] a bit more cleanup with metadata --- .../DistributedObjectRepository.java | 6 +-- .../repository/distributed/ObjectMeta.java | 47 +++++++------------ .../distributed/ObjectMetaData.java | 28 +++++++++++ .../RemoteObjectServiceServer.java | 2 +- .../repository/distributed/SyncHandler.java | 23 +++++---- 5 files changed, 61 insertions(+), 45 deletions(-) create mode 100644 server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java index 0a759a26..4801e96c 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java @@ -84,7 +84,7 @@ public class DistributedObjectRepository implements ObjectRepository { var info = infoOpt.get(); - Optional read = info.runReadLocked(() -> { + Optional read = info.runReadLocked((data) -> { if (objectPersistentStore.existsObject(name)) return Optional.of(objectPersistentStore.readObject(name)); return Optional.empty(); @@ -92,7 +92,7 @@ public class DistributedObjectRepository implements ObjectRepository { if (read.isPresent()) return read.get(); // Race? - return info.runWriteLocked(() -> { + return info.runWriteLocked((data) -> { var obj = remoteObjectServiceClient.getObject(name); objectPersistentStore.writeObject(name, obj); return obj; @@ -104,7 +104,7 @@ public class DistributedObjectRepository implements ObjectRepository { public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) { var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict); - info.runWriteLocked(() -> { + info.runWriteLocked((metaData) -> { objectPersistentStore.writeObject(name, data); var prevMtime = info.getMtime(); info.setMtime(System.currentTimeMillis()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java index 19d339ec..2e67021d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java @@ -1,61 +1,50 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; -import lombok.Getter; - import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ObjectMeta implements Serializable { public ObjectMeta(String name, Boolean assumeUnique) { - this._name = name; - this._assumeUnique = assumeUnique; + _data = new ObjectMetaData(name, assumeUnique); } + private final ObjectMetaData _data; private final ReadWriteLock _lock = new ReentrantReadWriteLock(); - @Getter - final String _name; - - long _mtime; - - @Getter - final Boolean _assumeUnique; - - //FIXME: - final List _remoteCopies = new ArrayList<>(); - public void setMtime(long mtime) { - runWriteLocked(() -> { - _mtime = mtime; + runWriteLocked((data) -> { + data.setMtime(mtime); return null; }); } public long getMtime() { - return runReadLocked(() -> _mtime); + return runReadLocked((data) -> data.getMtime()); } - public R runReadLocked(Callable fn) { + public boolean getAssumeUnique() { + return runReadLocked((data) -> data.getAssumeUnique()); + } + + @FunctionalInterface + public interface ObjectMetaFn { + R apply(ObjectMetaData indexData); + } + + public R runReadLocked(ObjectMetaFn fn) { _lock.readLock().lock(); try { - return fn.call(); - } catch (Exception e) { - throw new RuntimeException(e); + return fn.apply(_data); } finally { _lock.readLock().unlock(); } } - public R runWriteLocked(Callable fn) { + public R runWriteLocked(ObjectMetaFn fn) { _lock.writeLock().lock(); try { - return fn.call(); - } catch (Exception e) { - throw new RuntimeException(e); + return fn.apply(_data); } finally { _lock.writeLock().unlock(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java new file mode 100644 index 00000000..1fd627a8 --- /dev/null +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java @@ -0,0 +1,28 @@ +package com.usatiuk.dhfs.storage.objects.repository.distributed; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class ObjectMetaData implements Serializable { + public ObjectMetaData(String name, Boolean assumeUnique) { + _name = name; + _assumeUnique = assumeUnique; + } + + @Getter + final String _name; + + @Getter + @Setter + long _mtime; + + @Getter + final Boolean _assumeUnique; + + @Getter + final List _remoteCopies = new ArrayList<>(); +} diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index b750f778..ca30a2c0 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -33,7 +33,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { var metaOpt = objectIndexService.getMeta(request.getName()); if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); var meta = metaOpt.get(); - Optional> read = meta.runReadLocked(() -> { + Optional> read = meta.runReadLocked((data) -> { if (objectPersistentStore.existsObject(request.getName())) return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()))); return Optional.empty(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index 9cf415ba..922bab91 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -5,7 +5,6 @@ import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply; import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import io.quarkus.logging.Log; -import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; @@ -22,32 +21,32 @@ public class SyncHandler { JObjectManager jObjectManager; public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { - var metaOpt = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique()); - metaOpt.runWriteLocked(() -> { - if (metaOpt.getMtime() == request.getMtime()) { - metaOpt._remoteCopies.add(request.getSelfname()); + var meta = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique()); + meta.runWriteLocked((data) -> { + if (meta.getMtime() == request.getMtime()) { + data.getRemoteCopies().add(request.getSelfname()); return null; } - if (metaOpt.getMtime() != request.getPrevMtime()) { - if (!metaOpt.getAssumeUnique() - || (metaOpt.getAssumeUnique() != request.getAssumeUnique())) { + if (meta.getMtime() != request.getPrevMtime()) { + if (!meta.getAssumeUnique() + || (meta.getAssumeUnique() != request.getAssumeUnique())) { Log.error("Conflict!"); throw new NotImplementedException(); } } - metaOpt.setMtime(request.getMtime()); + meta.setMtime(request.getMtime()); - metaOpt._remoteCopies.clear(); - metaOpt._remoteCopies.add(request.getSelfname()); + data.getRemoteCopies().clear(); + data.getRemoteCopies().add(request.getSelfname()); try { objectPersistentStore.deleteObject(request.getName()); } catch (Exception ignored) { } - jObjectManager.invalidateJObject(metaOpt.getName()); + jObjectManager.invalidateJObject(data.getName()); return null; });