From 7c06241876f2ebed8ccfb0f3df9b4644ef5daa2e Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Mon, 3 Feb 2025 22:02:05 +0100 Subject: [PATCH] dump some server changes --- .../usatiuk/dhfs/files/objects/ChunkData.java | 24 +- .../files/objects/ChunkDataSerializer.java | 22 ++ .../com/usatiuk/dhfs/files/objects/File.java | 34 +- .../dhfs/files/objects/FileSerializer.java | 44 +++ .../usatiuk/dhfs/files/objects/FsNode.java | 4 +- .../files/service/DhfsFileServiceImpl.java | 62 ++-- .../com/usatiuk/dhfs/objects/JDataRemote.java | 19 +- .../java/com/usatiuk/dhfs/objects/PeerId.java | 7 +- .../usatiuk/dhfs/objects/ReceivedObject.java | 6 + .../dhfs/objects/RefcounterTxHook.java | 26 +- .../dhfs/objects/RemoteObjPusherTxHook.java | 49 +++ .../usatiuk/dhfs/objects/RemoteObject.java | 80 ++--- .../dhfs/objects/RemoteObjectMeta.java | 53 ++++ .../dhfs/objects/RemoteTransaction.java | 91 +++++- .../jkleppmanntree/JKleppmannTreeManager.java | 203 ++++++------ .../JKleppmannTreeOpWrapper.java | 18 +- .../JKleppmannTreePeerInterface.java | 20 +- .../JKleppmannTreePeriodicPushOp.java | 8 +- .../structs/JKleppmannTreeNode.java | 8 +- .../structs/JKleppmannTreePersistentData.java | 37 +-- .../dhfs/objects/repository/PeerManager.java | 33 +- .../repository/ReceivedObjectSerializer.java | 46 +++ .../repository/RemoteObjectServiceClient.java | 167 +++++----- .../repository/RemoteObjectServiceServer.java | 174 +++++------ .../objects/repository/RpcClientFactory.java | 4 +- .../dhfs/objects/repository/SyncHandler.java | 290 ++++++++++-------- .../DeferredInvalidationQueueData.java | 17 + .../DeferredInvalidationQueueService.java | 85 +++++ .../invalidation/IndexUpdateOp.java | 12 + .../invalidation/IndexUpdateOpSerializer.java | 36 +++ .../InvalidationQueueService.java | 190 ++++++++++++ .../JKleppmannTreeOpPTempSerializer.java | 22 ++ .../objects/repository/invalidation/Op.java | 8 + .../repository/invalidation/OpHandler.java | 27 ++ .../repository/invalidation/OpPusher.java | 52 ++++ .../invalidation/PushOpHandler.java | 25 ++ .../objects/repository/peersync/PeerInfo.java | 21 +- .../peersync/PeerInfoSerializer.java | 24 ++ .../repository/peersync/PeerInfoService.java | 7 +- .../src/main/proto/dhfs_objects_serial.proto | 36 ++- .../src/main/proto/dhfs_objects_sync.proto | 39 +-- .../src/main/resources/application.properties | 2 +- .../files/DhfsFileServiceSimpleTestImpl.java | 7 +- 43 files changed, 1484 insertions(+), 655 deletions(-) create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkDataSerializer.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FileSerializer.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ReceivedObject.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/ReceivedObjectSerializer.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueData.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueService.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOpSerializer.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/JKleppmannTreeOpPTempSerializer.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/Op.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java create mode 100644 dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoSerializer.java diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkData.java index 99811ff6..0f1033c7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkData.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkData.java @@ -1,27 +1,13 @@ package com.usatiuk.dhfs.files.objects; import com.google.protobuf.ByteString; -import com.usatiuk.dhfs.objects.JDataRefcounted; +import com.usatiuk.autoprotomap.runtime.ProtoMirror; +import com.usatiuk.dhfs.objects.JDataRemote; import com.usatiuk.dhfs.objects.JObjectKey; -import org.pcollections.PCollection; -import org.pcollections.TreePSet; - -public record ChunkData(JObjectKey key, PCollection refsFrom, boolean frozen, - ByteString data) implements JDataRefcounted { - public ChunkData(JObjectKey key, ByteString data) { - this(key, TreePSet.empty(), false, data); - } - - @Override - public ChunkData withRefsFrom(PCollection refs) { - return new ChunkData(key, refs, frozen, data); - } - - @Override - public ChunkData withFrozen(boolean frozen) { - return new ChunkData(key, refsFrom, frozen, data); - } +import com.usatiuk.dhfs.objects.persistence.ChunkDataP; +//@ProtoMirror(ChunkDataP.class) +public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote { @Override public int estimateSize() { return data.size(); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkDataSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkDataSerializer.java new file mode 100644 index 00000000..f23a8da0 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/ChunkDataSerializer.java @@ -0,0 +1,22 @@ +package com.usatiuk.dhfs.files.objects; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.persistence.ChunkDataP; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class ChunkDataSerializer implements ProtoSerializer { + @Override + public ChunkData deserialize(ChunkDataP message) { + return new ChunkData(JObjectKey.of(message.getName()), message.getData()); + } + + @Override + public ChunkDataP serialize(ChunkData object) { + return ChunkDataP.newBuilder() + .setName(object.key().toString()) + .setData(object.data()) + .build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/File.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/File.java index d6012ef9..a1878128 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/File.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/File.java @@ -1,52 +1,48 @@ package com.usatiuk.dhfs.files.objects; +import com.usatiuk.autoprotomap.runtime.ProtoMirror; import com.usatiuk.dhfs.objects.JObjectKey; -import org.pcollections.PCollection; +import com.usatiuk.dhfs.objects.persistence.ChunkDataP; import org.pcollections.TreePMap; import java.util.Collection; import java.util.Set; -public record File(JObjectKey key, PCollection refsFrom, boolean frozen, - long mode, long cTime, long mTime, +//@ProtoMirror(ChunkDataP.class) +public record File(JObjectKey key, long mode, long cTime, long mTime, TreePMap chunks, boolean symlink, long size ) implements FsNode { - @Override - public File withRefsFrom(PCollection refs) { - return new File(key, refs, frozen, mode, cTime, mTime, chunks, symlink, size); - } - - @Override - public File withFrozen(boolean frozen) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); - } - public File withChunks(TreePMap chunks) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } public File withSymlink(boolean symlink) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } public File withSize(long size) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } public File withMode(long mode) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } public File withCTime(long cTime) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } public File withMTime(long mTime) { - return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size); + return new File(key, mode, cTime, mTime, chunks, symlink, size); } @Override public Collection collectRefsTo() { return Set.copyOf(chunks().values()); } + + @Override + public int estimateSize() { + return chunks.size() * 64; + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FileSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FileSerializer.java new file mode 100644 index 00000000..d5550a7c --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FileSerializer.java @@ -0,0 +1,44 @@ +package com.usatiuk.dhfs.files.objects; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.persistence.FileP; +import jakarta.enterprise.context.ApplicationScoped; +import org.pcollections.TreePMap; + +@ApplicationScoped +public class FileSerializer implements ProtoSerializer { + @Override + public File deserialize(FileP message) { + TreePMap chunks = TreePMap.empty(); + for (var chunk : message.getChunksList()) { + chunks = chunks.plus(chunk.getStart(), JObjectKey.of(chunk.getId())); + } + var ret = new File(JObjectKey.of(message.getUuid()), + message.getMode(), + message.getCtime(), + message.getMtime(), + chunks, + message.getSymlink(), + message.getSize() + ); + return ret; + } + + @Override + public FileP serialize(File object) { + var builder = FileP.newBuilder() + .setUuid(object.key().toString()) + .setMode(object.mode()) + .setCtime(object.cTime()) + .setMtime(object.mTime()) + .setSymlink(object.symlink()) + .setSize(object.size()); + object.chunks().forEach((s, i) -> { + builder.addChunksBuilder() + .setStart(s) + .setId(i.toString()); + }); + return builder.build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FsNode.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FsNode.java index 09b76015..a359d2b7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FsNode.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/objects/FsNode.java @@ -1,8 +1,8 @@ package com.usatiuk.dhfs.files.objects; -import com.usatiuk.dhfs.objects.JDataRefcounted; +import com.usatiuk.dhfs.objects.JDataRemote; -public interface FsNode extends JDataRefcounted { +public interface FsNode extends JDataRemote { long mode(); long cTime(); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java index e1056659..53963c00 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java @@ -4,9 +4,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import com.usatiuk.dhfs.files.objects.ChunkData; import com.usatiuk.dhfs.files.objects.File; -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.TransactionManager; +import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta; @@ -26,7 +24,6 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.pcollections.TreePMap; -import org.pcollections.TreePSet; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -38,6 +35,8 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Inject Transaction curTx; @Inject + RemoteTransaction remoteTx; + @Inject TransactionManager jObjectTxManager; @ConfigProperty(name = "dhfs.files.target_chunk_size") @@ -76,7 +75,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { private ChunkData createChunk(ByteString bytes) { var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); - curTx.put(newChunk); + remoteTx.put(newChunk); return newChunk; } @@ -105,8 +104,13 @@ public class DhfsFileServiceImpl implements DhfsFileService { var ref = curTx.get(JData.class, uuid).orElse(null); if (ref == null) return Optional.empty(); GetattrRes ret; - if (ref instanceof File f) { - ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE); + if (ref instanceof RemoteObject r) { + var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null); + if (remote instanceof File f) { + ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE); + } else { + throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key())); + } } else if (ref instanceof JKleppmannTreeNode) { ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY); } else { @@ -152,8 +156,8 @@ public class DhfsFileServiceImpl implements DhfsFileService { var fuuid = UUID.randomUUID(); Log.debug("Creating file " + fuuid); - File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(), false, mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0); - curTx.put(f); + File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0); + remoteTx.put(f); try { getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId()); @@ -226,9 +230,14 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (dent instanceof JKleppmannTreeNode) { return true; - } else if (dent instanceof File f) { - curTx.put(f.withMode(mode).withMTime(System.currentTimeMillis())); - return true; + } else if (dent instanceof RemoteObject) { + var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null); + if (remote instanceof File f) { + remoteTx.put(f.withMode(mode).withMTime(System.currentTimeMillis())); + return true; + } else { + throw new IllegalArgumentException(uuid + " is not a file"); + } } else { throw new IllegalArgumentException(uuid + " is not a file"); } @@ -255,7 +264,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (offset < 0) throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset)); - var file = curTx.get(File.class, fileUuid).orElse(null); + var file = remoteTx.getData(File.class, fileUuid).orElse(null); if (file == null) { Log.error("File not found when trying to read: " + fileUuid); return Optional.empty(); @@ -315,7 +324,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } private ByteString readChunk(JObjectKey uuid) { - var chunkRead = curTx.get(ChunkData.class, uuid).orElse(null); + var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null); if (chunkRead == null) { Log.error("Chunk requested not found: " + uuid); @@ -354,7 +363,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset)); // FIXME: - var file = curTx.get(File.class, fileUuid, LockingStrategy.WRITE).orElse(null); + var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null); if (file == null) { Log.error("File not found when trying to write: " + fileUuid); return -1L; @@ -367,7 +376,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (size(fileUuid) < offset) { truncate(fileUuid, offset); - file = curTx.get(File.class, fileUuid).orElse(null); + file = remoteTx.getData(File.class, fileUuid).orElse(null); } var chunksAll = file.chunks(); @@ -493,7 +502,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis()); - curTx.put(file); + remoteTx.put(file); cleanupChunks(file, removedChunks.values()); updateFileSize(file); @@ -507,7 +516,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { if (length < 0) throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length)); - var file = curTx.get(File.class, fileUuid).orElse(null); + var file = remoteTx.getData(File.class, fileUuid).orElse(null); if (file == null) { Log.error("File not found when trying to write: " + fileUuid); return false; @@ -517,7 +526,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { var oldChunks = file.chunks(); file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis()); - curTx.put(file); + remoteTx.put(file); cleanupChunks(file, oldChunks.values()); updateFileSize(file); return true; @@ -578,7 +587,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis()); - curTx.put(file); + remoteTx.put(file); cleanupChunks(file, removedChunks.values()); updateFileSize(file); return true; @@ -595,7 +604,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public ByteString readlinkBS(JObjectKey uuid) { return jObjectTxManager.executeTx(() -> { - var fileOpt = curTx.get(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid))); + var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid))); return read(uuid, 0, Math.toIntExact(size(uuid))).get(); }); } @@ -614,8 +623,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { Log.debug("Creating file " + fuuid); ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8))); - File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(), - false, 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty().plus(0L, newChunkData.key()), true, 0); + File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty().plus(0L, newChunkData.key()), true, 0); updateFileSize(f); @@ -627,12 +635,12 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) { return jObjectTxManager.executeTx(() -> { - var file = curTx.get(File.class, fileUuid).orElseThrow( + var file = remoteTx.getData(File.class, fileUuid).orElseThrow( () -> new StatusRuntimeException(Status.NOT_FOUND.withDescription( "File not found for setTimes: " + fileUuid)) ); - curTx.put(file.withCTime(atimeMs).withMTime(mtimeMs)); + remoteTx.put(file.withCTime(atimeMs).withMTime(mtimeMs)); return true; }); } @@ -649,7 +657,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { } if (realSize != file.size()) { - curTx.put(file.withSize(realSize)); + remoteTx.put(file.withSize(realSize)); } }); } @@ -657,7 +665,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { @Override public Long size(JObjectKey uuid) { return jObjectTxManager.executeTx(() -> { - var read = curTx.get(File.class, uuid) + var read = remoteTx.getData(File.class, uuid) .orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); return read.size(); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java index 4386f03e..531fe8ad 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/JDataRemote.java @@ -1,4 +1,21 @@ package com.usatiuk.dhfs.objects; -public interface JDataRemote { +import com.usatiuk.autoprotomap.runtime.ProtoMirror; +import com.usatiuk.dhfs.objects.persistence.RemoteObjectP; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; + +@ProtoMirror(RemoteObjectP.class) +public interface JDataRemote extends Serializable { + JObjectKey key(); + + default int estimateSize() { + return 100; + } + + default Collection collectRefsTo() { + return List.of(); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java index 5c34de0e..a85ae068 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/PeerId.java @@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects; import java.io.Serializable; import java.util.UUID; -public record PeerId(UUID id) implements Serializable { +public record PeerId(UUID id) implements Serializable, Comparable { public static PeerId of(UUID id) { return new PeerId(id); } @@ -20,4 +20,9 @@ public record PeerId(UUID id) implements Serializable { public JObjectKey toJObjectKey() { return JObjectKey.of(id.toString()); } + + @Override + public int compareTo(PeerId o) { + return id.compareTo(o.id); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ReceivedObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ReceivedObject.java new file mode 100644 index 00000000..30e92654 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/ReceivedObject.java @@ -0,0 +1,6 @@ +package com.usatiuk.dhfs.objects; + +import org.pcollections.PMap; + +public record ReceivedObject(JObjectKey key, PMap changelog, JDataRemote data) { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java index e4f945c7..e239b8f2 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RefcounterTxHook.java @@ -1,5 +1,8 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; import com.usatiuk.dhfs.objects.transaction.Transaction; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -9,6 +12,21 @@ public class RefcounterTxHook implements PreCommitTxHook { @Inject Transaction curTx; + private JDataRefcounted getRef(JDataRefcounted cur, JObjectKey key) { + var found = curTx.get(JDataRefcounted.class, key).orElse(null); + + if (found != null) { + return found; + } + + if (cur instanceof RemoteObject || cur instanceof JKleppmannTreeNode) { + return new RemoteObject<>(key); + } else { + return found; + } + + } + @Override public void onChange(JObjectKey key, JData old, JData cur) { if (!(cur instanceof JDataRefcounted refCur)) { @@ -21,14 +39,14 @@ public class RefcounterTxHook implements PreCommitTxHook { for (var curRef : curRefs) { if (!oldRefs.contains(curRef)) { - var referenced = curTx.get(JDataRefcounted.class, curRef).orElse(null); + var referenced = getRef(refCur, curRef); curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key))); } } for (var oldRef : oldRefs) { if (!curRefs.contains(oldRef)) { - var referenced = curTx.get(JDataRefcounted.class, oldRef).orElse(null); + var referenced = getRef(refCur, oldRef); curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key))); } } @@ -41,7 +59,7 @@ public class RefcounterTxHook implements PreCommitTxHook { } for (var newRef : refCur.collectRefsTo()) { - var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null); + var referenced = getRef(refCur, newRef); curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key))); } } @@ -53,7 +71,7 @@ public class RefcounterTxHook implements PreCommitTxHook { } for (var removedRef : refCur.collectRefsTo()) { - var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null); + var referenced = getRef(refCur, removedRef); curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key))); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java new file mode 100644 index 00000000..e83bc163 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java @@ -0,0 +1,49 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; +import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class RemoteObjPusherTxHook implements PreCommitTxHook { + @Inject + Transaction curTx; + @Inject + InvalidationQueueService invalidationQueueService; + + @Override + public void onChange(JObjectKey key, JData old, JData cur) { + boolean invalidate = switch (cur) { + case RemoteObject remote -> !remote.meta().changelog().equals(((RemoteObject) old).meta().changelog()); + case JKleppmannTreePersistentData pd -> !pd.queues().equals(((JKleppmannTreePersistentData) old).queues()); + default -> false; + }; + + if (invalidate) { + invalidationQueueService.pushInvalidationToAll(cur.key()); + } + } + + @Override + public void onCreate(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObject remote)) { + return; + } + + invalidationQueueService.pushInvalidationToAll(remote.key()); + } + + @Override + public void onDelete(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObject remote)) { + return; + } + } + + @Override + public int getPriority() { + return 100; + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java index 46719854..a965c1ba 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObject.java @@ -1,67 +1,71 @@ package com.usatiuk.dhfs.objects; +import org.pcollections.HashTreePSet; import org.pcollections.PCollection; import org.pcollections.PMap; -import org.pcollections.PSet; +import org.pcollections.TreePMap; +import javax.annotation.Nullable; import java.util.Collection; import java.util.List; -public record RemoteObject( - JObjectKey key, PCollection refsFrom, boolean frozen, - PMap knownRemoteVersions, - Class knownType, - PSet confirmedDeletes, - boolean seen, - PMap changelog, - boolean haveLocal -) implements JDataRefcounted { +public record RemoteObject(PCollection refsFrom, boolean frozen, + RemoteObjectMeta meta, @Nullable T data) implements JDataRefcounted { + public RemoteObject(T data, PeerId initialPeer) { + this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), initialPeer), data); + } + + public RemoteObject(JObjectKey key, PMap remoteChangelog) { + this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, remoteChangelog), null); + } + + public RemoteObject(JObjectKey key) { + this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, TreePMap.empty()), null); + } + + @Override + public JObjectKey key() { + if (data != null && !data.key().equals(meta.key())) + throw new IllegalStateException("Corrupted object, key mismatch: " + meta.key() + " vs " + data.key()); + return meta.key(); + } + @Override public RemoteObject withRefsFrom(PCollection refs) { - return new RemoteObject<>(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + return new RemoteObject<>(refs, frozen, meta, data); } @Override public RemoteObject withFrozen(boolean frozen) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + return new RemoteObject<>(refsFrom, frozen, meta, data); } - public RemoteObject withKnownRemoteVersions(PMap knownRemoteVersions) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + public RemoteObject withMeta(RemoteObjectMeta meta) { + return new RemoteObject<>(refsFrom, frozen, meta, data); } - public RemoteObject withKnownType(Class knownType) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + public RemoteObject withData(T data) { + return new RemoteObject<>(refsFrom, frozen, meta, data); } - public RemoteObject withConfirmedDeletes(PSet confirmedDeletes) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); + public RemoteObject withRefsFrom(PCollection refs, boolean frozen) { + return new RemoteObject<>(refs, frozen, meta, data); } - public RemoteObject withSeen(boolean seen) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); - } - - public RemoteObject withChangelog(PMap changelog) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); - } - - public RemoteObject withHaveLocal(boolean haveLocal) { - return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal); - } - - public static JObjectKey keyFrom(JObjectKey key) { - return new JObjectKey(key + "_remote"); - } - - public JObjectKey localKey() { - if (!haveLocal) throw new IllegalStateException("No local key"); - return JObjectKey.of(key.name().substring(0, key.name().length() - "_remote".length())); + public ReceivedObject toReceivedObject() { + if (data == null) + throw new IllegalStateException("Cannot convert to ReceivedObject without data: " + meta.key()); + return new ReceivedObject(meta.key(), meta.changelog(), data); } @Override public Collection collectRefsTo() { - if (haveLocal) return List.of(localKey()); + if (data != null) return data.collectRefsTo(); return List.of(); } + + @Override + public int estimateSize() { + return data == null ? 1000 : data.estimateSize(); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java new file mode 100644 index 00000000..2642525a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectMeta.java @@ -0,0 +1,53 @@ +package com.usatiuk.dhfs.objects; + +import org.pcollections.HashTreePMap; +import org.pcollections.HashTreePSet; +import org.pcollections.PMap; +import org.pcollections.PSet; + +import java.io.Serializable; + +public record RemoteObjectMeta( + JObjectKey key, + PMap knownRemoteVersions, + Class knownType, + PSet confirmedDeletes, + boolean seen, + PMap changelog) implements Serializable { + public RemoteObjectMeta(JObjectKey key, Class type, PeerId initialPeer) { + this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), true, + HashTreePMap.empty().plus(initialPeer, 1L)); + } + + public RemoteObjectMeta(JObjectKey key, PMap remoteChangelog) { + this(key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, remoteChangelog); + } + + public RemoteObjectMeta withKnownRemoteVersions(PMap knownRemoteVersions) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public RemoteObjectMeta withKnownType(Class knownType) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public RemoteObjectMeta withConfirmedDeletes(PSet confirmedDeletes) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public RemoteObjectMeta withSeen(boolean seen) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public RemoteObjectMeta withChangelog(PMap changelog) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public RemoteObjectMeta withHaveLocal(boolean haveLocal) { + return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog); + } + + public long versionSum() { + return changelog.values().stream().mapToLong(Long::longValue).sum(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java index b5086e4a..e7187193 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteTransaction.java @@ -1,10 +1,13 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; +import com.usatiuk.dhfs.objects.repository.SyncHandler; import com.usatiuk.dhfs.objects.transaction.LockingStrategy; import com.usatiuk.dhfs.objects.transaction.Transaction; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.mutable.MutableObject; import java.util.Optional; @@ -12,20 +15,96 @@ import java.util.Optional; public class RemoteTransaction { @Inject Transaction curTx; + @Inject + RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + SyncHandler syncHandler; + @Inject + PersistentPeerDataService persistentPeerDataService; public long getId() { return curTx.getId(); } - public Optional get(Class type, JObjectKey key, LockingStrategy strategy) { - throw new NotImplementedException(); + private RemoteObject tryDownloadRemote(RemoteObject obj) { + MutableObject> success = new MutableObject<>(null); + + remoteObjectServiceClient.getObject(obj.key(), rcv -> { + if (!obj.meta().knownType().isInstance(rcv.getRight().data())) + throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass()); + + if (!rcv.getRight().changelog().equals(obj.meta().changelog())) { + var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog()); + if (!rcv.getRight().changelog().equals(updated.meta().changelog())) + throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog()); + success.setValue(updated.withData((T) rcv.getRight().data())); + } else { + success.setValue(obj.withData((T) rcv.getRight().data())); + } + return true; + }); + + curTx.put(success.getValue()); + return success.getValue(); } - public void put(JData obj) { - throw new NotImplementedException(); + @SuppressWarnings("unchecked") + public Optional> get(Class type, JObjectKey key, LockingStrategy strategy) { + return curTx.get(RemoteObject.class, key, strategy) + .map(obj -> { + if (obj.data() != null && !type.isInstance(obj.data())) + throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type); + if (!type.isAssignableFrom(obj.meta().knownType())) + throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type); + + if (obj.data() != null) + return obj; + else + return tryDownloadRemote(obj); + }); } - public Optional get(Class type, JObjectKey key) { + public Optional getMeta(JObjectKey key, LockingStrategy strategy) { + return curTx.get(RemoteObject.class, key, strategy).map(obj -> obj.meta()); + } + + public Optional getData(Class type, JObjectKey key, LockingStrategy strategy) { + return get(type, key, strategy).map(RemoteObject::data); + } + + + public void put(RemoteObject obj) { + curTx.put(obj); + } + + public void put(T obj) { + var cur = get((Class) obj.getClass(), obj.key()).orElse(null); + + if (cur == null) { + curTx.put(new RemoteObject<>(obj, persistentPeerDataService.getSelfUuid())); + return; + } + + if (cur.data() != null && cur.data().equals(obj)) + return; + if (cur.data() != null && !cur.data().getClass().equals(obj.getClass())) + throw new IllegalStateException("Object type mismatch: " + cur.data().getClass() + " vs " + obj.getClass()); + var newMeta = cur.meta(); + newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(), + newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1)); + var newObj = cur.withData(obj).withMeta(newMeta); + curTx.put(newObj); + } + + public Optional> get(Class type, JObjectKey key) { return get(type, key, LockingStrategy.OPTIMISTIC); } + + public Optional getMeta(JObjectKey key) { + return getMeta(key, LockingStrategy.OPTIMISTIC); + } + + public Optional getData(Class type, JObjectKey key) { + return getData(type, key, LockingStrategy.OPTIMISTIC); + } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java index ecdc816c..cfcf5036 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeManager.java @@ -1,36 +1,39 @@ package com.usatiuk.dhfs.objects.jkleppmanntree; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.TransactionManager; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; import com.usatiuk.dhfs.objects.transaction.LockingStrategy; import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.kleppmanntree.*; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; +import org.pcollections.HashTreePMap; +import org.pcollections.TreePMap; import org.pcollections.TreePSet; -import java.util.HashMap; -import java.util.List; -import java.util.TreeMap; -import java.util.UUID; +import java.util.*; import java.util.function.Function; @ApplicationScoped public class JKleppmannTreeManager { private static final String dataFileName = "trees"; @Inject - JKleppmannTreePeerInterface jKleppmannTreePeerInterface; - @Inject Transaction curTx; @Inject TransactionManager txManager; @Inject JKleppmannTreePeerInterface peerInterface; + @Inject + PeerInfoService peerInfoService; public JKleppmannTree getTree(JObjectKey name) { return txManager.executeTx(() -> { @@ -41,7 +44,7 @@ public class JKleppmannTreeManager { TreePSet.empty(), true, 1L, - new HashMap<>(), + HashTreePMap.empty(), new HashMap<>(), new TreeMap<>() ); @@ -57,7 +60,7 @@ public class JKleppmannTreeManager { } public class JKleppmannTree { - private final KleppmannTree _tree; + private final KleppmannTree _tree; private final JKleppmannTreeStorageInterface _storageInterface; private final JKleppmannTreeClock _clock; private final JObjectKey _treeName; @@ -89,105 +92,71 @@ public class JKleppmannTreeManager { _tree.move(_storageInterface.getTrashId(), newMeta.withName(nodeKey.toString()), nodeKey); } -// @Override -// public boolean hasPendingOpsForHost(UUID host) { -// return _persistentData.get() -// .runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, -// (m, d) -> d.getQueues().containsKey(host) && -// !d.getQueues().get(host).isEmpty() -// ); -// } -// -// @Override -// public List getPendingOpsForHost(UUID host, int limit) { -// return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { -// if (d.getQueues().containsKey(host)) { -// var queue = d.getQueues().get(host); -// ArrayList collected = new ArrayList<>(); -// -// for (var node : queue.entrySet()) { -// collected.add(new JKleppmannTreeOpWrapper(node.getValue())); -// if (collected.size() >= limit) break; -// } -// -// return collected; -// } -// return List.of(); -// }); -// } + public boolean hasPendingOpsForHost(PeerId host) { + return !_data.queues().getOrDefault(host, TreePMap.empty()).isEmpty(); + } -// @Override -// public String getId() { -// return _treeName; -// } + public List getPendingOpsForHost(PeerId host, int limit) { + ArrayList collected = new ArrayList<>(); + for (var node : _data.queues().getOrDefault(host, TreePMap.empty()).entrySet()) { + collected.add(new JKleppmannTreeOpWrapper(_data.key(), node.getValue())); + if (collected.size() >= limit) break; + } + return Collections.unmodifiableList(collected); + } -// @Override -// public void commitOpForHost(UUID host, Op op) { -// if (!(op instanceof JKleppmannTreeOpWrapper jop)) -// throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId()); -// _persistentData.get().assertRwLock(); -// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); -// -// var got = _persistentData.get().getData().getQueues().get(host).firstEntry().getValue(); -// if (!Objects.equals(jop.getOp(), got)) -// throw new IllegalArgumentException("Committed op push was not the oldest"); -// -// _persistentData.get().mutate(new JMutator() { -// @Override -// public boolean mutate(JKleppmannTreePersistentData object) { -// object.getQueues().get(host).pollFirstEntry(); -// return true; -// } -// -// @Override -// public void revert(JKleppmannTreePersistentData object) { -// object.getQueues().get(host).put(jop.getOp().timestamp(), jop.getOp()); -// } -// }); -// -// } + // @Override + public void commitOpForHost(PeerId host, Op op) { + if (!(op instanceof JKleppmannTreeOpWrapper jop)) + throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass()); -// @Override -// public void pushBootstrap(UUID host) { -// _tree.recordBoostrapFor(host); -// } + var firstOp = _data.queues().get(host).firstEntry().getValue(); + if (!Objects.equals(firstOp, jop.op())) + throw new IllegalArgumentException("Committed op push was not the oldest"); - public Pair findParent(Function, Boolean> predicate) { - return _tree.findParent(predicate); + _data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(_data.queues().get(host).firstKey()))); } // @Override -// public boolean acceptExternalOp(UUID from, Op op) { -// if (op instanceof JKleppmannTreePeriodicPushOp pushOp) { -// return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp()); -// } -// -// if (!(op instanceof JKleppmannTreeOpWrapper jop)) -// throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId()); -// -// JObject fileRef; -// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) { +// public void pushBootstrap(PeerId host) { +// _tree.recordBoostrapFor(host); +// } + + public Pair findParent(Function, Boolean> predicate) { + return _tree.findParent(predicate); + } + + // @Override + public boolean acceptExternalOp(PeerId from, Op op) { + if (op instanceof JKleppmannTreePeriodicPushOp pushOp) { + return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp()); + } + + if (!(op instanceof JKleppmannTreeOpWrapper jop)) + throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass()); + +// if (jop.op().newMeta() instanceof JKleppmannTreeNodeMetaFile f) { // var fino = f.getFileIno(); // fileRef = jObjectManager.getOrPut(fino, File.class, Optional.of(jop.getOp().childId())); // } else { // fileRef = null; // } -// -// if (Log.isTraceEnabled()) -// Log.trace("Received op from " + from + ": " + jop.getOp().timestamp().timestamp() + " " + jop.getOp().childId() + "->" + jop.getOp().newParentId() + " as " + jop.getOp().newMeta().getName()); -// -// try { -// _tree.applyExternalOp(from, jop.getOp()); -// } catch (Exception e) { -// Log.error("Error applying external op", e); -// throw e; -// } finally { -// // FIXME: -// // Fixup the ref if it didn't really get applied -// + + if (Log.isTraceEnabled()) + Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().getName()); + + try { + _tree.applyExternalOp(from, jop.op()); + } catch (Exception e) { + Log.error("Error applying external op", e); + throw e; + } finally { + // FIXME: + // Fixup the ref if it didn't really get applied + // if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile)) // Log.error("Could not create child of pushed op: " + jop.getOp()); -// + // if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) { // if (fileRef != null) { // var got = jObjectManager.get(jop.getOp().childId()).orElse(null); @@ -216,9 +185,9 @@ public class JKleppmannTreeManager { // } // } // } -// } -// return true; -// } + } + return true; + } // @Override // public Op getPeriodicPushOp() { @@ -232,9 +201,12 @@ public class JKleppmannTreeManager { // _persistentData.get().rwUnlock(); // } - private class JOpRecorder implements OpRecorder { + private class JOpRecorder implements OpRecorder { @Override - public void recordOp(OpMove op) { + public void recordOp(OpMove op) { + for (var p : peerInfoService.getPeersNoSelf()) { + recordOpForPeer(p.id(), op); + } // _persistentData.get().assertRwLock(); // _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); // var hostUuds = persistentPeerDataService.getHostUuids().stream().toList(); @@ -254,7 +226,8 @@ public class JKleppmannTreeManager { } @Override - public void recordOpForPeer(UUID peer, OpMove op) { + public void recordOpForPeer(PeerId peer, OpMove op) { + _data = _data.withQueues(_data.queues().plus(peer, _data.queues().getOrDefault(peer, TreePMap.empty()).plus(op.timestamp(), op))); // _persistentData.get().assertRwLock(); // _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); // _persistentData.get().mutate(new JMutator() { @@ -296,7 +269,7 @@ public class JKleppmannTreeManager { } } - public class JKleppmannTreeStorageInterface implements StorageInterface { + public class JKleppmannTreeStorageInterface implements StorageInterface { private final LogWrapper _logWrapper = new LogWrapper(); private final PeerLogWrapper _peerLogWrapper = new PeerLogWrapper(); @@ -330,7 +303,7 @@ public class JKleppmannTreeManager { } @Override - public void putNode(TreeNode node) { + public void putNode(TreeNode node) { curTx.put(((JKleppmannTreeNode) node)); } @@ -340,23 +313,23 @@ public class JKleppmannTreeManager { } @Override - public LogInterface getLog() { + public LogInterface getLog() { return _logWrapper; } @Override - public PeerTimestampLogInterface getPeerTimestampLog() { + public PeerTimestampLogInterface getPeerTimestampLog() { return _peerLogWrapper; } - private class PeerLogWrapper implements PeerTimestampLogInterface { + private class PeerLogWrapper implements PeerTimestampLogInterface { @Override - public Long getForPeer(UUID peerId) { + public Long getForPeer(PeerId peerId) { return _data.peerTimestampLog().get(peerId); } @Override - public void putForPeer(UUID peerId, Long timestamp) { + public void putForPeer(PeerId peerId, Long timestamp) { var newPeerTimestampLog = new HashMap<>(_data.peerTimestampLog()); newPeerTimestampLog.put(peerId, timestamp); _data = _data.withPeerTimestampLog(newPeerTimestampLog); @@ -364,16 +337,16 @@ public class JKleppmannTreeManager { } } - private class LogWrapper implements LogInterface { + private class LogWrapper implements LogInterface { @Override - public Pair, LogRecord> peekOldest() { + public Pair, LogRecord> peekOldest() { var ret = _data.log().firstEntry(); if (ret == null) return null; return Pair.of(ret); } @Override - public Pair, LogRecord> takeOldest() { + public Pair, LogRecord> takeOldest() { var newLog = new TreeMap<>(_data.log()); var ret = newLog.pollFirstEntry(); _data = _data.withLog(newLog); @@ -383,19 +356,19 @@ public class JKleppmannTreeManager { } @Override - public Pair, LogRecord> peekNewest() { + public Pair, LogRecord> peekNewest() { var ret = _data.log().lastEntry(); if (ret == null) return null; return Pair.of(ret); } @Override - public List, LogRecord>> newestSlice(CombinedTimestamp since, boolean inclusive) { + public List, LogRecord>> newestSlice(CombinedTimestamp since, boolean inclusive) { return _data.log().tailMap(since, inclusive).entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList(); } @Override - public List, LogRecord>> getAll() { + public List, LogRecord>> getAll() { return _data.log().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList(); } @@ -405,7 +378,7 @@ public class JKleppmannTreeManager { } @Override - public boolean containsKey(CombinedTimestamp timestamp) { + public boolean containsKey(CombinedTimestamp timestamp) { return _data.log().containsKey(timestamp); } @@ -415,7 +388,7 @@ public class JKleppmannTreeManager { } @Override - public void put(CombinedTimestamp timestamp, LogRecord record) { + public void put(CombinedTimestamp timestamp, LogRecord record) { if (_data.log().containsKey(timestamp)) throw new IllegalStateException("Overwriting log entry?"); var newLog = new TreeMap<>(_data.log()); @@ -425,7 +398,7 @@ public class JKleppmannTreeManager { } @Override - public void replace(CombinedTimestamp timestamp, LogRecord record) { + public void replace(CombinedTimestamp timestamp, LogRecord record) { var newLog = new TreeMap<>(_data.log()); newLog.put(timestamp, record); _data = _data.withLog(newLog); diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeOpWrapper.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeOpWrapper.java index cf734a4e..209c43df 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeOpWrapper.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreeOpWrapper.java @@ -1,24 +1,16 @@ package com.usatiuk.dhfs.objects.jkleppmanntree; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; import com.usatiuk.kleppmanntree.OpMove; -import java.util.UUID; +import java.io.Serializable; // Wrapper to avoid having to specify generic types -public class JKleppmannTreeOpWrapper { - private final OpMove _op; - - public JKleppmannTreeOpWrapper(OpMove op) { - if (op == null) throw new IllegalArgumentException("op shouldn't be null"); - _op = op; - } - - public OpMove getOp() { - return _op; - } - +public record JKleppmannTreeOpWrapper(JObjectKey treeName, + OpMove op) implements Op, Serializable { // @Override // public Collection getEscapedRefs() { // if (_op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeerInterface.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeerInterface.java index 9088ecfd..0ea613f7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeerInterface.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeerInterface.java @@ -1,6 +1,11 @@ package com.usatiuk.dhfs.objects.jkleppmanntree; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; import com.usatiuk.kleppmanntree.PeerInterface; +import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.util.Collection; @@ -8,14 +13,19 @@ import java.util.List; import java.util.UUID; @Singleton -public class JKleppmannTreePeerInterface implements PeerInterface { +public class JKleppmannTreePeerInterface implements PeerInterface { + @Inject + PeerInfoService peerInfoService; + @Inject + PersistentPeerDataService persistentPeerDataService; + @Override - public UUID getSelfId() { - return UUID.nameUUIDFromBytes("1".getBytes()); + public PeerId getSelfId() { + return persistentPeerDataService.getSelfUuid(); } @Override - public Collection getAllPeers() { - return List.of(getSelfId()); + public Collection getAllPeers() { + return peerInfoService.getPeers().stream().map(PeerInfo::id).toList(); } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java index f7526587..679e1249 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/JKleppmannTreePeriodicPushOp.java @@ -1,17 +1,19 @@ package com.usatiuk.dhfs.objects.jkleppmanntree; +import com.usatiuk.dhfs.objects.PeerId; + import java.util.UUID; public class JKleppmannTreePeriodicPushOp { - private final UUID _from; + private final PeerId _from; private final long _timestamp; - public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) { + public JKleppmannTreePeriodicPushOp(PeerId from, long timestamp) { _from = from; _timestamp = timestamp; } - public UUID getFrom() { + public PeerId getFrom() { return _from; } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java index 3b4a8687..1d1a4839 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreeNode.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs; import com.usatiuk.dhfs.objects.JDataRefcounted; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer; import com.usatiuk.kleppmanntree.OpMove; import com.usatiuk.kleppmanntree.TreeNode; @@ -12,15 +13,14 @@ import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; // FIXME: Ideally this is two classes? public record JKleppmannTreeNode(JObjectKey key, PCollection refsFrom, boolean frozen, JObjectKey parent, - OpMove lastEffectiveOp, + OpMove lastEffectiveOp, JKleppmannTreeNodeMeta meta, - Map children) implements TreeNode, JDataRefcounted, Serializable { + Map children) implements TreeNode, JDataRefcounted, Serializable { public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) { this(id, TreePSet.empty(), false, parent, null, meta, Collections.emptyMap()); @@ -32,7 +32,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection refsFro } @Override - public JKleppmannTreeNode withLastEffectiveOp(OpMove lastEffectiveOp) { + public JKleppmannTreeNode withLastEffectiveOp(OpMove lastEffectiveOp) { return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreePersistentData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreePersistentData.java index 910a652b..440b38de 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreePersistentData.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/jkleppmanntree/structs/JKleppmannTreePersistentData.java @@ -2,41 +2,24 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs; import com.usatiuk.dhfs.objects.JDataRefcounted; import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; import com.usatiuk.kleppmanntree.CombinedTimestamp; import com.usatiuk.kleppmanntree.LogRecord; import com.usatiuk.kleppmanntree.OpMove; import org.pcollections.PCollection; +import org.pcollections.PMap; +import org.pcollections.PSortedMap; +import org.pcollections.TreePMap; import java.util.*; public record JKleppmannTreePersistentData( JObjectKey key, PCollection refsFrom, boolean frozen, long clock, - HashMap, OpMove>> queues, - HashMap peerTimestampLog, - TreeMap, LogRecord> log + PMap, OpMove>> queues, + HashMap peerTimestampLog, + TreeMap, LogRecord> log ) implements JDataRefcounted { - void recordOp(UUID host, OpMove opMove) { - queues().computeIfAbsent(host, h -> new TreeMap<>()); - queues().get(host).put(opMove.timestamp(), opMove); - } - - void removeOp(UUID host, OpMove opMove) { - queues().get(host).remove(opMove.timestamp(), opMove); - } - - void recordOp(Collection hosts, OpMove opMove) { - for (var u : hosts) { - recordOp(u, opMove); - } - } - - void removeOp(Collection hosts, OpMove opMove) { - for (var u : hosts) { - removeOp(u, opMove); - } - } - @Override public JKleppmannTreePersistentData withRefsFrom(PCollection refs) { return new JKleppmannTreePersistentData(key, refs, frozen, clock, queues, peerTimestampLog, log); @@ -51,15 +34,15 @@ public record JKleppmannTreePersistentData( return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log); } - public JKleppmannTreePersistentData withQueues(HashMap, OpMove>> queues) { + public JKleppmannTreePersistentData withQueues(PMap, OpMove>> queues) { return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log); } - public JKleppmannTreePersistentData withPeerTimestampLog(HashMap peerTimestampLog) { + public JKleppmannTreePersistentData withPeerTimestampLog(HashMap peerTimestampLog) { return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log); } - public JKleppmannTreePersistentData withLog(TreeMap, LogRecord> log) { + public JKleppmannTreePersistentData withLog(TreeMap, LogRecord> log) { return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java index 81f3c764..eabd1b8c 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/PeerManager.java @@ -23,6 +23,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; @ApplicationScoped public class PeerManager { @@ -135,7 +136,7 @@ public class PeerManager { // FIXME: private boolean pingCheck(PeerInfo host, PeerAddress address) { try { - return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, c -> { + return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> { var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build()); if (!UUID.fromString(ret.getSelfUuid()).equals(host.id().id())) { throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host.id()); @@ -148,8 +149,12 @@ public class PeerManager { } } + public boolean isReachable(PeerId host) { + return _states.containsKey(host); + } + public boolean isReachable(PeerInfo host) { - return _states.containsKey(host.id()); + return isReachable(host.id()); } public PeerAddress getAddress(PeerId host) { @@ -166,21 +171,13 @@ public class PeerManager { // .map(Map.Entry::getKey).toList()); // } -// public HostStateSnapshot getHostStateSnapshot() { -// ArrayList available = new ArrayList<>(); -// ArrayList unavailable = new ArrayList<>(); -// _transientPeersState.runReadLocked(d -> { -// for (var v : d.getStates().entrySet()) { -// if (v.getValue().isReachable()) -// available.add(v.getKey()); -// else -// unavailable.add(v.getKey()); -// } -// return null; -// } -// ); -// return new HostStateSnapshot(available, unavailable); -// } + public HostStateSnapshot getHostStateSnapshot() { + return transactionManager.run(() -> { + var partition = peerInfoService.getPeersNoSelf().stream().map(PeerInfo::id) + .collect(Collectors.partitioningBy(this::isReachable)); + return new HostStateSnapshot(partition.get(true), partition.get(false)); + }); + } // public void removeRemoteHost(UUID host) { // persistentPeerDataService.removeHost(host); @@ -227,7 +224,7 @@ public class PeerManager { void apply(UUID host); } - public record HostStateSnapshot(List available, List unavailable) { + public record HostStateSnapshot(Collection available, Collection unavailable) { } } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/ReceivedObjectSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/ReceivedObjectSerializer.java new file mode 100644 index 00000000..73ab19cd --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/ReceivedObjectSerializer.java @@ -0,0 +1,46 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.JDataRemote; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.ReceivedObject; +import com.usatiuk.dhfs.objects.persistence.RemoteObjectP; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.pcollections.HashTreePMap; +import org.pcollections.PMap; + +@ApplicationScoped +public class ReceivedObjectSerializer implements ProtoSerializer { + @Inject + ProtoSerializer remoteObjectSerializer; + + @Override + public ReceivedObject deserialize(GetObjectReply message) { + PMap changelog = HashTreePMap.empty(); + for (var entry : message.getHeader().getChangelog().getEntriesList()) { + changelog = changelog.plus(PeerId.of(entry.getHost()), entry.getVersion()); + } + return new ReceivedObject( + JObjectKey.of(message.getHeader().getName()), + changelog, + remoteObjectSerializer.deserialize(message.getContent()) + ); + } + + @Override + public GetObjectReply serialize(ReceivedObject object) { + var builder = GetObjectReply.newBuilder(); + var headerBuilder = builder.getHeaderBuilder(); + headerBuilder.setName(object.key().toString()); + var changelogBuilder = headerBuilder.getChangelogBuilder(); + object.changelog().forEach((peer, version) -> { + changelogBuilder.addEntriesBuilder() + .setHost(peer.toString()) + .setVersion(version); + }); + builder.setContent(remoteObjectSerializer.serialize(object.data())); + return builder.build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java index 37458390..d591bcb7 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceClient.java @@ -1,82 +1,84 @@ -//package com.usatiuk.dhfs.objects.repository; -// -//import com.google.common.collect.Maps; -//import com.usatiuk.autoprotomap.runtime.ProtoSerializer; -//import com.usatiuk.dhfs.objects.jrepository.*; -//import com.usatiuk.dhfs.objects.persistence.JObjectDataP; -//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; -//import com.usatiuk.dhfs.objects.repository.opsupport.Op; -//import io.grpc.Status; -//import io.grpc.StatusRuntimeException; -//import io.quarkus.logging.Log; -//import jakarta.enterprise.context.ApplicationScoped; -//import jakarta.inject.Inject; -//import org.apache.commons.lang3.tuple.Pair; -// -//import javax.annotation.Nullable; -//import java.util.*; -//import java.util.concurrent.Callable; -//import java.util.concurrent.ConcurrentLinkedDeque; -//import java.util.concurrent.Executors; -//import java.util.stream.Collectors; -// -//@ApplicationScoped -//public class RemoteObjectServiceClient { -// @Inject -// PersistentPeerDataService persistentPeerDataService; -// -// @Inject -// RpcClientFactory rpcClientFactory; -// -// @Inject -// JObjectManager jObjectManager; -// -// @Inject -// SyncHandler syncHandler; -// @Inject -// InvalidationQueueService invalidationQueueService; -// @Inject +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +@ApplicationScoped +public class RemoteObjectServiceClient { + @Inject + PersistentPeerDataService persistentPeerDataService; + + @Inject + RpcClientFactory rpcClientFactory; + + @Inject + TransactionManager txm; + @Inject + Transaction curTx; + @Inject + RemoteTransaction remoteTx; + + @Inject + SyncHandler syncHandler; + @Inject + InvalidationQueueService invalidationQueueService; + // @Inject // ProtoSerializer dataProtoSerializer; -// @Inject -// ProtoSerializer opProtoSerializer; -// @Inject -// JObjectTxManager jObjectTxManager; -// + @Inject + ProtoSerializer opProtoSerializer; + + @Inject + ProtoSerializer receivedObjectProtoSerializer; + // public Pair getSpecificObject(UUID host, String name) { // return rpcClientFactory.withObjSyncClient(host, client -> { // var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build()); // return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent()); // }); // } -// -// public JObjectDataP getObject(JObject jObject) { -// jObject.assertRwLock(); -// -// var targets = jObject.runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d) -> { -// var ourVersion = md.getOurVersion(); -// if (ourVersion >= 1) -// return md.getRemoteCopies().entrySet().stream() -// .filter(entry -> entry.getValue().equals(ourVersion)) -// .map(Map.Entry::getKey).toList(); -// else -// return persistentPeerDataService.getHostUuids(); -// }); -// -// if (targets.isEmpty()) -// throw new IllegalStateException("No targets for object " + jObject.getMeta().getName()); -// -// Log.info("Downloading object " + jObject.getMeta().getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", "))); -// -// return rpcClientFactory.withObjSyncClient(targets, client -> { -// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(jObject.getMeta().getName()).build()); -// -// var receivedMap = new HashMap(); -// for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) { -// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion()); -// } -// + + public void getObject(JObjectKey key, Function, Boolean> onReceive) { + var objMeta = remoteTx.getMeta(key).orElse(null); + + if (objMeta == null) { + throw new IllegalArgumentException("Object " + key + " not found"); + } + + var targetVersion = objMeta.versionSum(); + var targets = objMeta.knownRemoteVersions().entrySet().stream() + .filter(entry -> entry.getValue().equals(targetVersion)) + .map(Map.Entry::getKey).toList(); + + if (targets.isEmpty()) + throw new IllegalStateException("No targets for object " + key); + + Log.info("Downloading object " + key + " from " + targets); + + rpcClientFactory.withObjSyncClient(targets, (peer, client) -> { + var reply = client.getObject(GetObjectRequest.newBuilder().setName(key.toString()).build()); + + var deserialized = receivedObjectProtoSerializer.deserialize(reply); + + if (!onReceive.apply(Pair.of(peer, deserialized))) { + throw new StatusRuntimeException(Status.ABORTED.withDescription("Failed to process object " + key + " from " + peer)); + } + + return null; // return jObjectTxManager.executeTx(() -> { -// return jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> { +// return key.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> { // var unexpected = !Objects.equals( // Maps.filterValues(md.getChangelog(), val -> val != 0), // Maps.filterValues(receivedMap, val -> val != 0)); @@ -98,10 +100,10 @@ // return reply.getObject().getContent(); // }); // }); -// }); -// } -// -// @Nullable + }); + } + + // @Nullable // public IndexUpdateReply notifyUpdate(JObject obj, UUID host) { // var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()); // @@ -128,7 +130,7 @@ // return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send)); // } // -// public OpPushReply pushOps(List ops, String queueName, UUID host) { + public OpPushReply pushOps(PeerId target, List ops) { // for (Op op : ops) { // for (var ref : op.getEscapedRefs()) { // jObjectTxManager.executeTx(() -> { @@ -141,9 +143,14 @@ // .setQueueId(queueName); // for (var op : ops) // builder.addMsg(opProtoSerializer.serialize(op)); -// return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(builder.build())); -// } -// + for (Op op : ops) { + var serialized = opProtoSerializer.serialize(op); + var built = OpPushRequest.newBuilder().addMsg(serialized).build(); + rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built)); + } + return OpPushReply.getDefaultInstance(); + } + // public Collection canDelete(Collection targets, String object, Collection ourReferrers) { // ConcurrentLinkedDeque results = new ConcurrentLinkedDeque<>(); // Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", "))); @@ -171,4 +178,4 @@ // } // return results; // } -//} +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java index 990ad534..e4b6b468 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java @@ -1,60 +1,77 @@ package com.usatiuk.dhfs.objects.repository; +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +import com.usatiuk.dhfs.objects.repository.invalidation.Op; +import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler; +import com.usatiuk.dhfs.objects.transaction.Transaction; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.quarkus.grpc.GrpcService; +import io.quarkus.logging.Log; +import io.quarkus.security.identity.SecurityIdentity; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Uni; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; +import org.apache.commons.lang3.NotImplementedException; -/// / Note: RunOnVirtualThread hangs somehow +// Note: RunOnVirtualThread hangs somehow @GrpcService @RolesAllowed("cluster-member") public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { // @Inject // SyncHandler syncHandler; -// -// @Inject -// JObjectManager jObjectManager; -// -// @Inject -// PeerManager peerManager; -// -// @Inject -// AutoSyncProcessor autoSyncProcessor; -// -@Inject -PersistentPeerDataService persistentPeerDataService; -// -// @Inject -// InvalidationQueueService invalidationQueueService; -// -// @Inject + + @Inject + TransactionManager txm; + @Inject + PeerManager peerManager; + @Inject + Transaction curTx; + @Inject + PersistentPeerDataService persistentPeerDataService; + + @Inject + InvalidationQueueService invalidationQueueService; + @Inject + SecurityIdentity identity; + // @Inject // ProtoSerializer dataProtoSerializer; -// @Inject -// ProtoSerializer opProtoSerializer; -// -// @Inject -// OpObjectRegistry opObjectRegistry; -// -// @Inject -// JObjectTxManager jObjectTxManager; -// -// @Override -// @Blocking -// public Uni getObject(GetObjectRequest request) { -// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); -// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) -// throw new StatusRuntimeException(Status.UNAUTHENTICATED); -// -// Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid()); -// -// var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); -// + @Inject + ProtoSerializer opProtoSerializer; + @Inject + ProtoSerializer receivedObjectProtoSerializer; + @Inject + RemoteTransaction remoteTx; + @Inject + OpHandler opHandler; + + @Override + @Blocking + public Uni getObject(GetObjectRequest request) { + Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); + + var obj = txm.run(() -> { + var got = remoteTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null); + if (got == null) { + Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3)); + throw new StatusRuntimeException(Status.NOT_FOUND); + } + return got; + }); + + var serialized = receivedObjectProtoSerializer.serialize(obj.toReceivedObject()); + return Uni.createFrom().item(serialized); // // Does @Blocking break this? // return Uni.createFrom().emitter(emitter -> { -// var replyObj = jObjectTxManager.executeTx(() -> { +// try { +// } catch (Exception e) { +// emitter.fail(e); +// } +// var replyObj = txm.run(() -> { +// var cur = curTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND)); // // Obj.markSeen before markSeen of its children // obj.markSeen(); // return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> { @@ -77,11 +94,17 @@ PersistentPeerDataService persistentPeerDataService; // var ret = GetObjectReply.newBuilder() // .setSelfUuid(persistentPeerDataService.getSelfUuid().toString()) // .setObject(replyObj).build(); +// emitter.complete(ret); // // TODO: Could this cause problems if we wait for too long? -// obj.commitFenceAsync(() -> emitter.complete(ret)); +//// obj.commitFenceAsync(() -> emitter.complete(ret)); // }); -// } -// + } + + @Override + public Uni canDelete(CanDeleteRequest request) { + throw new NotImplementedException(); + } + // @Override // @Blocking // public Uni canDelete(CanDeleteRequest request) { @@ -107,11 +130,12 @@ PersistentPeerDataService persistentPeerDataService; // return m.isDeletionCandidate() && !m.isDeleted(); // }); // // FIXME -//// if (tryUpdate) { -//// obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> { -//// return null; -//// }); -//// } + + /// / if (tryUpdate) { + /// / obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> { + /// / return null; + /// / }); + /// / } // } catch (DeletedObjectAccessException dox) { // builder.setDeletionCandidate(true); // } @@ -127,7 +151,7 @@ PersistentPeerDataService persistentPeerDataService; // // return Uni.createFrom().item(ret); // } -// + // @Override // @Blocking // public Uni indexUpdate(IndexUpdatePush request) { @@ -135,51 +159,29 @@ PersistentPeerDataService persistentPeerDataService; // if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) // throw new StatusRuntimeException(Status.UNAUTHENTICATED); // - /// / Log.info("<-- indexUpdate: " + request.getHeader().getName()); +// Log.info("<-- indexUpdate: " + request.getHeader().getName()); // return jObjectTxManager.executeTxAndFlush(() -> { // return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request)); // }); // } -// -// @Override -// @Blocking -// public Uni opPush(OpPushMsg request) { -// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); -// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid()))) -// throw new StatusRuntimeException(Status.UNAUTHENTICATED); -// -// try { -// var objs = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); -// jObjectTxManager.executeTxAndFlush(() -> { -// opObjectRegistry.acceptExternalOps(request.getQueueId(), UUID.fromString(request.getSelfUuid()), objs); -// }); -// } catch (Exception e) { -// Log.error(e, e); -// throw e; -// } -// return Uni.createFrom().item(OpPushReply.getDefaultInstance()); -// } -// - @Override - public Uni getObject(GetObjectRequest request) { - return null; + @Blocking + public Uni opPush(OpPushRequest request) { + try { + var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList(); + for (var op : ops) { + Log.info("<-- op: " + op + " from " + identity.getPrincipal().getName().substring(3)); + txm.run(() -> { + opHandler.handleOp(PeerId.of(identity.getPrincipal().getName().substring(3)), op); + }); + } + } catch (Exception e) { + Log.error(e, e); + throw e; + } + return Uni.createFrom().item(OpPushReply.getDefaultInstance()); } - @Override - public Uni canDelete(CanDeleteRequest request) { - return null; - } - - @Override - public Uni indexUpdate(IndexUpdatePush request) { - return null; - } - - @Override - public Uni opPush(OpPushMsg request) { - return null; - } @Override @Blocking diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java index 3dddddf3..ab234a66 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RpcClientFactory.java @@ -76,7 +76,7 @@ public class RpcClientFactory { .withMaxOutboundMessageSize(Integer.MAX_VALUE) .withMaxInboundMessageSize(Integer.MAX_VALUE); }); - return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)); + return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS)); } public void dropCache() { @@ -85,7 +85,7 @@ public class RpcClientFactory { @FunctionalInterface public interface ObjectSyncClientFunction { - R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); + R apply(PeerId peer, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client); } private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java index f47e34e2..4e7f883f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/SyncHandler.java @@ -1,12 +1,35 @@ -//package com.usatiuk.dhfs.objects.repository; +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.JDataRemote; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.RemoteObject; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.NotImplementedException; +import org.pcollections.PMap; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + // + //import com.usatiuk.autoprotomap.runtime.ProtoSerializer; + //import com.usatiuk.dhfs.objects.jrepository.JObject; + //import com.usatiuk.dhfs.objects.jrepository.JObjectData; + //import com.usatiuk.dhfs.objects.jrepository.JObjectManager; + //import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager; + //import com.usatiuk.dhfs.objects.persistence.JObjectDataP; + //import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; + //import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry; //import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace; //import io.grpc.Status; @@ -23,8 +46,12 @@ //import java.util.stream.Collectors; //import java.util.stream.Stream; // -//@ApplicationScoped -//public class SyncHandler { +@ApplicationScoped +public class SyncHandler { + @Inject + Transaction curTx; + @Inject + PersistentPeerDataService persistentPeerDataService; // @Inject // JObjectManager jObjectManager; // @Inject @@ -65,143 +92,150 @@ // ); // } // -// public void handleOneUpdate(UUID from, ObjectHeader header) { -// AtomicReference> foundExt = new AtomicReference<>(); + + public RemoteObject handleOneUpdate(PeerId from, RemoteObject current, PMap rcvChangelog) { +// if (!rcv.key().equals(current.key())) { +// Log.error("Received update for different object: " + rcv.key() + " from " + from); +// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from); +// } + + var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum(); + + if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) { + Log.error("Received older index update than was known for host: " + from + " " + current.key()); + throw new IllegalStateException(); // FIXME: OutdatedUpdateException + } + + Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog); + + boolean conflict = false; + boolean updatedRemoteVersion = false; + + var newObj = current; + var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from); + + if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer)) + updatedRemoteVersion = true; + + if (updatedRemoteVersion) + newObj = current.withMeta(current.meta().withKnownRemoteVersions( + current.meta().knownRemoteVersions().plus(from, receivedTotalVer) + )); + + + boolean hasLower = false; + boolean hasHigher = false; + for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) { + if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L)) + hasLower = true; + if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L)) + hasHigher = true; + } + + if (hasLower && hasHigher) { + Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from); +// Log. // -// boolean conflict = jObjectTxManager.executeTx(() -> { -// JObject found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty()); -// foundExt.set(found); -// -// var receivedTotalVer = header.getChangelog().getEntriesList() -// .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum); -// -// var receivedMap = new HashMap(); -// for (var e : header.getChangelog().getEntriesList()) { -// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion()); -// } -// -// return found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> { -// if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) { -// Log.error("Received older index update than was known for host: " -// + from + " " + header.getName()); -// throw new OutdatedUpdateException(); -// } -// -// String rcv = ""; -// for (var e : header.getChangelog().getEntriesList()) { -// rcv += e.getHost() + ": " + e.getVersion() + "; "; -// } -// String ours = ""; -// for (var e : md.getChangelog().entrySet()) { -// ours += e.getKey() + ": " + e.getValue() + "; "; -// } -// Log.trace("Handling update: " + header.getName() + " from " + from + "\n" + "ours: " + ours + " \n" + "received: " + rcv); -// -// boolean updatedRemoteVersion = false; -// -// var oldRemoteVer = md.getRemoteCopies().put(from, receivedTotalVer); -// if (oldRemoteVer == null || !oldRemoteVer.equals(receivedTotalVer)) updatedRemoteVersion = true; -// -// boolean hasLower = false; -// boolean hasHigher = false; -// for (var e : Stream.concat(md.getChangelog().keySet().stream(), receivedMap.keySet().stream()).collect(Collectors.toSet())) { -// if (receivedMap.getOrDefault(e, 0L) < md.getChangelog().getOrDefault(e, 0L)) -// hasLower = true; -// if (receivedMap.getOrDefault(e, 0L) > md.getChangelog().getOrDefault(e, 0L)) -// hasHigher = true; -// } -// -// if (hasLower && hasHigher) { -// Log.info("Conflict on update (inconsistent version): " + header.getName() + " from " + from); -// return true; -// } -// -// if (hasLower) { -// Log.info("Received older index update than known: " -// + from + " " + header.getName()); -// throw new OutdatedUpdateException(); -// } -// -// if (hasHigher) { -// invalidate.apply(); -// md.getChangelog().clear(); -// md.getChangelog().putAll(receivedMap); -// md.getChangelog().putIfAbsent(persistentPeerDataService.getSelfUuid(), 0L); -// if (header.hasPushedData()) -// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); -// return false; -// } else if (data == null && header.hasPushedData()) { -// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); -// if (found.getData() == null) -// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); -// } -// -// assert Objects.equals(receivedTotalVer, md.getOurVersion()); -// -// if (!updatedRemoteVersion) -// Log.debug("No action on update: " + header.getName() + " from " + from); -// -// return false; -// }); -// }); -// -// // TODO: Is the lock gap here ok? -// if (conflict) { -// Log.info("Trying conflict resolution: " + header.getName() + " from " + from); +// info("Trying conflict resolution: " + header.getName() + " from " + from); // var found = foundExt.get(); // // JObjectData theirsData; // ObjectHeader theirsHeader; -// if (header.hasPushedData()) { +// if (header. hasPushedData()) { // theirsHeader = header; -// theirsData = dataProtoSerializer.deserialize(header.getPushedData()); +// theirsData = dataProtoSerializer. +// +// deserialize(header.getPushedData()); // } else { // var got = remoteObjectServiceClient.getSpecificObject(from, header.getName()); -// theirsData = dataProtoSerializer.deserialize(got.getRight()); -// theirsHeader = got.getLeft(); +// theirsData = dataProtoSerializer. +// +// deserialize(got.getRight()); +// theirsHeader = got. +// +// getLeft(); // } // -// jObjectTxManager.executeTx(() -> { -// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { -// if (d == null) -// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName())); -// return d.getConflictResolver(); -// }); -// var resolver = conflictResolvers.select(resolverClass); -// resolver.get().resolve(from, theirsHeader, theirsData, found); -// }); -// Log.info("Resolved conflict for " + from + " " + header.getName()); +// jObjectTxManager. +// +// executeTx(() -> { +// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> { +// if (d == null) +// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName())); +// return d.getConflictResolver(); +// }); +// var resolver = conflictResolvers.select(resolverClass); +// resolver. +// +// get(). +// +// resolve(from, theirsHeader, theirsData, found); +// }); +// Log. info("Resolved conflict for " + from + " " + header.getName()); + throw new NotImplementedException(); + } else if (hasLower) { + Log.info("Received older index update than known: " + from + " " + current.key()); +// throw new OutdatedUpdateException(); + throw new NotImplementedException(); + } else if (hasHigher) { + var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ? + rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L); + + newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog)); +// if (header.hasPushedData()) +// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); + } +// else if (data == null && header.hasPushedData()) { +// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY); +// if (found.getData() == null) +// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData())); // } -// -// } -// -// public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { -// // TODO: Dedup -// try { -// handleOneUpdate(UUID.fromString(request.getSelfUuid()), request.getHeader()); + +// assert Objects.equals(receivedTotalVer, md.getOurVersion()); + + if (!updatedRemoteVersion) + Log.debug("No action on update: " + current.meta().key() + " from " + from); + + return newObj; + } + + public RemoteObject handleRemoteUpdate(PeerId from, JObjectKey key, RemoteObject current, PMap rcv) { + // TODO: Dedup + try { + if (current == null) { + var obj = new RemoteObject<>(key, rcv); + curTx.put(obj); + return (RemoteObject) obj; + } + + var newObj = handleOneUpdate(from, current, rcv); + if (newObj != current) { + curTx.put(newObj); + } + return newObj; // } catch (OutdatedUpdateException ignored) { // Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid()); // invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName()); -// } catch (Exception ex) { -// Log.info("Error when handling update from " + request.getSelfUuid() + " of " + request.getHeader().getName(), ex); -// throw ex; -// } -// + } catch (Exception ex) { + Log.info("Error when handling update from " + from + " of " + current.meta().key(), ex); + throw ex; + } + // return IndexUpdateReply.getDefaultInstance(); -// } -// -// protected static class OutdatedUpdateException extends RuntimeException { -// OutdatedUpdateException() { -// super(); -// } -// -// OutdatedUpdateException(String message) { -// super(message); -// } -// -// @Override -// public synchronized Throwable fillInStackTrace() { -// return this; -// } -// } -//} \ No newline at end of file + } + + protected static class OutdatedUpdateException extends RuntimeException { + OutdatedUpdateException() { + super(); + } + + OutdatedUpdateException(String message) { + super(message); + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + } +} \ No newline at end of file diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueData.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueData.java new file mode 100644 index 00000000..1df4136a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueData.java @@ -0,0 +1,17 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import org.apache.commons.collections4.MultiValuedMap; +import org.apache.commons.collections4.multimap.HashSetValuedHashMap; + +import java.io.Serial; +import java.io.Serializable; +import java.util.UUID; + +public class DeferredInvalidationQueueData implements Serializable { + @Serial + private static final long serialVersionUID = 1L; + + public final MultiValuedMap deferredInvalidations = new HashSetValuedHashMap<>(); +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueService.java new file mode 100644 index 00000000..8d2a30b9 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/DeferredInvalidationQueueService.java @@ -0,0 +1,85 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.PeerManager; +import com.usatiuk.dhfs.utils.SerializationHelper; +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.apache.commons.lang3.SerializationUtils; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.UUID; + +@ApplicationScoped +public class DeferredInvalidationQueueService { + private static final String dataFileName = "invqueue"; + @Inject + PeerManager remoteHostManager; + @Inject + InvalidationQueueService invalidationQueueService; + @ConfigProperty(name = "dhfs.objects.persistence.files.root") + String dataRoot; + private DeferredInvalidationQueueData _persistentData = new DeferredInvalidationQueueData(); + + void init(@Observes @Priority(290) StartupEvent event) throws IOException { + Paths.get(dataRoot).toFile().mkdirs(); + Log.info("Initializing with root " + dataRoot); + if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) { + Log.info("Reading invalidation queue"); + _persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName))); + } +// remoteHostManager.registerConnectEventListener(this::returnForHost); + } + + void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException { + Log.info("Saving deferred invalidations"); + writeData(); + Log.info("Saved deferred invalidations"); + } + + private void writeData() { + try { + Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData)); + } catch (IOException iex) { + Log.error("Error writing deferred invalidations data", iex); + throw new RuntimeException(iex); + } + } + + // FIXME: + @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + @Blocking + void periodicReturn() { + for (var reachable : remoteHostManager.getAvailableHosts()) + returnForHost(reachable); + } + + void returnForHost(PeerId host) { + synchronized (this) { + var col = _persistentData.deferredInvalidations.get(host); + for (var s : col) { + Log.trace("Un-deferred invalidation to " + host + " of " + s); + invalidationQueueService.pushDeferredInvalidations(host, s); + } + col.clear(); + } + } + + void defer(PeerId host, JObjectKey object) { + synchronized (this) { + Log.trace("Deferred invalidation to " + host + " of " + object); + _persistentData.deferredInvalidations.put(host, object); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java new file mode 100644 index 00000000..e2162139 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOp.java @@ -0,0 +1,12 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.RemoteObject; +import org.pcollections.PMap; + +public record IndexUpdateOp(JObjectKey key, PMap changelog) implements Op { + public IndexUpdateOp(RemoteObject object) { + this(object.key(), object.meta().changelog()); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOpSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOpSerializer.java new file mode 100644 index 00000000..bb614c1a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/IndexUpdateOpSerializer.java @@ -0,0 +1,36 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.repository.IndexUpdateOpP; +import jakarta.enterprise.context.ApplicationScoped; +import org.pcollections.HashTreePMap; +import org.pcollections.PMap; + +@ApplicationScoped +public class IndexUpdateOpSerializer implements ProtoSerializer { + + @Override + public IndexUpdateOp deserialize(IndexUpdateOpP message) { + PMap map = HashTreePMap.empty(); + for (var entry : message.getHeader().getChangelog().getEntriesList()) { + map = map.plus(PeerId.of(entry.getHost()), entry.getVersion()); + } + return new IndexUpdateOp(JObjectKey.of(message.getHeader().getName()), map); + } + + @Override + public IndexUpdateOpP serialize(IndexUpdateOp object) { + var builder = IndexUpdateOpP.newBuilder(); + var headerBuilder = builder.getHeaderBuilder(); + headerBuilder.setName(object.key().name()); + var changelogBuilder = headerBuilder.getChangelogBuilder(); + for (var entry : object.changelog().entrySet()) { + var entryBuilder = changelogBuilder.addEntriesBuilder(); + entryBuilder.setHost(entry.getKey().id().toString()); + entryBuilder.setVersion(entry.getValue()); + } + return builder.build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java new file mode 100644 index 00000000..b2c3c024 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/InvalidationQueueService.java @@ -0,0 +1,190 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.TransactionManager; +import com.usatiuk.dhfs.objects.repository.PeerManager; +import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService; +import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; +import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue; +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import io.vertx.core.impl.ConcurrentHashSet; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +@ApplicationScoped +public class InvalidationQueueService { + private final HashSetDelayedBlockingQueue> _queue; + private final AtomicReference> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>()); + @Inject + PeerManager remoteHostManager; + @Inject + RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + TransactionManager txm; + @Inject + Transaction curTx; + @Inject + PersistentPeerDataService persistentPeerDataService; + @Inject + DeferredInvalidationQueueService deferredInvalidationQueueService; + @Inject + PeerInfoService peerInfoService; + @Inject + OpPusher opPusher; + @ConfigProperty(name = "dhfs.objects.invalidation.threads") + int threads; + private ExecutorService _executor; + private volatile boolean _shutdown = false; + + public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) { + _queue = new HashSetDelayedBlockingQueue<>(delay); + } + + void init(@Observes @Priority(300) StartupEvent event) throws InterruptedException { + BasicThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("invalidation-%d") + .build(); + + _executor = Executors.newFixedThreadPool(threads, factory); + + for (int i = 0; i < threads; i++) { + _executor.submit(this::sender); + } + } + + void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException { + _shutdown = true; + _executor.shutdownNow(); + if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) { + Log.error("Failed to shut down invalidation sender thread"); + } + var data = _queue.close(); + Log.info("Will defer " + data.size() + " invalidations on shutdown"); + for (var e : data) + deferredInvalidationQueueService.defer(e.getLeft(), e.getRight()); + } + + private void sender() { + while (!_shutdown) { + try { + try { + if (!_queue.hasImmediate()) { + ConcurrentHashSet toAllQueue; + + while (true) { + toAllQueue = _toAllQueue.get(); + if (toAllQueue != null) { + if (_toAllQueue.compareAndSet(toAllQueue, null)) + break; + } else { + break; + } + } + + if (toAllQueue != null) { + var hostInfo = remoteHostManager.getHostStateSnapshot(); + for (var o : toAllQueue) { + for (var h : hostInfo.available()) + _queue.add(Pair.of(h, o)); + for (var u : hostInfo.unavailable()) + deferredInvalidationQueueService.defer(u, o); + } + } + } + + var data = _queue.getAllWait(100, _queue.getDelay()); // TODO: config? + if (data.isEmpty()) continue; + String stats = "Sent invalidation: "; + long success = 0; + + for (var e : data) { + if (peerInfoService.getPeerInfo(e.getLeft()).isEmpty()) continue; + + if (!remoteHostManager.isReachable(e.getLeft())) { + deferredInvalidationQueueService.defer(e.getLeft(), e.getRight()); + continue; + } + + try { + opPusher.doPush(e.getLeft(), e.getRight()); + success++; + } catch (Exception ex) { + Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex); + pushInvalidationToOne(e.getLeft(), e.getRight()); + } + if (_shutdown) { + Log.info("Invalidation sender exiting"); + break; + } + } + + stats += success + "/" + data.size() + " "; + Log.info(stats); + } catch (InterruptedException ie) { + throw ie; + } catch (Exception e) { + Log.error("Exception in invalidation sender thread: ", e); + } + } catch (InterruptedException ignored) { + } + } + Log.info("Invalidation sender exiting"); + } + + public void pushInvalidationToAll(JObjectKey key) { +// if (obj.getMeta().isOnlyLocal()) return; + while (true) { + var queue = _toAllQueue.get(); + if (queue == null) { + var nq = new ConcurrentHashSet(); + if (!_toAllQueue.compareAndSet(null, nq)) continue; + queue = nq; + } + + queue.add(key); + + if (_toAllQueue.get() == queue) break; + } + } + + public void pushInvalidationToOne(PeerId host, JObjectKey obj) { +// if (obj.getMeta().isOnlyLocal()) return; + if (remoteHostManager.isReachable(host)) + _queue.add(Pair.of(host, obj)); + else + deferredInvalidationQueueService.defer(host, obj); + } + + public void pushInvalidationToOne(PeerId host, JData obj) { +// if (obj.getMeta().isOnlyLocal()) return; + pushInvalidationToOne(host, obj.key()); + } + +// public void pushInvalidationToAll(String name) { +// pushInvalidationToAll(jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found"))); +// } +// +// public void pushInvalidationToOne(PeerId host, JObjectKey name) { +// pushInvalidationToOne(host, jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found"))); +// } + + protected void pushDeferredInvalidations(PeerId host, JObjectKey name) { + _queue.add(Pair.of(host, name)); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/JKleppmannTreeOpPTempSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/JKleppmannTreeOpPTempSerializer.java new file mode 100644 index 00000000..13399feb --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/JKleppmannTreeOpPTempSerializer.java @@ -0,0 +1,22 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper; +import com.usatiuk.dhfs.objects.repository.JKleppmannTreeOpPTemp; +import com.usatiuk.dhfs.utils.SerializationHelper; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class JKleppmannTreeOpPTempSerializer implements ProtoSerializer { + @Override + public JKleppmannTreeOpWrapper deserialize(JKleppmannTreeOpPTemp message) { + return SerializationHelper.deserialize(message.getSerialized().toByteArray()); + } + + @Override + public JKleppmannTreeOpPTemp serialize(JKleppmannTreeOpWrapper object) { + return JKleppmannTreeOpPTemp.newBuilder() + .setSerialized(SerializationHelper.serialize(object)) + .build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/Op.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/Op.java new file mode 100644 index 00000000..eb5c6029 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/Op.java @@ -0,0 +1,8 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.autoprotomap.runtime.ProtoMirror; +import com.usatiuk.dhfs.objects.repository.OpPushPayload; + +@ProtoMirror(OpPushPayload.class) +public interface Op { +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java new file mode 100644 index 00000000..6b78661a --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpHandler.java @@ -0,0 +1,27 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class OpHandler { + @Inject + PushOpHandler pushOpHandler; + @Inject + Transaction curTx; + @Inject + JKleppmannTreeManager jKleppmannTreeManager; + + public void handleOp(PeerId from, Op op) { + if (op instanceof IndexUpdateOp iu) { + pushOpHandler.handlePush(from, iu); + } else if (op instanceof JKleppmannTreeOpWrapper jk) { + var tree = jKleppmannTreeManager.getTree(jk.treeName()); + tree.acceptExternalOp(from, jk); + } + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java new file mode 100644 index 00000000..f653c265 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/OpPusher.java @@ -0,0 +1,52 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper; +import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData; +import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.List; + +@ApplicationScoped +public class OpPusher { + @Inject + Transaction curTx; + @Inject + TransactionManager txm; + @Inject + RemoteTransaction remoteTransaction; + @Inject + RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + InvalidationQueueService invalidationQueueService; + + public void doPush(PeerId op, JObjectKey key) { + Op info = txm.run(() -> { + var obj = curTx.get(JData.class, key).orElse(null); + switch (obj) { + case RemoteObject remote -> { + return new IndexUpdateOp(key, remote.meta().changelog()); + } + case JKleppmannTreePersistentData pd -> { + var ret = new JKleppmannTreeOpWrapper(key, pd.queues().get(op).firstEntry().getValue()); + var newPd = pd.withQueues(pd.queues().plus(op, pd.queues().get(op).minus(ret.op().timestamp()))); + curTx.put(newPd); + if (!newPd.queues().get(op).isEmpty()) + invalidationQueueService.pushInvalidationToOne(op, pd.key()); + return ret; + } + case null, + default -> { + return null; + } + } + }); + if (info == null) { + return; + } + remoteObjectServiceClient.pushOps(op, List.of(info)); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java new file mode 100644 index 00000000..8ee79b77 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/invalidation/PushOpHandler.java @@ -0,0 +1,25 @@ +package com.usatiuk.dhfs.objects.repository.invalidation; + +import com.usatiuk.dhfs.objects.JDataRemote; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.RemoteTransaction; +import com.usatiuk.dhfs.objects.repository.SyncHandler; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +@ApplicationScoped +public class PushOpHandler { + @Inject + Transaction curTx; + @Inject + SyncHandler syncHandler; + @Inject + RemoteTransaction remoteTransaction; + + public void handlePush(PeerId peer, IndexUpdateOp obj) { + syncHandler.handleRemoteUpdate(peer, obj.key(), + remoteTransaction.get(JDataRemote.class, obj.key()).orElse(null), + obj.changelog()); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java index caa45d3a..cc9d4586 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfo.java @@ -1,30 +1,19 @@ package com.usatiuk.dhfs.objects.repository.peersync; -import com.usatiuk.dhfs.objects.JDataRefcounted; +import com.usatiuk.autoprotomap.runtime.ProtoMirror; import com.usatiuk.dhfs.objects.JDataRemote; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.persistence.ChunkDataP; +import com.usatiuk.dhfs.objects.persistence.PeerInfoP; import com.usatiuk.dhfs.objects.repository.CertificateTools; -import org.pcollections.HashTreePSet; -import org.pcollections.PCollection; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -public record PeerInfo(JObjectKey key, PCollection refsFrom, boolean frozen, PeerId id, - byte[] cert) implements JDataRefcounted, JDataRemote { +public record PeerInfo(JObjectKey key, PeerId id, byte[] cert) implements JDataRemote { public PeerInfo(PeerId id, byte[] cert) { - this(id.toJObjectKey(), HashTreePSet.empty(), false, id, cert); - } - - @Override - public JDataRefcounted withRefsFrom(PCollection refs) { - return new PeerInfo(key, refs, frozen, id, cert); - } - - @Override - public JDataRefcounted withFrozen(boolean frozen) { - return new PeerInfo(key, refsFrom, frozen, id, cert); + this(id.toJObjectKey(), id, cert); } public X509Certificate parsedCert() { diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoSerializer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoSerializer.java new file mode 100644 index 00000000..5f00c155 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoSerializer.java @@ -0,0 +1,24 @@ +package com.usatiuk.dhfs.objects.repository.peersync; + +import com.google.protobuf.ByteString; +import com.usatiuk.autoprotomap.runtime.ProtoSerializer; +import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.persistence.PeerInfoP; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class PeerInfoSerializer implements ProtoSerializer { + + @Override + public PeerInfo deserialize(PeerInfoP message) { + return new PeerInfo(PeerId.of(message.getUuid()), message.getCert().toByteArray()); + } + + @Override + public PeerInfoP serialize(PeerInfo object) { + return PeerInfoP.newBuilder() + .setUuid(object.key().toString()) + .setCert(ByteString.copyFrom(object.cert())) + .build(); + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java index c83fe311..be8d8a2a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/peersync/PeerInfoService.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.repository.peersync; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.PeerId; +import com.usatiuk.dhfs.objects.RemoteTransaction; import com.usatiuk.dhfs.objects.TransactionManager; import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager; import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode; @@ -24,6 +25,8 @@ public class PeerInfoService { JKleppmannTreeManager jKleppmannTreeManager; @Inject PersistentPeerDataService persistentPeerDataService; + @Inject + RemoteTransaction remoteTx; private JKleppmannTreeManager.JKleppmannTree getTree() { return jKleppmannTreeManager.getTree(JObjectKey.of("peers")); @@ -37,7 +40,7 @@ public class PeerInfoService { } return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> { var meta = (JKleppmannTreeNodeMetaPeer) node.meta(); - return curTx.get(PeerInfo.class, meta.getPeerId()); + return remoteTx.getData(PeerInfo.class, meta.getPeerId()); }); }); } @@ -69,7 +72,7 @@ public class PeerInfoService { jObjectTxManager.run(() -> { var parent = getTree().traverse(List.of()); var newPeerInfo = new PeerInfo(id, cert); - curTx.put(newPeerInfo); + remoteTx.put(newPeerInfo); getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId()); }); } diff --git a/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto b/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto index 0f93fdd5..3acb21a3 100644 --- a/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto +++ b/dhfs-parent/server/src/main/proto/dhfs_objects_serial.proto @@ -33,10 +33,13 @@ message FilePChunksEntry { } message FileP { - FsNodeP fsNode = 1; - repeated FilePChunksEntry chunks = 2; - bool symlink = 3; - int64 size = 4; + string uuid = 1; + int64 mode = 2; + int64 ctime = 3; + int64 mtime = 4; + repeated FilePChunksEntry chunks = 5; + bool symlink = 6; + int64 size = 7; } message DirectoryP { @@ -53,7 +56,7 @@ message PeerDirectoryP { repeated string peers = 1; } -message PersistentPeerInfoP { +message PeerInfoP { string uuid = 1; bytes cert = 2; } @@ -141,15 +144,26 @@ message PeerDirectoryLocalP { repeated string initialObjSyncDonePeers = 2; } +message RemoteObjectP { + oneof obj { + FileP file = 2; + ChunkDataP chunkData = 5; + // PeerDirectoryP peerDirectory = 6; + PeerInfoP peerInfo = 7; + // JKleppmannTreeNodeP jKleppmannTreeNode = 8; + // JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9; + // PeerDirectoryLocalP peerDirectoryLocal = 10; + } +} + message JObjectDataP { oneof obj { FileP file = 2; - DirectoryP directory = 3; ChunkDataP chunkData = 5; - PeerDirectoryP peerDirectory = 6; - PersistentPeerInfoP persistentPeerInfo = 7; - JKleppmannTreeNodeP jKleppmannTreeNode = 8; - JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9; - PeerDirectoryLocalP peerDirectoryLocal = 10; +// PeerDirectoryP peerDirectory = 6; + PeerInfoP peerInfo = 7; +// JKleppmannTreeNodeP jKleppmannTreeNode = 8; +// JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9; +// PeerDirectoryLocalP peerDirectoryLocal = 10; } } \ No newline at end of file diff --git a/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto b/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto index 8ef94946..5820aefb 100644 --- a/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto +++ b/dhfs-parent/server/src/main/proto/dhfs_objects_sync.proto @@ -11,8 +11,7 @@ package dhfs.objects.sync; service DhfsObjectSyncGrpc { rpc GetObject (GetObjectRequest) returns (GetObjectReply) {} rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {} - rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {} - rpc OpPush (OpPushMsg) returns (OpPushReply) {} + rpc OpPush (OpPushRequest) returns (OpPushReply) {} rpc Ping (PingRequest) returns (PingReply) {} } @@ -37,63 +36,53 @@ message ObjectChangelog { message ObjectHeader { string name = 2; ObjectChangelog changelog = 5; - optional dhfs.objects.persistence.JObjectDataP pushedData = 6; -} - -message ApiObject { - ObjectHeader header = 1; - dhfs.objects.persistence.JObjectDataP content = 2; + optional dhfs.objects.persistence.RemoteObjectP pushedData = 6; } message GetObjectRequest { - string selfUuid = 10; - string name = 2; } message GetObjectReply { - string selfUuid = 10; - - ApiObject object = 1; + ObjectHeader header = 1; + dhfs.objects.persistence.RemoteObjectP content = 2; } message CanDeleteRequest { - string selfUuid = 10; - string name = 2; repeated string ourReferrers = 3; } message CanDeleteReply { - string selfUuid = 10; string objName = 1; bool deletionCandidate = 2; repeated string referrers = 3; } -message IndexUpdatePush { - string selfUuid = 10; - +message IndexUpdateOpP { ObjectHeader header = 1; } message IndexUpdateReply {} message JKleppmannTreePeriodicPushOpP { - string fromUuid = 1; int64 timestamp = 2; } +message JKleppmannTreeOpPTemp { + bytes serialized = 2; +} + message OpPushPayload { oneof payload { - dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOpWrapper = 1; - JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2; + JKleppmannTreeOpPTemp jKleppmannTreeOpWrapper = 1; + // dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOpWrapper = 1; + // JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2; + IndexUpdateOpP indexUpdateOp = 3; } } -message OpPushMsg { - string selfUuid = 10; - string queueId = 1; +message OpPushRequest { repeated OpPushPayload msg = 2; } diff --git a/dhfs-parent/server/src/main/resources/application.properties b/dhfs-parent/server/src/main/resources/application.properties index bbf3bab4..220ba49b 100644 --- a/dhfs-parent/server/src/main/resources/application.properties +++ b/dhfs-parent/server/src/main/resources/application.properties @@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=5s dhfs.objects.peerdiscovery.broadcast=true dhfs.objects.sync.timeout=30 dhfs.objects.sync.ping.timeout=5 -dhfs.objects.invalidation.threads=4 +dhfs.objects.invalidation.threads=1 dhfs.objects.invalidation.delay=1000 dhfs.objects.reconnect_interval=5s dhfs.objects.write_log=false diff --git a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTestImpl.java b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTestImpl.java index 381fca46..6fa33055 100644 --- a/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTestImpl.java +++ b/dhfs-parent/server/src/test/java/com/usatiuk/dhfs/files/DhfsFileServiceSimpleTestImpl.java @@ -4,6 +4,7 @@ import com.usatiuk.dhfs.TempDataProfile; import com.usatiuk.dhfs.files.objects.ChunkData; import com.usatiuk.dhfs.files.objects.File; import com.usatiuk.dhfs.files.service.DhfsFileService; +import com.usatiuk.dhfs.objects.RemoteTransaction; import com.usatiuk.dhfs.objects.TransactionManager; import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.kleppmanntree.AlreadyExistsException; @@ -45,6 +46,8 @@ public class DhfsFileServiceSimpleTestImpl { Transaction curTx; @Inject TransactionManager jObjectTxManager; + @Inject + RemoteTransaction remoteTx; // @Test // void readTest() { @@ -207,9 +210,9 @@ public class DhfsFileServiceSimpleTestImpl { jObjectTxManager.run(() -> { - var oldfile = curTx.get(File.class, ret2.get()).orElseThrow(IllegalStateException::new); + var oldfile = remoteTx.getData(File.class, ret2.get()).orElseThrow(IllegalStateException::new); var chunk = oldfile.chunks().get(0L); - var chunkObj = curTx.get(ChunkData.class, chunk).orElseThrow(IllegalStateException::new); + var chunkObj = remoteTx.getData(ChunkData.class, chunk).orElseThrow(IllegalStateException::new); }); Assertions.assertTrue(fileService.rename("/moveOverTest1", "/moveOverTest2"));