From 14ad1da4171be01a4367cd9a584e8c10d053df9a Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Sun, 16 Jun 2024 22:27:03 +0200 Subject: [PATCH] a little less crazy versioning --- .../files/service/DhfsFileServiceImpl.java | 2 +- .../DistributedObjectRepository.java | 38 +++++------ .../distributed/ObjectIndexService.java | 10 ++- .../repository/distributed/ObjectMeta.java | 11 ---- .../distributed/ObjectMetaData.java | 34 ++++++++-- .../RemoteObjectServiceClient.java | 64 +++++++++++-------- .../RemoteObjectServiceServer.java | 29 +++++---- .../repository/distributed/SyncHandler.java | 47 +++++++++++--- server/src/main/proto/dhfs_objects_sync.proto | 20 ++++-- 9 files changed, 159 insertions(+), 96 deletions(-) diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java index ec5250e6..ef547ec0 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/files/service/DhfsFileServiceImpl.java @@ -34,7 +34,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { void init(@Observes @Priority(500) StartupEvent event) { Log.info("Initializing file service"); - if (!objectRepository.existsObject(namespace + new UUID(0, 0))) { + if (!objectRepository.existsObject(new UUID(0, 0).toString())) { jObjectManager.put(new Directory(new UUID(0, 0), 0755)); } getRoot(); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java index 4801e96c..fbbc1849 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/DistributedObjectRepository.java @@ -23,6 +23,7 @@ import java.util.Optional; public class DistributedObjectRepository implements ObjectRepository { @ConfigProperty(name = "dhfs.objects.distributed.selfname") String selfname; + @Inject Vertx vertx; @@ -42,14 +43,9 @@ public class DistributedObjectRepository implements ObjectRepository { try { Log.info("Starting sync"); var got = remoteObjectServiceClient.getIndex(); - for (var h : got) { - var prevMtime = objectIndexService.exists(h.getName()) - ? objectIndexService.getMeta(h.getName()).get().getMtime() - : 0; - syncHandler.handleRemoteUpdate( - IndexUpdatePush.newBuilder().setSelfname(selfname).setName(h.getName()) - .setAssumeUnique(h.getAssumeUnique()) - .setMtime(h.getMtime()).setPrevMtime(prevMtime).build()); + for (var h : got.getObjectsList()) { + syncHandler.handleRemoteUpdate(IndexUpdatePush.newBuilder() + .setSelfname(got.getSelfname()).setHeader(h).build()); } Log.info("Sync complete"); } catch (Exception e) { @@ -76,11 +72,9 @@ public class DistributedObjectRepository implements ObjectRepository { @Nonnull @Override public byte[] readObject(String name) { - if (!objectIndexService.exists(name)) - throw new IllegalArgumentException("Object " + name + " doesn't exist"); - var infoOpt = objectIndexService.getMeta(name); - if (infoOpt.isEmpty()) throw new IllegalArgumentException("Object " + name + " doesn't exist"); + if (infoOpt.isEmpty()) + throw new IllegalArgumentException("Object " + name + " doesn't exist"); var info = infoOpt.get(); @@ -90,8 +84,8 @@ public class DistributedObjectRepository implements ObjectRepository { return Optional.empty(); }); if (read.isPresent()) return read.get(); - // Race? + // Possible race if it got deleted? return info.runWriteLocked((data) -> { var obj = remoteObjectServiceClient.getObject(name); objectPersistentStore.writeObject(name, obj); @@ -106,17 +100,17 @@ public class DistributedObjectRepository implements ObjectRepository { info.runWriteLocked((metaData) -> { objectPersistentStore.writeObject(name, data); - var prevMtime = info.getMtime(); - info.setMtime(System.currentTimeMillis()); - try { - remoteObjectServiceClient.notifyUpdate(name, prevMtime); - } catch (Exception e) { - Log.error("Error when notifying remote update:"); - Log.error(e); - Log.error(e.getCause()); - } + metaData.getChangelog().merge(selfname, 1L, Long::sum); return null; }); + // FIXME: Race? + try { + remoteObjectServiceClient.notifyUpdate(name); + } catch (Exception e) { + Log.error("Error when notifying remote update:"); + Log.error(e); + Log.error(e.getCause()); + } } @Nonnull diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java index d0172441..232a7a6a 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectIndexService.java @@ -17,6 +17,9 @@ import java.util.Optional; @ApplicationScoped public class ObjectIndexService { + @ConfigProperty(name = "dhfs.objects.distributed.selfname") + String selfname; + ObjectIndex _index = new ObjectIndex(); @ConfigProperty(name = "dhfs.objects.distributed.root") @@ -47,7 +50,12 @@ public class ObjectIndexService { } public ObjectMeta getOrCreateMeta(String name, boolean assumeUnique) { - return _index.getOrCreate(name, assumeUnique); + var ret = _index.getOrCreate(name, assumeUnique); + ret.runWriteLocked(md -> { + md.getChangelog().putIfAbsent(selfname, 0L); + return null; + }); + return ret; } @FunctionalInterface diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java index 2e67021d..0a27aa95 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMeta.java @@ -12,17 +12,6 @@ public class ObjectMeta implements Serializable { private final ObjectMetaData _data; private final ReadWriteLock _lock = new ReentrantReadWriteLock(); - public void setMtime(long mtime) { - runWriteLocked((data) -> { - data.setMtime(mtime); - return null; - }); - } - - public long getMtime() { - return runReadLocked((data) -> data.getMtime()); - } - public boolean getAssumeUnique() { return runReadLocked((data) -> data.getAssumeUnique()); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java index 1fd627a8..d3fe050d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java @@ -1,10 +1,14 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; +import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelog; +import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry; +import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; import lombok.Getter; -import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; public class ObjectMetaData implements Serializable { @@ -14,15 +18,33 @@ public class ObjectMetaData implements Serializable { } @Getter - final String _name; + private final String _name; @Getter - @Setter - long _mtime; + private final Boolean _assumeUnique; @Getter - final Boolean _assumeUnique; + private final List _remoteCopies = new ArrayList<>(); @Getter - final List _remoteCopies = new ArrayList<>(); + private final HashMap _changelog = new LinkedHashMap<>(); + + Long getTotalVersion() { + return _changelog.values().stream().reduce(0L, Long::sum); + } + + ObjectChangelog toRpcChangelog() { + var changelogBuilder = ObjectChangelog.newBuilder(); + for (var m : getChangelog().entrySet()) { + changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder().setHost(m.getKey()).setVersion(m.getValue()).build()); + } + return changelogBuilder.build(); + } + + ObjectHeader toRpcHeader() { + var headerBuilder = ObjectHeader.newBuilder().setName(getName()); + headerBuilder.setAssumeUnique(getAssumeUnique()); + headerBuilder.setChangelog(toRpcChangelog()); + return headerBuilder.build(); + } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java index 3f28a1d3..bb2460e0 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceClient.java @@ -1,18 +1,12 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; -import com.usatiuk.dhfs.objects.repository.distributed.GetIndexRequest; -import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest; -import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; -import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; +import com.usatiuk.dhfs.objects.repository.distributed.*; import io.quarkus.logging.Log; -import io.smallrye.mutiny.Uni; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; import org.eclipse.microprofile.config.inject.ConfigProperty; -import java.util.List; - @ApplicationScoped public class RemoteObjectServiceClient { @ConfigProperty(name = "dhfs.objects.distributed.selfname") @@ -26,39 +20,57 @@ public class RemoteObjectServiceClient { public byte[] getObject(String name) { return remoteHostManager.withClient(client -> { - var req = GetObjectRequest.newBuilder().setName(name).build(); - var reply = client.getObject(req); - var metaOpt = objectIndexService.getMeta(name); - if (metaOpt.isEmpty()) throw new RuntimeException("Oops!"); - var meta = metaOpt.get(); - if (meta.getMtime() != reply.getObject().getHeader().getMtime()) { - if (!meta.getAssumeUnique() && (meta.getAssumeUnique() != reply.getObject().getHeader().getAssumeUnique())) { + var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build()); + + var meta = objectIndexService.getMeta(name).orElseThrow(() -> { + Log.error("Race when trying to fetch"); + return new NotImplementedException(); + }); + + var receivedSelfVer = reply.getObject().getHeader().getChangelog() + .getEntriesList().stream().filter(p -> p.getHost().equals(selfname)) + .findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L); + + var receivedTotalVer = reply.getObject().getHeader().getChangelog().getEntriesList() + .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum); + + return meta.runReadLocked(md -> { + var outdated = + ( + (md.getTotalVersion() > receivedTotalVer) + || (md.getChangelog().get(selfname) > receivedSelfVer) + ) + && !md.getAssumeUnique(); + + if (outdated) { Log.error("Race when trying to fetch"); throw new NotImplementedException(); } - } - return reply.getObject().getContent().toByteArray(); + return reply.getObject().getContent().toByteArray(); + }); }); } - public List getIndex() { + public GetIndexReply getIndex() { return remoteHostManager.withClient(client -> { var req = GetIndexRequest.newBuilder().build(); var reply = client.getIndex(req); - return reply.getObjectsList(); + return reply; }); } - public Boolean notifyUpdate(String name, long prevMtime) { + public Boolean notifyUpdate(String name) { return remoteHostManager.withClient(client -> { - var metaOpt = objectIndexService.getMeta(name); - if (metaOpt.isEmpty()) throw new RuntimeException("Oops!"); - var meta = metaOpt.get(); + var meta = objectIndexService.getMeta(name).orElseThrow(() -> { + Log.error("Race when trying to notify update"); + return new NotImplementedException(); + }); - var req = IndexUpdatePush.newBuilder().setSelfname(selfname).setName(name) - .setAssumeUnique(meta.getAssumeUnique()) - .setMtime(meta.getMtime()).setPrevMtime(prevMtime).build(); - client.indexUpdate(req); + var builder = IndexUpdatePush.newBuilder().setSelfname(selfname); + + client.indexUpdate(builder.setHeader( + meta.runReadLocked(ObjectMetaData::toRpcHeader) + ).build()); return true; }); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index ca30a2c0..9b383027 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -30,18 +30,20 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Blocking public Uni getObject(GetObjectRequest request) { Log.info("<-- getObject: " + request.getName()); - var metaOpt = objectIndexService.getMeta(request.getName()); - if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); - var meta = metaOpt.get(); - Optional> read = meta.runReadLocked((data) -> { - if (objectPersistentStore.existsObject(request.getName())) - return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()))); - return Optional.empty(); + + var meta = objectIndexService.getMeta(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); + + Optional> readOpt = meta.runReadLocked((data) -> { + if (objectPersistentStore.existsObject(request.getName())) { + ObjectHeader header = data.toRpcHeader(); + byte[] bytes = objectPersistentStore.readObject(request.getName()); + return Optional.of(Pair.of(header, bytes)); + } else { + return Optional.empty(); + } }); - if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); - var obj = read.get().getRight(); - var header = ObjectHeader.newBuilder().setName(request.getName()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build(); - var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj)).build(); + var read = readOpt.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); + var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build(); return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build()); } @@ -51,8 +53,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { Log.info("<-- getIndex: "); var builder = GetIndexReply.newBuilder(); objectIndexService.forAllRead((name, meta) -> { - var entry = ObjectHeader.newBuilder().setName(name).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build(); - builder.addObjects(entry); + builder.addObjects(meta.runReadLocked(ObjectMetaData::toRpcHeader)); }); return Uni.createFrom().item(builder.build()); } @@ -60,7 +61,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { @Override @Blocking public Uni indexUpdate(IndexUpdatePush request) { - Log.info("<-- indexUpdate: " + request.getName() + " from: " + request.getPrevMtime() + " to: " + request.getMtime()); + Log.info("<-- indexUpdate: " + request.getHeader().getName()); return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); } } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index 7ea8067b..216dec0d 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -2,15 +2,22 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush; import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply; +import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry; import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.NotImplementedException; +import org.eclipse.microprofile.config.inject.ConfigProperty; @ApplicationScoped public class SyncHandler { + @ConfigProperty(name = "dhfs.objects.distributed.selfname") + String selfname; + @Inject ObjectPersistentStore objectPersistentStore; @@ -21,28 +28,48 @@ public class SyncHandler { JObjectManager jObjectManager; public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { - var meta = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique()); + var meta = objectIndexService.getOrCreateMeta(request.getHeader().getName(), request.getHeader().getAssumeUnique()); + + var receivedSelfVer = request.getHeader().getChangelog() + .getEntriesList().stream().filter(p -> p.getHost().equals(selfname)) + .findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L); + + var receivedTotalVer = request.getHeader().getChangelog().getEntriesList() + .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum); + meta.runWriteLocked((data) -> { - if (meta.getMtime() == request.getMtime()) { + var conflict = (data.getChangelog().get(selfname) > receivedSelfVer) && !data.getAssumeUnique(); + + if (conflict) { + Log.error("Conflict when updating: " + request.getHeader().getName()); + throw new NotImplementedException(); + } + + if (receivedTotalVer.equals(data.getTotalVersion())) { data.getRemoteCopies().add(request.getSelfname()); return null; } - if (meta.getMtime() != request.getPrevMtime()) { - if (!meta.getAssumeUnique() - || (meta.getAssumeUnique() != request.getAssumeUnique())) { - Log.error("Conflict!"); - throw new NotImplementedException(); - } + if (receivedTotalVer < data.getTotalVersion()) { + // FIXME?: + data.getRemoteCopies().remove(request.getSelfname()); + return null; } - meta.setMtime(request.getMtime()); + data.getChangelog().clear(); + for (var entry : request.getHeader().getChangelog().getEntriesList()) { + data.getChangelog().put(entry.getHost(), entry.getVersion()); + } + data.getChangelog().putIfAbsent(selfname, 0L); data.getRemoteCopies().clear(); data.getRemoteCopies().add(request.getSelfname()); try { - objectPersistentStore.deleteObject(request.getName()); + objectPersistentStore.deleteObject(request.getHeader().getName()); + } catch (StatusRuntimeException sx) { + if (sx.getStatus() != Status.NOT_FOUND) + Log.info("Couldn't delete object from persistent store: ", sx); } catch (Exception e) { Log.info("Couldn't delete object from persistent store: ", e); } diff --git a/server/src/main/proto/dhfs_objects_sync.proto b/server/src/main/proto/dhfs_objects_sync.proto index 9d4bb12e..45f0fbf9 100644 --- a/server/src/main/proto/dhfs_objects_sync.proto +++ b/server/src/main/proto/dhfs_objects_sync.proto @@ -12,10 +12,19 @@ service DhfsObjectSyncGrpc { rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {} } +message ObjectChangelogEntry { + string host = 1; + uint64 version = 2; +} + +message ObjectChangelog { + repeated ObjectChangelogEntry entries = 1; +} + message ObjectHeader { string name = 2; bool assumeUnique = 3; - uint64 mtime = 4; + ObjectChangelog changelog = 4; } message ApiObject { @@ -28,6 +37,8 @@ message GetObjectRequest { } message GetObjectReply { + string selfname = 10; + ApiObject object = 1; } @@ -36,16 +47,15 @@ message GetIndexRequest { } message GetIndexReply { + string selfname = 10; + repeated ObjectHeader objects = 1; } message IndexUpdatePush { string selfname = 10; - string name = 2; - bool assumeUnique = 3; - uint64 mtime = 4; - uint64 prevMtime = 5; + ObjectHeader header = 1; } message IndexUpdateReply {