dump some server changes

2025-02-03 22:02:05 +01:00
parent 6c93504b2c
commit 7c06241876
43 changed files with 1484 additions and 655 deletions

ChunkData.java View File

@@ -1,27 +1,13 @@
package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import org.pcollections.PCollection;
import org.pcollections.TreePSet;
public record ChunkData(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
ByteString data) implements JDataRefcounted {
public ChunkData(JObjectKey key, ByteString data) {
this(key, TreePSet.empty(), false, data);
}
@Override
public ChunkData withRefsFrom(PCollection<JObjectKey> refs) {
return new ChunkData(key, refs, frozen, data);
}
@Override
public ChunkData withFrozen(boolean frozen) {
return new ChunkData(key, refsFrom, frozen, data);
}
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
//@ProtoMirror(ChunkDataP.class)
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote {
@Override
public int estimateSize() {
return data.size();

ChunkDataSerializer.java View File

@@ -0,0 +1,22 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class ChunkDataSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(JObjectKey.of(message.getName()), message.getData());
}
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setName(object.key().toString())
.setData(object.data())
.build();
}
}
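
Reviewer note: a minimal round-trip sketch for the new serializer (not part of the commit, reusing the imports of the file above). One assumption worth flagging: serialize() writes key().toString() while deserialize() rebuilds the key via JObjectKey.of(), so the key only survives the round trip if JObjectKey.toString() returns the bare name.

    var serializer = new ChunkDataSerializer();
    var chunk = new ChunkData(JObjectKey.of("chunk-1"), ByteString.copyFromUtf8("hello"));

    ChunkDataP proto = serializer.serialize(chunk);     // name + data
    ChunkData restored = serializer.deserialize(proto);

    // Holds under the toString() assumption above.
    assert restored.key().equals(chunk.key());
    assert restored.data().equals(chunk.data());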

File.java View File

@@ -1,52 +1,48 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JObjectKey;
import org.pcollections.PCollection;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import org.pcollections.TreePMap;
import java.util.Collection;
import java.util.Set;
public record File(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
long mode, long cTime, long mTime,
//@ProtoMirror(ChunkDataP.class)
public record File(JObjectKey key, long mode, long cTime, long mTime,
TreePMap<Long, JObjectKey> chunks, boolean symlink, long size
) implements FsNode {
@Override
public File withRefsFrom(PCollection<JObjectKey> refs) {
return new File(key, refs, frozen, mode, cTime, mTime, chunks, symlink, size);
}
@Override
public File withFrozen(boolean frozen) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
}
public File withChunks(TreePMap<Long, JObjectKey> chunks) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
public File withSymlink(boolean symlink) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
public File withSize(long size) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
public File withMode(long mode) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
public File withCTime(long cTime) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
public File withMTime(long mTime) {
return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
return new File(key, mode, cTime, mTime, chunks, symlink, size);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return Set.copyOf(chunks().values());
}
@Override
public int estimateSize() {
return chunks.size() * 64;
}
}
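
Note: a short usage sketch (not from the commit) of the slimmed-down File record, whose refsFrom/frozen bookkeeping now lives on the RemoteObject wrapper. Every wither returns a fresh immutable copy:

    var now = System.currentTimeMillis();
    var file = new File(JObjectKey.of("file-1"), 0644, now, now, TreePMap.empty(), false, 0);

    // Each with* call produces a new record; the original is untouched.
    var updated = file
            .withChunks(TreePMap.<Long, JObjectKey>empty().plus(0L, JObjectKey.of("chunk-1")))
            .withSize(4096)
            .withMTime(System.currentTimeMillis());

    assert file.chunks().isEmpty();
    assert updated.chunks().size() == 1;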

FileSerializer.java View File

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.FileP;
import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.TreePMap;
@ApplicationScoped
public class FileSerializer implements ProtoSerializer<FileP, File> {
@Override
public File deserialize(FileP message) {
TreePMap<Long, JObjectKey> chunks = TreePMap.empty();
for (var chunk : message.getChunksList()) {
chunks = chunks.plus(chunk.getStart(), JObjectKey.of(chunk.getId()));
}
var ret = new File(JObjectKey.of(message.getUuid()),
message.getMode(),
message.getCtime(),
message.getMtime(),
chunks,
message.getSymlink(),
message.getSize()
);
return ret;
}
@Override
public FileP serialize(File object) {
var builder = FileP.newBuilder()
.setUuid(object.key().toString())
.setMode(object.mode())
.setCtime(object.cTime())
.setMtime(object.mTime())
.setSymlink(object.symlink())
.setSize(object.size());
object.chunks().forEach((s, i) -> {
builder.addChunksBuilder()
.setStart(s)
.setId(i.toString());
});
return builder.build();
}
}

FsNode.java View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.dhfs.objects.JDataRemote;
public interface FsNode extends JDataRefcounted {
public interface FsNode extends JDataRemote {
long mode();
long cTime();

DhfsFileServiceImpl.java View File

@@ -4,9 +4,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
@@ -26,7 +24,6 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.TreePMap;
import org.pcollections.TreePSet;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
@@ -38,6 +35,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTx;
@Inject
TransactionManager jObjectTxManager;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
@@ -76,7 +75,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
curTx.put(newChunk);
remoteTx.put(newChunk);
return newChunk;
}
@@ -105,8 +104,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var ref = curTx.get(JData.class, uuid).orElse(null);
if (ref == null) return Optional.empty();
GetattrRes ret;
if (ref instanceof File f) {
ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE);
if (ref instanceof RemoteObject r) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Remote object is not a File: " + ref.key()));
}
} else if (ref instanceof JKleppmannTreeNode) {
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
} else {
@@ -152,8 +156,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(), false, mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0);
curTx.put(f);
File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0);
remoteTx.put(f);
try {
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
@@ -226,9 +230,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (dent instanceof JKleppmannTreeNode) {
return true;
} else if (dent instanceof File f) {
curTx.put(f.withMode(mode).withMTime(System.currentTimeMillis()));
return true;
} else if (dent instanceof RemoteObject) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
remoteTx.put(f.withMode(mode).withMTime(System.currentTimeMillis()));
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
}
} else {
throw new IllegalArgumentException(uuid + " is not a file");
}
@@ -255,7 +264,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
var file = curTx.get(File.class, fileUuid).orElse(null);
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
if (file == null) {
Log.error("File not found when trying to read: " + fileUuid);
return Optional.empty();
@@ -315,7 +324,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
private ByteString readChunk(JObjectKey uuid) {
var chunkRead = curTx.get(ChunkData.class, uuid).orElse(null);
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
if (chunkRead == null) {
Log.error("Chunk requested not found: " + uuid);
@@ -354,7 +363,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
// FIXME:
var file = curTx.get(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
if (file == null) {
Log.error("File not found when trying to write: " + fileUuid);
return -1L;
@@ -367,7 +376,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (size(fileUuid) < offset) {
truncate(fileUuid, offset);
file = curTx.get(File.class, fileUuid).orElse(null);
file = remoteTx.getData(File.class, fileUuid).orElse(null);
}
var chunksAll = file.chunks();
@@ -493,7 +502,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
curTx.put(file);
remoteTx.put(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
@@ -507,7 +516,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (length < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
var file = curTx.get(File.class, fileUuid).orElse(null);
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
if (file == null) {
Log.error("File not found when trying to write: " + fileUuid);
return false;
@@ -517,7 +526,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var oldChunks = file.chunks();
file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
curTx.put(file);
remoteTx.put(file);
cleanupChunks(file, oldChunks.values());
updateFileSize(file);
return true;
@@ -578,7 +587,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
curTx.put(file);
remoteTx.put(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
return true;
@@ -595,7 +604,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public ByteString readlinkBS(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var fileOpt = curTx.get(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
return read(uuid, 0, Math.toIntExact(size(uuid))).get();
});
}
@@ -614,8 +623,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
Log.debug("Creating file " + fuuid);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(),
false, 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.<Long, JObjectKey>empty().plus(0L, newChunkData.key()), true, 0);
File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.<Long, JObjectKey>empty().plus(0L, newChunkData.key()), true, 0);
updateFileSize(f);
@@ -627,12 +635,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var file = curTx.get(File.class, fileUuid).orElseThrow(
var file = remoteTx.getData(File.class, fileUuid).orElseThrow(
() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
"File not found for setTimes: " + fileUuid))
);
curTx.put(file.withCTime(atimeMs).withMTime(mtimeMs));
remoteTx.put(file.withCTime(atimeMs).withMTime(mtimeMs));
return true;
});
}
@@ -649,7 +657,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (realSize != file.size()) {
curTx.put(file.withSize(realSize));
remoteTx.put(file.withSize(realSize));
}
});
}
@@ -657,7 +665,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Long size(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var read = curTx.get(File.class, uuid)
var read = remoteTx.getData(File.class, uuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
return read.size();

JDataRemote.java View File

@@ -1,4 +1,21 @@
package com.usatiuk.dhfs.objects;
public interface JDataRemote {
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.persistence.RemoteObjectP;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@ProtoMirror(RemoteObjectP.class)
public interface JDataRemote extends Serializable {
JObjectKey key();
default int estimateSize() {
return 100;
}
default Collection<JObjectKey> collectRefsTo() {
return List.of();
}
}
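
Note: the remote-data contract is deliberately small. A hypothetical implementation for illustration (ExampleRemote is not part of the commit) needs only a key, and can override the size estimate and outgoing references:

    // Hypothetical example type, kept to java.* dependencies.
    public record ExampleRemote(JObjectKey key, String payload,
                                JObjectKey child) implements JDataRemote {
        @Override
        public int estimateSize() {
            return payload.length();
        }

        @Override
        public Collection<JObjectKey> collectRefsTo() {
            // Objects listed here get back-references via RefcounterTxHook.
            return List.of(child);
        }
    }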

PeerId.java View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects;
import java.io.Serializable;
import java.util.UUID;
public record PeerId(UUID id) implements Serializable {
public record PeerId(UUID id) implements Serializable, Comparable<PeerId> {
public static PeerId of(UUID id) {
return new PeerId(id);
}
@@ -20,4 +20,9 @@ public record PeerId(UUID id) implements Serializable {
public JObjectKey toJObjectKey() {
return JObjectKey.of(id.toString());
}
@Override
public int compareTo(PeerId o) {
return id.compareTo(o.id);
}
}
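
Aside: the new Comparable implementation is presumably what lets PeerId participate in the ordered structures introduced elsewhere in this commit (e.g. CombinedTimestamp<Long, PeerId> keys in the tree's op log). A small standalone sketch, using pcollections' TreePMap as the commit does:

    var a = PeerId.of(UUID.fromString("00000000-0000-0000-0000-000000000001"));
    var b = PeerId.of(UUID.fromString("00000000-0000-0000-0000-000000000002"));

    // Sorted persistent maps need comparable keys; ordering follows the UUID.
    var byPeer = TreePMap.<PeerId, Long>empty().plus(b, 2L).plus(a, 1L);
    assert byPeer.firstKey().equals(a);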

ReceivedObject.java View File

@@ -0,0 +1,6 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.PMap;
public record ReceivedObject(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemote data) {
}

RefcounterTxHook.java View File

@@ -1,5 +1,8 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -9,6 +12,21 @@ public class RefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;
private JDataRefcounted getRef(JDataRefcounted cur, JObjectKey key) {
var found = curTx.get(JDataRefcounted.class, key).orElse(null);
if (found != null) {
return found;
}
if (cur instanceof RemoteObject<?> || cur instanceof JKleppmannTreeNode) {
return new RemoteObject<>(key);
} else {
return found;
}
}
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
if (!(cur instanceof JDataRefcounted refCur)) {
@@ -21,14 +39,14 @@ public class RefcounterTxHook implements PreCommitTxHook {
for (var curRef : curRefs) {
if (!oldRefs.contains(curRef)) {
var referenced = curTx.get(JDataRefcounted.class, curRef).orElse(null);
var referenced = getRef(refCur, curRef);
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
}
}
for (var oldRef : oldRefs) {
if (!curRefs.contains(oldRef)) {
var referenced = curTx.get(JDataRefcounted.class, oldRef).orElse(null);
var referenced = getRef(refCur, oldRef);
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
}
}
@@ -41,7 +59,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
}
for (var newRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
var referenced = getRef(refCur, newRef);
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
}
}
@@ -53,7 +71,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
}
for (var removedRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
var referenced = getRef(refCur, removedRef);
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
}
}
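
Note: a self-contained sketch (stand-in keys, not from the commit) of the diffing idiom the hook applies in onChange, assuming JObjectKey is Comparable as its use with TreePSet elsewhere suggests:

    PCollection<JObjectKey> oldRefs = TreePSet.<JObjectKey>empty().plus(JObjectKey.of("a"));
    PCollection<JObjectKey> curRefs = TreePSet.<JObjectKey>empty().plus(JObjectKey.of("b"));

    for (var ref : curRefs)
        if (!oldRefs.contains(ref))
            System.out.println("add backref to " + ref);    // refsFrom().plus(key)
    for (var ref : oldRefs)
        if (!curRefs.contains(ref))
            System.out.println("drop backref to " + ref);   // refsFrom().minus(key)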

RemoteObjPusherTxHook.java View File

@@ -0,0 +1,49 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;
@Inject
InvalidationQueueService invalidationQueueService;
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
boolean invalidate = switch (cur) {
case RemoteObject<?> remote -> !remote.meta().changelog().equals(((RemoteObject<?>) old).meta().changelog());
case JKleppmannTreePersistentData pd -> !pd.queues().equals(((JKleppmannTreePersistentData) old).queues());
default -> false;
};
if (invalidate) {
invalidationQueueService.pushInvalidationToAll(cur.key());
}
}
@Override
public void onCreate(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObject remote)) {
return;
}
invalidationQueueService.pushInvalidationToAll(remote.key());
}
@Override
public void onDelete(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObject remote)) {
return;
}
}
@Override
public int getPriority() {
return 100;
}
}

RemoteObject.java View File

@@ -1,67 +1,71 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.HashTreePSet;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.PSet;
import org.pcollections.TreePMap;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
public record RemoteObject<T>(
JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JData> knownType,
PSet<PeerId> confirmedDeletes,
boolean seen,
PMap<PeerId, Long> changelog,
boolean haveLocal
) implements JDataRefcounted {
public record RemoteObject<T extends JDataRemote>(PCollection<JObjectKey> refsFrom, boolean frozen,
RemoteObjectMeta meta, @Nullable T data) implements JDataRefcounted {
public RemoteObject(T data, PeerId initialPeer) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), initialPeer), data);
}
public RemoteObject(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, remoteChangelog), null);
}
public RemoteObject(JObjectKey key) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, TreePMap.empty()), null);
}
@Override
public JObjectKey key() {
if (data != null && !data.key().equals(meta.key()))
throw new IllegalStateException("Corrupted object, key mismatch: " + meta.key() + " vs " + data.key());
return meta.key();
}
@Override
public RemoteObject<T> withRefsFrom(PCollection<JObjectKey> refs) {
return new RemoteObject<>(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
return new RemoteObject<>(refs, frozen, meta, data);
}
@Override
public RemoteObject<T> withFrozen(boolean frozen) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
public RemoteObject<T> withMeta(RemoteObjectMeta meta) {
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withKnownType(Class<? extends JData> knownType) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
public RemoteObject<T> withData(T data) {
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
public RemoteObject<T> withRefsFrom(PCollection<JObjectKey> refs, boolean frozen) {
return new RemoteObject<>(refs, frozen, meta, data);
}
public RemoteObject<T> withSeen(boolean seen) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
}
public RemoteObject<T> withChangelog(PMap<PeerId, Long> changelog) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
}
public RemoteObject<T> withHaveLocal(boolean haveLocal) {
return new RemoteObject<>(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
}
public static JObjectKey keyFrom(JObjectKey key) {
return new JObjectKey(key + "_remote");
}
public JObjectKey localKey() {
if (!haveLocal) throw new IllegalStateException("No local key");
return JObjectKey.of(key.name().substring(0, key.name().length() - "_remote".length()));
public ReceivedObject toReceivedObject() {
if (data == null)
throw new IllegalStateException("Cannot convert to ReceivedObject without data: " + meta.key());
return new ReceivedObject(meta.key(), meta.changelog(), data);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
if (haveLocal) return List.of(localKey());
if (data != null) return data.collectRefsTo();
return List.of();
}
@Override
public int estimateSize() {
return data == null ? 1000 : data.estimateSize();
}
}
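
Note: a sketch (hypothetical values, not from the commit) of the wrapper's lifecycle: wrapping freshly created local data seeds the changelog with the originating peer, and toReceivedObject() produces the payload shipped to other peers:

    var self = PeerId.of(UUID.randomUUID());
    var file = new File(JObjectKey.of("file-1"), 0644, 0, 0, TreePMap.empty(), false, 0);

    var wrapped = new RemoteObject<>(file, self);   // changelog starts at {self=1}
    ReceivedObject shipped = wrapped.toReceivedObject();

    assert shipped.changelog().get(self) == 1L;
    assert wrapped.collectRefsTo().isEmpty();       // delegates to data.collectRefsTo()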

RemoteObjectMeta.java View File

@@ -0,0 +1,53 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.HashTreePMap;
import org.pcollections.HashTreePSet;
import org.pcollections.PMap;
import org.pcollections.PSet;
import java.io.Serializable;
public record RemoteObjectMeta(
JObjectKey key,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JDataRemote> knownType,
PSet<PeerId> confirmedDeletes,
boolean seen,
PMap<PeerId, Long> changelog) implements Serializable {
public RemoteObjectMeta(JObjectKey key, Class<? extends JDataRemote> type, PeerId initialPeer) {
this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), true,
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L));
}
public RemoteObjectMeta(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
this(key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, remoteChangelog);
}
public RemoteObjectMeta withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public RemoteObjectMeta withKnownType(Class<? extends JDataRemote> knownType) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public RemoteObjectMeta withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public RemoteObjectMeta withSeen(boolean seen) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public RemoteObjectMeta withChangelog(PMap<PeerId, Long> changelog) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public RemoteObjectMeta withHaveLocal(boolean haveLocal) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
}
public long versionSum() {
return changelog.values().stream().mapToLong(Long::longValue).sum();
}
}
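
Note: a sketch of the changelog bookkeeping, mirroring what RemoteTransaction.put() does on a local update (hypothetical key and peer):

    var self = PeerId.of(UUID.randomUUID());
    var meta = new RemoteObjectMeta(JObjectKey.of("obj-1"), JDataRemote.class, self);
    assert meta.versionSum() == 1;   // creation seeds the changelog with {self=1}

    // A local modification bumps this peer's entry.
    meta = meta.withChangelog(meta.changelog().plus(self, meta.changelog().get(self) + 1));
    assert meta.versionSum() == 2;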

RemoteTransaction.java View File

@@ -1,10 +1,13 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.mutable.MutableObject;
import java.util.Optional;
@@ -12,20 +15,96 @@ import java.util.Optional;
public class RemoteTransaction {
@Inject
Transaction curTx;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
SyncHandler syncHandler;
@Inject
PersistentPeerDataService persistentPeerDataService;
public long getId() {
return curTx.getId();
}
public <T extends JDataRemote> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
throw new NotImplementedException();
private <T extends JDataRemote> RemoteObject<T> tryDownloadRemote(RemoteObject<T> obj) {
MutableObject<RemoteObject<T>> success = new MutableObject<>(null);
remoteObjectServiceClient.getObject(obj.key(), rcv -> {
if (!obj.meta().knownType().isInstance(rcv.getRight().data()))
throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass());
if (!rcv.getRight().changelog().equals(obj.meta().changelog())) {
var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog());
if (!rcv.getRight().changelog().equals(updated.meta().changelog()))
throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog());
success.setValue(updated.withData((T) rcv.getRight().data()));
} else {
success.setValue(obj.withData((T) rcv.getRight().data()));
}
return true;
});
curTx.put(success.getValue());
return success.getValue();
}
public <T extends JDataRemote> void put(JData obj) {
throw new NotImplementedException();
@SuppressWarnings("unchecked")
public <T extends JDataRemote> Optional<RemoteObject<T>> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObject.class, key, strategy)
.map(obj -> {
if (obj.data() != null && !type.isInstance(obj.data()))
throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type);
if (!type.isAssignableFrom(obj.meta().knownType()))
throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type);
if (obj.data() != null)
return obj;
else
return tryDownloadRemote(obj);
});
}
public <T extends JDataRemote> Optional<T> get(Class<T> type, JObjectKey key) {
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObject.class, key, strategy).map(obj -> obj.meta());
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return get(type, key, strategy).map(RemoteObject::data);
}
public <T extends JDataRemote> void put(RemoteObject<T> obj) {
curTx.put(obj);
}
public <T extends JDataRemote> void put(T obj) {
var cur = get((Class<T>) obj.getClass(), obj.key()).orElse(null);
if (cur == null) {
curTx.put(new RemoteObject<>(obj, persistentPeerDataService.getSelfUuid()));
return;
}
if (cur.data() != null && cur.data().equals(obj))
return;
if (cur.data() != null && !cur.data().getClass().equals(obj.getClass()))
throw new IllegalStateException("Object type mismatch: " + cur.data().getClass() + " vs " + obj.getClass());
var newMeta = cur.meta();
newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(),
newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1));
var newObj = cur.withData(obj).withMeta(newMeta);
curTx.put(newObj);
}
public <T extends JDataRemote> Optional<RemoteObject<T>> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return getMeta(key, LockingStrategy.OPTIMISTIC);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC);
}
}
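
Note: a hypothetical caller-side sketch of the new API, modeled on the injection and executeTx pattern DhfsFileServiceImpl uses above; touchFile() is illustrative, not part of the commit:

    @Inject
    RemoteTransaction remoteTx;
    @Inject
    TransactionManager txm;

    Boolean touchFile(JObjectKey key) {
        return txm.executeTx(() -> {
            // getData() transparently downloads the object when only metadata is local.
            var file = remoteTx.getData(File.class, key).orElseThrow();
            // put() rewraps the data and bumps this peer's changelog entry.
            remoteTx.put(file.withMTime(System.currentTimeMillis()));
            return true;
        });
    }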

JKleppmannTreeManager.java View File

@@ -1,36 +1,39 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.*;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.pcollections.HashTreePMap;
import org.pcollections.TreePMap;
import org.pcollections.TreePSet;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;
import java.util.UUID;
import java.util.*;
import java.util.function.Function;
@ApplicationScoped
public class JKleppmannTreeManager {
private static final String dataFileName = "trees";
@Inject
JKleppmannTreePeerInterface jKleppmannTreePeerInterface;
@Inject
Transaction curTx;
@Inject
TransactionManager txManager;
@Inject
JKleppmannTreePeerInterface peerInterface;
@Inject
PeerInfoService peerInfoService;
public JKleppmannTree getTree(JObjectKey name) {
return txManager.executeTx(() -> {
@@ -41,7 +44,7 @@ public class JKleppmannTreeManager {
TreePSet.empty(),
true,
1L,
new HashMap<>(),
HashTreePMap.empty(),
new HashMap<>(),
new TreeMap<>()
);
@@ -57,7 +60,7 @@ public class JKleppmannTreeManager {
}
public class JKleppmannTree {
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;
private final JKleppmannTreeClock _clock;
private final JObjectKey _treeName;
@@ -89,105 +92,71 @@ public class JKleppmannTreeManager {
_tree.move(_storageInterface.getTrashId(), newMeta.withName(nodeKey.toString()), nodeKey);
}
// @Override
// public boolean hasPendingOpsForHost(UUID host) {
// return _persistentData.get()
// .runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
// (m, d) -> d.getQueues().containsKey(host) &&
// !d.getQueues().get(host).isEmpty()
// );
// }
//
// @Override
// public List<Op> getPendingOpsForHost(UUID host, int limit) {
// return _persistentData.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// if (d.getQueues().containsKey(host)) {
// var queue = d.getQueues().get(host);
// ArrayList<Op> collected = new ArrayList<>();
//
// for (var node : queue.entrySet()) {
// collected.add(new JKleppmannTreeOpWrapper(node.getValue()));
// if (collected.size() >= limit) break;
// }
//
// return collected;
// }
// return List.of();
// });
// }
public boolean hasPendingOpsForHost(PeerId host) {
return !_data.queues().getOrDefault(host, TreePMap.empty()).isEmpty();
}
// @Override
// public String getId() {
// return _treeName;
// }
public List<Op> getPendingOpsForHost(PeerId host, int limit) {
ArrayList<Op> collected = new ArrayList<>();
for (var node : _data.queues().getOrDefault(host, TreePMap.empty()).entrySet()) {
collected.add(new JKleppmannTreeOpWrapper(_data.key(), node.getValue()));
if (collected.size() >= limit) break;
}
return Collections.unmodifiableList(collected);
}
// @Override
// public void commitOpForHost(UUID host, Op op) {
// if (!(op instanceof JKleppmannTreeOpWrapper jop))
// throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
// _persistentData.get().assertRwLock();
// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
//
// var got = _persistentData.get().getData().getQueues().get(host).firstEntry().getValue();
// if (!Objects.equals(jop.getOp(), got))
// throw new IllegalArgumentException("Committed op push was not the oldest");
//
// _persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
// @Override
// public boolean mutate(JKleppmannTreePersistentData object) {
// object.getQueues().get(host).pollFirstEntry();
// return true;
// }
//
// @Override
// public void revert(JKleppmannTreePersistentData object) {
// object.getQueues().get(host).put(jop.getOp().timestamp(), jop.getOp());
// }
// });
//
// }
// @Override
public void commitOpForHost(PeerId host, Op op) {
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());
// @Override
// public void pushBootstrap(UUID host) {
// _tree.recordBoostrapFor(host);
// }
var firstOp = _data.queues().get(host).firstEntry().getValue();
if (!Objects.equals(firstOp, jop.op()))
throw new IllegalArgumentException("Committed op push was not the oldest");
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
_data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(_data.queues().get(host).firstKey())));
}
// @Override
// public boolean acceptExternalOp(UUID from, Op op) {
// if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
// return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
// }
//
// if (!(op instanceof JKleppmannTreeOpWrapper jop))
// throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
//
// JObject<?> fileRef;
// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
// public void pushBootstrap(PeerId host) {
// _tree.recordBoostrapFor(host);
// }
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
}
// @Override
public boolean acceptExternalOp(PeerId from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
}
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());
// if (jop.op().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
// var fino = f.getFileIno();
// fileRef = jObjectManager.getOrPut(fino, File.class, Optional.of(jop.getOp().childId()));
// } else {
// fileRef = null;
// }
//
// if (Log.isTraceEnabled())
// Log.trace("Received op from " + from + ": " + jop.getOp().timestamp().timestamp() + " " + jop.getOp().childId() + "->" + jop.getOp().newParentId() + " as " + jop.getOp().newMeta().getName());
//
// try {
// _tree.applyExternalOp(from, jop.getOp());
// } catch (Exception e) {
// Log.error("Error applying external op", e);
// throw e;
// } finally {
// // FIXME:
// // Fixup the ref if it didn't really get applied
//
if (Log.isTraceEnabled())
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().getName());
try {
_tree.applyExternalOp(from, jop.op());
} catch (Exception e) {
Log.error("Error applying external op", e);
throw e;
} finally {
// FIXME:
// Fixup the ref if it didn't really get applied
// if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
// Log.error("Could not create child of pushed op: " + jop.getOp());
//
// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
// if (fileRef != null) {
// var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
@@ -216,9 +185,9 @@ public class JKleppmannTreeManager {
// }
// }
// }
// }
// return true;
// }
}
return true;
}
// @Override
// public Op getPeriodicPushOp() {
@@ -232,9 +201,12 @@ public class JKleppmannTreeManager {
// _persistentData.get().rwUnlock();
// }
private class JOpRecorder implements OpRecorder<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
private class JOpRecorder implements OpRecorder<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public void recordOp(OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> op) {
public void recordOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
for (var p : peerInfoService.getPeersNoSelf()) {
recordOpForPeer(p.id(), op);
}
// _persistentData.get().assertRwLock();
// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// var hostUuds = persistentPeerDataService.getHostUuids().stream().toList();
@@ -254,7 +226,8 @@ public class JKleppmannTreeManager {
}
@Override
public void recordOpForPeer(UUID peer, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> op) {
public void recordOpForPeer(PeerId peer, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
_data = _data.withQueues(_data.queues().plus(peer, _data.queues().getOrDefault(peer, TreePMap.empty()).plus(op.timestamp(), op)));
// _persistentData.get().assertRwLock();
// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// _persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
@@ -296,7 +269,7 @@ public class JKleppmannTreeManager {
}
}
public class JKleppmannTreeStorageInterface implements StorageInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
public class JKleppmannTreeStorageInterface implements StorageInterface<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
private final LogWrapper _logWrapper = new LogWrapper();
private final PeerLogWrapper _peerLogWrapper = new PeerLogWrapper();
@@ -330,7 +303,7 @@ public class JKleppmannTreeManager {
}
@Override
public void putNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node) {
public void putNode(TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> node) {
curTx.put(((JKleppmannTreeNode) node));
}
@@ -340,23 +313,23 @@ public class JKleppmannTreeManager {
}
@Override
public LogInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getLog() {
public LogInterface<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> getLog() {
return _logWrapper;
}
@Override
public PeerTimestampLogInterface<Long, UUID> getPeerTimestampLog() {
public PeerTimestampLogInterface<Long, PeerId> getPeerTimestampLog() {
return _peerLogWrapper;
}
private class PeerLogWrapper implements PeerTimestampLogInterface<Long, UUID> {
private class PeerLogWrapper implements PeerTimestampLogInterface<Long, PeerId> {
@Override
public Long getForPeer(UUID peerId) {
public Long getForPeer(PeerId peerId) {
return _data.peerTimestampLog().get(peerId);
}
@Override
public void putForPeer(UUID peerId, Long timestamp) {
public void putForPeer(PeerId peerId, Long timestamp) {
var newPeerTimestampLog = new HashMap<>(_data.peerTimestampLog());
newPeerTimestampLog.put(peerId, timestamp);
_data = _data.withPeerTimestampLog(newPeerTimestampLog);
@@ -364,16 +337,16 @@ public class JKleppmannTreeManager {
}
}
private class LogWrapper implements LogInterface<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> {
private class LogWrapper implements LogInterface<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> peekOldest() {
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> peekOldest() {
var ret = _data.log().firstEntry();
if (ret == null) return null;
return Pair.of(ret);
}
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> takeOldest() {
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> takeOldest() {
var newLog = new TreeMap<>(_data.log());
var ret = newLog.pollFirstEntry();
_data = _data.withLog(newLog);
@@ -383,19 +356,19 @@ public class JKleppmannTreeManager {
}
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> peekNewest() {
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> peekNewest() {
var ret = _data.log().lastEntry();
if (ret == null) return null;
return Pair.of(ret);
}
@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> newestSlice(CombinedTimestamp<Long, UUID> since, boolean inclusive) {
public List<Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> newestSlice(CombinedTimestamp<Long, PeerId> since, boolean inclusive) {
return _data.log().tailMap(since, inclusive).entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
}
@Override
public List<Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> getAll() {
public List<Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> getAll() {
return _data.log().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
}
@@ -405,7 +378,7 @@ public class JKleppmannTreeManager {
}
@Override
public boolean containsKey(CombinedTimestamp<Long, UUID> timestamp) {
public boolean containsKey(CombinedTimestamp<Long, PeerId> timestamp) {
return _data.log().containsKey(timestamp);
}
@@ -415,7 +388,7 @@ public class JKleppmannTreeManager {
}
@Override
public void put(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> record) {
public void put(CombinedTimestamp<Long, PeerId> timestamp, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> record) {
if (_data.log().containsKey(timestamp))
throw new IllegalStateException("Overwriting log entry?");
var newLog = new TreeMap<>(_data.log());
@@ -425,7 +398,7 @@ public class JKleppmannTreeManager {
}
@Override
public void replace(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> record) {
public void replace(CombinedTimestamp<Long, PeerId> timestamp, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> record) {
var newLog = new TreeMap<>(_data.log());
newLog.put(timestamp, record);
_data = _data.withLog(newLog);

JKleppmannTreeOpWrapper.java View File

@@ -1,24 +1,16 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.kleppmanntree.OpMove;
import java.util.UUID;
import java.io.Serializable;
// Wrapper to avoid having to specify generic types
public class JKleppmannTreeOpWrapper {
private final OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> _op;
public JKleppmannTreeOpWrapper(OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> op) {
if (op == null) throw new IllegalArgumentException("op shouldn't be null");
_op = op;
}
public OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getOp() {
return _op;
}
public record JKleppmannTreeOpWrapper(JObjectKey treeName,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) implements Op, Serializable {
// @Override
// public Collection<JObjectKey> getEscapedRefs() {
// if (_op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {

JKleppmannTreePeerInterface.java View File

@@ -1,6 +1,11 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.kleppmanntree.PeerInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
@@ -8,14 +13,19 @@ import java.util.List;
import java.util.UUID;
@Singleton
public class JKleppmannTreePeerInterface implements PeerInterface<UUID> {
public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {
@Inject
PeerInfoService peerInfoService;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Override
public UUID getSelfId() {
return UUID.nameUUIDFromBytes("1".getBytes());
public PeerId getSelfId() {
return persistentPeerDataService.getSelfUuid();
}
@Override
public Collection<UUID> getAllPeers() {
return List.of(getSelfId());
public Collection<PeerId> getAllPeers() {
return peerInfoService.getPeers().stream().map(PeerInfo::id).toList();
}
}

JKleppmannTreePeriodicPushOp.java View File

@@ -1,17 +1,19 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.PeerId;
import java.util.UUID;
public class JKleppmannTreePeriodicPushOp {
private final UUID _from;
private final PeerId _from;
private final long _timestamp;
public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) {
public JKleppmannTreePeriodicPushOp(PeerId from, long timestamp) {
_from = from;
_timestamp = timestamp;
}
public UUID getFrom() {
public PeerId getFrom() {
return _from;
}

JKleppmannTreeNode.java View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.kleppmanntree.OpMove;
import com.usatiuk.kleppmanntree.TreeNode;
@@ -12,15 +13,14 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
// FIXME: Ideally this is two classes?
public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen, JObjectKey parent,
OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
JKleppmannTreeNodeMeta meta,
Map<String, JObjectKey> children) implements TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
Map<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
this(id, TreePSet.empty(), false, parent, null, meta, Collections.emptyMap());
@@ -32,7 +32,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFro
}
@Override
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
public JKleppmannTreeNode withLastEffectiveOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp) {
return new JKleppmannTreeNode(key, refsFrom, frozen, parent, lastEffectiveOp, meta, children);
}

JKleppmannTreePersistentData.java View File

@@ -2,41 +2,24 @@ package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.PSortedMap;
import org.pcollections.TreePMap;
import java.util.*;
public record JKleppmannTreePersistentData(
JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
long clock,
HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
HashMap<UUID, Long> peerTimestampLog,
TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> log
PMap<PeerId, PSortedMap<CombinedTimestamp<Long, PeerId>, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
HashMap<PeerId, Long> peerTimestampLog,
TreeMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log
) implements JDataRefcounted {
void recordOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
queues().computeIfAbsent(host, h -> new TreeMap<>());
queues().get(host).put(opMove.timestamp(), opMove);
}
void removeOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
queues().get(host).remove(opMove.timestamp(), opMove);
}
void recordOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
for (var u : hosts) {
recordOp(u, opMove);
}
}
void removeOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> opMove) {
for (var u : hosts) {
removeOp(u, opMove);
}
}
@Override
public JKleppmannTreePersistentData withRefsFrom(PCollection<JObjectKey> refs) {
return new JKleppmannTreePersistentData(key, refs, frozen, clock, queues, peerTimestampLog, log);
@@ -51,15 +34,15 @@ public record JKleppmannTreePersistentData(
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}
public JKleppmannTreePersistentData withQueues(HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> queues) {
public JKleppmannTreePersistentData withQueues(PMap<PeerId, PSortedMap<CombinedTimestamp<Long, PeerId>, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> queues) {
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}
public JKleppmannTreePersistentData withPeerTimestampLog(HashMap<UUID, Long> peerTimestampLog) {
public JKleppmannTreePersistentData withPeerTimestampLog(HashMap<PeerId, Long> peerTimestampLog) {
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}
public JKleppmannTreePersistentData withLog(TreeMap<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>> log) {
public JKleppmannTreePersistentData withLog(TreeMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log) {
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}

PeerManager.java View File

@@ -23,6 +23,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@ApplicationScoped
public class PeerManager {
@@ -135,7 +136,7 @@ public class PeerManager {
// FIXME:
private boolean pingCheck(PeerInfo host, PeerAddress address) {
try {
return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, c -> {
return rpcClientFactory.withObjSyncClient(host.id(), address, pingTimeout, (peer, c) -> {
var ret = c.ping(PingRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).build());
if (!UUID.fromString(ret.getSelfUuid()).equals(host.id().id())) {
throw new IllegalStateException("Ping selfUuid returned " + ret.getSelfUuid() + " but expected " + host.id());
@@ -148,8 +149,12 @@ public class PeerManager {
}
}
public boolean isReachable(PeerId host) {
return _states.containsKey(host);
}
public boolean isReachable(PeerInfo host) {
return _states.containsKey(host.id());
return isReachable(host.id());
}
public PeerAddress getAddress(PeerId host) {
@@ -166,21 +171,13 @@ public class PeerManager {
// .map(Map.Entry::getKey).toList());
// }
// public HostStateSnapshot getHostStateSnapshot() {
// ArrayList<UUID> available = new ArrayList<>();
// ArrayList<UUID> unavailable = new ArrayList<>();
// _transientPeersState.runReadLocked(d -> {
// for (var v : d.getStates().entrySet()) {
// if (v.getValue().isReachable())
// available.add(v.getKey());
// else
// unavailable.add(v.getKey());
// }
// return null;
// }
// );
// return new HostStateSnapshot(available, unavailable);
// }
public HostStateSnapshot getHostStateSnapshot() {
return transactionManager.run(() -> {
var partition = peerInfoService.getPeersNoSelf().stream().map(PeerInfo::id)
.collect(Collectors.partitioningBy(this::isReachable));
return new HostStateSnapshot(partition.get(true), partition.get(false));
});
}
// public void removeRemoteHost(UUID host) {
// persistentPeerDataService.removeHost(host);
@@ -227,7 +224,7 @@ public class PeerManager {
void apply(UUID host);
}
public record HostStateSnapshot(List<UUID> available, List<UUID> unavailable) {
public record HostStateSnapshot(Collection<PeerId> available, Collection<PeerId> unavailable) {
}
}
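
Note: getHostStateSnapshot() now derives availability in a single stream pass. A self-contained sketch of the partitioning idiom, with string stand-ins for peer ids:

    Set<String> reachable = Set.of("peer-a");
    List<String> peers = List.of("peer-a", "peer-b", "peer-c");

    // partitioningBy always yields exactly two buckets, keyed true/false.
    Map<Boolean, List<String>> split =
            peers.stream().collect(Collectors.partitioningBy(reachable::contains));
    // split.get(true)  -> available:   [peer-a]
    // split.get(false) -> unavailable: [peer-b, peer-c]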

ReceivedObjectSerializer.java View File

@@ -0,0 +1,46 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.ReceivedObject;
import com.usatiuk.dhfs.objects.persistence.RemoteObjectP;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
@ApplicationScoped
public class ReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
@Inject
ProtoSerializer<RemoteObjectP, JDataRemote> remoteObjectSerializer;
@Override
public ReceivedObject deserialize(GetObjectReply message) {
PMap<PeerId, Long> changelog = HashTreePMap.empty();
for (var entry : message.getHeader().getChangelog().getEntriesList()) {
changelog = changelog.plus(PeerId.of(entry.getHost()), entry.getVersion());
}
return new ReceivedObject(
JObjectKey.of(message.getHeader().getName()),
changelog,
remoteObjectSerializer.deserialize(message.getContent())
);
}
@Override
public GetObjectReply serialize(ReceivedObject object) {
var builder = GetObjectReply.newBuilder();
var headerBuilder = builder.getHeaderBuilder();
headerBuilder.setName(object.key().toString());
var changelogBuilder = headerBuilder.getChangelogBuilder();
object.changelog().forEach((peer, version) -> {
changelogBuilder.addEntriesBuilder()
.setHost(peer.toString())
.setVersion(version);
});
builder.setContent(remoteObjectSerializer.serialize(object.data()));
return builder.build();
}
}

RemoteObjectServiceClient.java View File

@@ -1,82 +1,84 @@
//package com.usatiuk.dhfs.objects.repository;
//
//import com.google.common.collect.Maps;
//import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
//import com.usatiuk.dhfs.objects.jrepository.*;
//import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
//import com.usatiuk.dhfs.objects.repository.opsupport.Op;
//import io.grpc.Status;
//import io.grpc.StatusRuntimeException;
//import io.quarkus.logging.Log;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.inject.Inject;
//import org.apache.commons.lang3.tuple.Pair;
//
//import javax.annotation.Nullable;
//import java.util.*;
//import java.util.concurrent.Callable;
//import java.util.concurrent.ConcurrentLinkedDeque;
//import java.util.concurrent.Executors;
//import java.util.stream.Collectors;
//
//@ApplicationScoped
//public class RemoteObjectServiceClient {
// @Inject
// PersistentPeerDataService persistentPeerDataService;
//
// @Inject
// RpcClientFactory rpcClientFactory;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Inject
// SyncHandler syncHandler;
// @Inject
// InvalidationQueueService invalidationQueueService;
// @Inject
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ApplicationScoped
public class RemoteObjectServiceClient {
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
RpcClientFactory rpcClientFactory;
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTx;
@Inject
SyncHandler syncHandler;
@Inject
InvalidationQueueService invalidationQueueService;
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
// @Inject
// JObjectTxManager jObjectTxManager;
//
@Inject
ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
// public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
// return rpcClientFactory.withObjSyncClient(host, client -> {
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(name).build());
// return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
// });
// }
//
// public JObjectDataP getObject(JObject<?> jObject) {
// jObject.assertRwLock();
//
// var targets = jObject.runReadLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d) -> {
// var ourVersion = md.getOurVersion();
// if (ourVersion >= 1)
// return md.getRemoteCopies().entrySet().stream()
// .filter(entry -> entry.getValue().equals(ourVersion))
// .map(Map.Entry::getKey).toList();
// else
// return persistentPeerDataService.getHostUuids();
// });
//
// if (targets.isEmpty())
// throw new IllegalStateException("No targets for object " + jObject.getMeta().getName());
//
// Log.info("Downloading object " + jObject.getMeta().getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
//
// return rpcClientFactory.withObjSyncClient(targets, client -> {
// var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString()).setName(jObject.getMeta().getName()).build());
//
// var receivedMap = new HashMap<UUID, Long>();
// for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) {
// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion());
// }
//
public void getObject(JObjectKey key, Function<Pair<PeerId, ReceivedObject>, Boolean> onReceive) {
var objMeta = remoteTx.getMeta(key).orElse(null);
if (objMeta == null) {
throw new IllegalArgumentException("Object " + key + " not found");
}
var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList();
if (targets.isEmpty())
throw new IllegalStateException("No targets for object " + key);
Log.info("Downloading object " + key + " from " + targets);
rpcClientFactory.withObjSyncClient(targets, (peer, client) -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setName(key.toString()).build());
var deserialized = receivedObjectProtoSerializer.deserialize(reply);
if (!onReceive.apply(Pair.of(peer, deserialized))) {
throw new StatusRuntimeException(Status.ABORTED.withDescription("Failed to process object " + key + " from " + peer));
}
return null;
// return jObjectTxManager.executeTx(() -> {
// return jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> {
// return key.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, d, b, v) -> {
// var unexpected = !Objects.equals(
// Maps.filterValues(md.getChangelog(), val -> val != 0),
// Maps.filterValues(receivedMap, val -> val != 0));
@@ -98,10 +100,10 @@
// return reply.getObject().getContent();
// });
// });
// });
// }
//
// @Nullable
});
}
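A hedged usage sketch for the callback contract above; only getObject's signature comes from this diff, and tryApplyUpdate is an assumed helper, not part of this commit:
remoteObjectServiceClient.getObject(key, pair -> {
    PeerId peer = pair.getLeft();
    ReceivedObject received = pair.getRight();
    // returning false aborts this peer's RPC with Status.ABORTED,
    // letting withObjSyncClient fall through to the next target
    return tryApplyUpdate(peer, received);
});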
// @Nullable
// public IndexUpdateReply notifyUpdate(JObject<?> obj, UUID host) {
// var builder = IndexUpdatePush.newBuilder().setSelfUuid(persistentPeerDataService.getSelfUuid().toString());
//
@@ -128,7 +130,7 @@
// return rpcClientFactory.withObjSyncClient(host, client -> client.indexUpdate(send));
// }
//
// public OpPushReply pushOps(List<Op> ops, String queueName, UUID host) {
public OpPushReply pushOps(PeerId target, List<Op> ops) {
// for (Op op : ops) {
// for (var ref : op.getEscapedRefs()) {
// jObjectTxManager.executeTx(() -> {
@@ -141,9 +143,14 @@
// .setQueueId(queueName);
// for (var op : ops)
// builder.addMsg(opProtoSerializer.serialize(op));
// return rpcClientFactory.withObjSyncClient(host, client -> client.opPush(builder.build()));
// }
//
for (Op op : ops) {
var serialized = opProtoSerializer.serialize(op);
var built = OpPushRequest.newBuilder().addMsg(serialized).build();
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
}
return OpPushReply.getDefaultInstance();
}
// public Collection<CanDeleteReply> canDelete(Collection<UUID> targets, String object, Collection<String> ourReferrers) {
// ConcurrentLinkedDeque<CanDeleteReply> results = new ConcurrentLinkedDeque<>();
// Log.trace("Asking canDelete for " + object + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
@@ -171,4 +178,4 @@
// }
// return results;
// }
//}
}

View File

@@ -1,60 +1,77 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
import io.quarkus.logging.Log;
import io.quarkus.security.identity.SecurityIdentity;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
@RolesAllowed("cluster-member")
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
// @Inject
// SyncHandler syncHandler;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Inject
// PeerManager peerManager;
//
// @Inject
// AutoSyncProcessor autoSyncProcessor;
//
@Inject
PersistentPeerDataService persistentPeerDataService;
//
// @Inject
// InvalidationQueueService invalidationQueueService;
//
// @Inject
@Inject
TransactionManager txm;
@Inject
PeerManager peerManager;
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
SecurityIdentity identity;
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
//
// @Inject
// OpObjectRegistry opObjectRegistry;
//
// @Inject
// JObjectTxManager jObjectTxManager;
//
// @Override
// @Blocking
// public Uni<GetObjectReply> getObject(GetObjectRequest request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid());
//
// var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
//
@Inject
ProtoSerializer<OpPushPayload, Op> opProtoSerializer;
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
RemoteTransaction remoteTx;
@Inject
OpHandler opHandler;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
var obj = txm.run(() -> {
var got = remoteTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null);
if (got == null) {
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
throw new StatusRuntimeException(Status.NOT_FOUND);
}
return got;
});
var serialized = receivedObjectProtoSerializer.serialize(obj.toReceivedObject());
return Uni.createFrom().item(serialized);
// // Does @Blocking break this?
// return Uni.createFrom().emitter(emitter -> {
// var replyObj = jObjectTxManager.executeTx(() -> {
// try {
// } catch (Exception e) {
// emitter.fail(e);
// }
// var replyObj = txm.run(() -> {
// var cur = curTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
// // Obj.markSeen before markSeen of its children
// obj.markSeen();
// return obj.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
@@ -77,11 +94,17 @@ PersistentPeerDataService persistentPeerDataService;
// var ret = GetObjectReply.newBuilder()
// .setSelfUuid(persistentPeerDataService.getSelfUuid().toString())
// .setObject(replyObj).build();
// emitter.complete(ret);
// // TODO: Could this cause problems if we wait for too long?
// obj.commitFenceAsync(() -> emitter.complete(ret));
//// obj.commitFenceAsync(() -> emitter.complete(ret));
// });
// }
//
}
@Override
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
throw new NotImplementedException();
}
// @Override
// @Blocking
// public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
@@ -107,11 +130,12 @@ PersistentPeerDataService persistentPeerDataService;
// return m.isDeletionCandidate() && !m.isDeleted();
// });
// // FIXME
//// if (tryUpdate) {
//// obj.get().runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (m, d, b, v) -> {
//// return null;
//// });
//// }
// } catch (DeletedObjectAccessException dox) {
// builder.setDeletionCandidate(true);
// }
@@ -127,7 +151,7 @@ PersistentPeerDataService persistentPeerDataService;
//
// return Uni.createFrom().item(ret);
// }
//
// @Override
// @Blocking
// public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
@@ -135,51 +159,29 @@ PersistentPeerDataService persistentPeerDataService;
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// Log.info("<-- indexUpdate: " + request.getHeader().getName());
// return jObjectTxManager.executeTxAndFlush(() -> {
// return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
// });
// }
//
// @Override
// @Blocking
// public Uni<OpPushReply> opPush(OpPushMsg request) {
// if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
// if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))
// throw new StatusRuntimeException(Status.UNAUTHENTICATED);
//
// try {
// var objs = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
// jObjectTxManager.executeTxAndFlush(() -> {
// opObjectRegistry.acceptExternalOps(request.getQueueId(), UUID.fromString(request.getSelfUuid()), objs);
// });
// } catch (Exception e) {
// Log.error(e, e);
// throw e;
// }
// return Uni.createFrom().item(OpPushReply.getDefaultInstance());
// }
//
@Override
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
return null;
}
@Override
@Blocking
public Uni<OpPushReply> opPush(OpPushRequest request) {
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.info("<-- op: " + op + " from " + identity.getPrincipal().getName().substring(3));
txm.run(() -> {
opHandler.handleOp(PeerId.of(identity.getPrincipal().getName().substring(3)), op);
});
}
} catch (Exception e) {
Log.error(e, e);
throw e;
}
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
}
@Override
public Uni<CanDeleteReply> canDelete(CanDeleteRequest request) {
return null;
}
@Override
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
return null;
}
@Override
public Uni<OpPushReply> opPush(OpPushMsg request) {
return null;
}
@Override
@Blocking

View File

@@ -76,7 +76,7 @@ public class RpcClientFactory {
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withMaxInboundMessageSize(Integer.MAX_VALUE);
});
return fn.apply(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
return fn.apply(host, stub.withDeadlineAfter(timeout, TimeUnit.SECONDS));
}
public void dropCache() {
@@ -85,7 +85,7 @@ public class RpcClientFactory {
@FunctionalInterface
public interface ObjectSyncClientFunction<R> {
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
R apply(PeerId peer, DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
}
private record ObjSyncStubKey(PeerId id, InetAddress addr, int port) {

View File

@@ -1,12 +1,35 @@
//package com.usatiuk.dhfs.objects.repository;
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObject;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.pcollections.PMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
//
//import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
//import com.usatiuk.dhfs.objects.jrepository.JObject;
//import com.usatiuk.dhfs.objects.jrepository.JObjectData;
//import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
//import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
//import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
//import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
//import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
//import io.grpc.Status;
@@ -23,8 +46,12 @@
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
//@ApplicationScoped
//public class SyncHandler {
@ApplicationScoped
public class SyncHandler {
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
// @Inject
// JObjectManager jObjectManager;
// @Inject
@@ -65,143 +92,150 @@
// );
// }
//
// public void handleOneUpdate(UUID from, ObjectHeader header) {
// AtomicReference<JObject<?>> foundExt = new AtomicReference<>();
public <T extends JDataRemote> RemoteObject<T> handleOneUpdate(PeerId from, RemoteObject<T> current, PMap<PeerId, Long> rcvChangelog) {
// if (!rcv.key().equals(current.key())) {
// Log.error("Received update for different object: " + rcv.key() + " from " + from);
// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from);
// }
var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum();
if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) {
Log.error("Received older index update than was known for host: " + from + " " + current.key());
throw new IllegalStateException(); // FIXME: OutdatedUpdateException
}
Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog);
boolean conflict = false;
boolean updatedRemoteVersion = false;
var newObj = current;
var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from);
if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer))
updatedRemoteVersion = true;
if (updatedRemoteVersion)
newObj = current.withMeta(current.meta().withKnownRemoteVersions(
current.meta().knownRemoteVersions().plus(from, receivedTotalVer)
));
boolean hasLower = false;
boolean hasHigher = false;
for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) {
if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L))
hasLower = true;
if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L))
hasHigher = true;
}
if (hasLower && hasHigher) {
Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from);
//
// boolean conflict = jObjectTxManager.executeTx(() -> {
// JObject<?> found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty());
// foundExt.set(found);
//
// var receivedTotalVer = header.getChangelog().getEntriesList()
// .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
//
// var receivedMap = new HashMap<UUID, Long>();
// for (var e : header.getChangelog().getEntriesList()) {
// receivedMap.put(UUID.fromString(e.getHost()), e.getVersion());
// }
//
// return found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> {
// if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) {
// Log.error("Received older index update than was known for host: "
// + from + " " + header.getName());
// throw new OutdatedUpdateException();
// }
//
// String rcv = "";
// for (var e : header.getChangelog().getEntriesList()) {
// rcv += e.getHost() + ": " + e.getVersion() + "; ";
// }
// String ours = "";
// for (var e : md.getChangelog().entrySet()) {
// ours += e.getKey() + ": " + e.getValue() + "; ";
// }
// Log.trace("Handling update: " + header.getName() + " from " + from + "\n" + "ours: " + ours + " \n" + "received: " + rcv);
//
// boolean updatedRemoteVersion = false;
//
// var oldRemoteVer = md.getRemoteCopies().put(from, receivedTotalVer);
// if (oldRemoteVer == null || !oldRemoteVer.equals(receivedTotalVer)) updatedRemoteVersion = true;
//
// boolean hasLower = false;
// boolean hasHigher = false;
// for (var e : Stream.concat(md.getChangelog().keySet().stream(), receivedMap.keySet().stream()).collect(Collectors.toSet())) {
// if (receivedMap.getOrDefault(e, 0L) < md.getChangelog().getOrDefault(e, 0L))
// hasLower = true;
// if (receivedMap.getOrDefault(e, 0L) > md.getChangelog().getOrDefault(e, 0L))
// hasHigher = true;
// }
//
// if (hasLower && hasHigher) {
// Log.info("Conflict on update (inconsistent version): " + header.getName() + " from " + from);
// return true;
// }
//
// if (hasLower) {
// Log.info("Received older index update than known: "
// + from + " " + header.getName());
// throw new OutdatedUpdateException();
// }
//
// if (hasHigher) {
// invalidate.apply();
// md.getChangelog().clear();
// md.getChangelog().putAll(receivedMap);
// md.getChangelog().putIfAbsent(persistentPeerDataService.getSelfUuid(), 0L);
// if (header.hasPushedData())
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// return false;
// } else if (data == null && header.hasPushedData()) {
// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (found.getData() == null)
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// }
//
// assert Objects.equals(receivedTotalVer, md.getOurVersion());
//
// if (!updatedRemoteVersion)
// Log.debug("No action on update: " + header.getName() + " from " + from);
//
// return false;
// });
// });
//
// // TODO: Is the lock gap here ok?
// if (conflict) {
// Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
// var found = foundExt.get();
//
// JObjectData theirsData;
// ObjectHeader theirsHeader;
// if (header.hasPushedData()) {
// theirsHeader = header;
// theirsData = dataProtoSerializer.deserialize(header.getPushedData());
// } else {
// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
// theirsData = dataProtoSerializer.deserialize(got.getRight());
// theirsHeader = got.getLeft();
// }
//
// jObjectTxManager.executeTx(() -> {
// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// if (d == null)
// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
// return d.getConflictResolver();
// });
// var resolver = conflictResolvers.select(resolverClass);
// resolver.get().resolve(from, theirsHeader, theirsData, found);
// });
// Log.info("Resolved conflict for " + from + " " + header.getName());
throw new NotImplementedException();
} else if (hasLower) {
Log.info("Received older index update than known: " + from + " " + current.key());
// throw new OutdatedUpdateException();
throw new NotImplementedException();
} else if (hasHigher) {
var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog));
// if (header.hasPushedData())
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
}
// else if (data == null && header.hasPushedData()) {
// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (found.getData() == null)
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// }
//
// }
//
// public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
// // TODO: Dedup
// try {
// handleOneUpdate(UUID.fromString(request.getSelfUuid()), request.getHeader());
// assert Objects.equals(receivedTotalVer, md.getOurVersion());
if (!updatedRemoteVersion)
Log.debug("No action on update: " + current.meta().key() + " from " + from);
return newObj;
}
public <T extends JDataRemote> RemoteObject<T> handleRemoteUpdate(PeerId from, JObjectKey key, RemoteObject<T> current, PMap<PeerId, Long> rcv) {
// TODO: Dedup
try {
if (current == null) {
var obj = new RemoteObject<>(key, rcv);
curTx.put(obj);
return (RemoteObject<T>) obj;
}
var newObj = handleOneUpdate(from, current, rcv);
if (newObj != current) {
curTx.put(newObj);
}
return newObj;
// } catch (OutdatedUpdateException ignored) {
// Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid());
// invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName());
// } catch (Exception ex) {
// Log.info("Error when handling update from " + request.getSelfUuid() + " of " + request.getHeader().getName(), ex);
// throw ex;
// }
//
} catch (Exception ex) {
Log.info("Error when handling update from " + from + " of " + current.meta().key(), ex);
throw ex;
}
// return IndexUpdateReply.getDefaultInstance();
// }
//
// protected static class OutdatedUpdateException extends RuntimeException {
// OutdatedUpdateException() {
// super();
// }
//
// OutdatedUpdateException(String message) {
// super(message);
// }
//
// @Override
// public synchronized Throwable fillInStackTrace() {
// return this;
// }
// }
//}
}
protected static class OutdatedUpdateException extends RuntimeException {
OutdatedUpdateException() {
super();
}
OutdatedUpdateException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
}
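Since the hasLower/hasHigher walk above carries the whole sync decision, here is a self-contained sketch of the same version-vector rule, with plain Maps standing in for the PMap changelogs:
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ChangelogCompareSketch {
    public static void main(String[] args) {
        Map<String, Long> ours = Map.of("A", 2L, "B", 1L);
        Map<String, Long> rcv = Map.of("A", 1L, "B", 2L);
        boolean hasLower = false, hasHigher = false;
        for (var k : Stream.concat(ours.keySet().stream(), rcv.keySet().stream())
                .collect(Collectors.toUnmodifiableSet())) {
            if (rcv.getOrDefault(k, 0L) < ours.getOrDefault(k, 0L)) hasLower = true;
            if (rcv.getOrDefault(k, 0L) > ours.getOrDefault(k, 0L)) hasHigher = true;
        }
        // both true   -> concurrent edits: conflict resolution needed
        // only higher -> remote strictly newer: take their changelog
        // only lower  -> stale update from that peer
        System.out.println(hasLower + " " + hasHigher); // true true -> conflict
    }
}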

View File

@@ -0,0 +1,17 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.io.Serial;
import java.io.Serializable;
import java.util.UUID;
public class DeferredInvalidationQueueData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
public final MultiValuedMap<PeerId, JObjectKey> deferredInvalidations = new HashSetValuedHashMap<>();
}

View File

@@ -0,0 +1,85 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;
@ApplicationScoped
public class DeferredInvalidationQueueService {
private static final String dataFileName = "invqueue";
@Inject
PeerManager remoteHostManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
String dataRoot;
private DeferredInvalidationQueueData _persistentData = new DeferredInvalidationQueueData();
void init(@Observes @Priority(290) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading invalidation queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// remoteHostManager.registerConnectEventListener(this::returnForHost);
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving deferred invalidations");
writeData();
Log.info("Saved deferred invalidations");
}
private void writeData() {
try {
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing deferred invalidations data", iex);
throw new RuntimeException(iex);
}
}
// FIXME:
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
for (var reachable : remoteHostManager.getAvailableHosts())
returnForHost(reachable);
}
void returnForHost(PeerId host) {
synchronized (this) {
var col = _persistentData.deferredInvalidations.get(host);
for (var s : col) {
Log.trace("Un-deferred invalidation to " + host + " of " + s);
invalidationQueueService.pushDeferredInvalidations(host, s);
}
col.clear();
}
}
void defer(PeerId host, JObjectKey object) {
synchronized (this) {
Log.trace("Deferred invalidation to " + host + " of " + object);
_persistentData.deferredInvalidations.put(host, object);
}
}
}
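The defer/return cycle above leans on two commons-collections4 behaviors: the set-valued map dedups repeated deferrals, and get() returns a live view, so col.clear() in returnForHost really empties the stored entries. A minimal sketch:
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;

class DeferSketch {
    public static void main(String[] args) {
        MultiValuedMap<String, String> deferred = new HashSetValuedHashMap<>();
        deferred.put("peer-a", "obj-1");
        deferred.put("peer-a", "obj-1"); // set-valued: the duplicate is dropped
        var view = deferred.get("peer-a");
        System.out.println(view.size()); // 1
        view.clear();                    // live view: clears the backing entries too
        System.out.println(deferred.get("peer-a").isEmpty()); // true
    }
}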

View File

@@ -0,0 +1,12 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObject;
import org.pcollections.PMap;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
public IndexUpdateOp(RemoteObject<?> object) {
this(object.key(), object.meta().changelog());
}
}

View File

@@ -0,0 +1,36 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.IndexUpdateOpP;
import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
@ApplicationScoped
public class IndexUpdateOpSerializer implements ProtoSerializer<IndexUpdateOpP, IndexUpdateOp> {
@Override
public IndexUpdateOp deserialize(IndexUpdateOpP message) {
PMap<PeerId, Long> map = HashTreePMap.empty();
for (var entry : message.getHeader().getChangelog().getEntriesList()) {
map = map.plus(PeerId.of(entry.getHost()), entry.getVersion());
}
return new IndexUpdateOp(JObjectKey.of(message.getHeader().getName()), map);
}
@Override
public IndexUpdateOpP serialize(IndexUpdateOp object) {
var builder = IndexUpdateOpP.newBuilder();
var headerBuilder = builder.getHeaderBuilder();
headerBuilder.setName(object.key().name());
var changelogBuilder = headerBuilder.getChangelogBuilder();
for (var entry : object.changelog().entrySet()) {
var entryBuilder = changelogBuilder.addEntriesBuilder();
entryBuilder.setHost(entry.getKey().id().toString());
entryBuilder.setVersion(entry.getValue());
}
return builder.build();
}
}

View File

@@ -0,0 +1,190 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.impl.ConcurrentHashSet;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<Pair<PeerId, JObjectKey>> _queue;
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
@Inject
PeerManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject
PeerInfoService peerInfoService;
@Inject
OpPusher opPusher;
@ConfigProperty(name = "dhfs.objects.invalidation.threads")
int threads;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
_queue = new HashSetDelayedBlockingQueue<>(delay);
}
void init(@Observes @Priority(300) StartupEvent event) throws InterruptedException {
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("invalidation-%d")
.build();
_executor = Executors.newFixedThreadPool(threads, factory);
for (int i = 0; i < threads; i++) {
_executor.submit(this::sender);
}
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
_shutdown = true;
_executor.shutdownNow();
if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) {
Log.error("Failed to shut down invalidation sender thread");
}
var data = _queue.close();
Log.info("Will defer " + data.size() + " invalidations on shutdown");
for (var e : data)
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
}
private void sender() {
while (!_shutdown) {
try {
try {
if (!_queue.hasImmediate()) {
ConcurrentHashSet<JObjectKey> toAllQueue;
while (true) {
toAllQueue = _toAllQueue.get();
if (toAllQueue != null) {
if (_toAllQueue.compareAndSet(toAllQueue, null))
break;
} else {
break;
}
}
if (toAllQueue != null) {
var hostInfo = remoteHostManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(Pair.of(h, o));
for (var u : hostInfo.unavailable())
deferredInvalidationQueueService.defer(u, o);
}
}
}
var data = _queue.getAllWait(100, _queue.getDelay()); // TODO: config?
if (data.isEmpty()) continue;
String stats = "Sent invalidation: ";
long success = 0;
for (var e : data) {
if (peerInfoService.getPeerInfo(e.getLeft()).isEmpty()) continue;
if (!remoteHostManager.isReachable(e.getLeft())) {
deferredInvalidationQueueService.defer(e.getLeft(), e.getRight());
continue;
}
try {
opPusher.doPush(e.getLeft(), e.getRight());
success++;
} catch (Exception ex) {
Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex);
pushInvalidationToOne(e.getLeft(), e.getRight());
}
if (_shutdown) {
Log.info("Invalidation sender exiting");
break;
}
}
stats += success + "/" + data.size() + " ";
Log.info(stats);
} catch (InterruptedException ie) {
throw ie;
} catch (Exception e) {
Log.error("Exception in invalidation sender thread: ", e);
}
} catch (InterruptedException ignored) {
}
}
Log.info("Invalidation sender exiting");
}
public void pushInvalidationToAll(JObjectKey key) {
// if (obj.getMeta().isOnlyLocal()) return;
while (true) {
var queue = _toAllQueue.get();
if (queue == null) {
var nq = new ConcurrentHashSet<JObjectKey>();
if (!_toAllQueue.compareAndSet(null, nq)) continue;
queue = nq;
}
queue.add(key);
if (_toAllQueue.get() == queue) break;
}
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
// if (obj.getMeta().isOnlyLocal()) return;
if (remoteHostManager.isReachable(host))
_queue.add(Pair.of(host, obj));
else
deferredInvalidationQueueService.defer(host, obj);
}
public void pushInvalidationToOne(PeerId host, JData obj) {
// if (obj.getMeta().isOnlyLocal()) return;
pushInvalidationToOne(host, obj.key());
}
// public void pushInvalidationToAll(String name) {
// pushInvalidationToAll(jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
// }
//
// public void pushInvalidationToOne(PeerId host, JObjectKey name) {
// pushInvalidationToOne(host, jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
// }
protected void pushDeferredInvalidations(PeerId host, JObjectKey name) {
_queue.add(Pair.of(host, name));
}
}
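The _toAllQueue handoff above is a small lock-free pattern: producers publish into a shared set, and the sender detaches the whole set atomically. A simplified standalone model (String stands in for JObjectKey; a duplicate send after a lost race is possible and acceptable for idempotent invalidations):
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class ToAllQueueSketch {
    private final AtomicReference<Set<String>> box =
            new AtomicReference<>(ConcurrentHashMap.newKeySet());

    void produce(String key) {
        while (true) {
            var q = box.get();
            if (q == null) {
                var nq = ConcurrentHashMap.<String>newKeySet();
                if (!box.compareAndSet(null, nq)) continue; // lost the race, retry
                q = nq;
            }
            q.add(key);
            if (box.get() == q) break; // still published: the sender will see it
            // otherwise the sender detached q mid-add: loop and re-add
        }
    }

    Set<String> drain() {
        return box.getAndSet(null); // detach the whole batch atomically (may be null)
    }
}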

View File

@@ -0,0 +1,22 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.repository.JKleppmannTreeOpPTemp;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JKleppmannTreeOpPTempSerializer implements ProtoSerializer<JKleppmannTreeOpPTemp, JKleppmannTreeOpWrapper> {
@Override
public JKleppmannTreeOpWrapper deserialize(JKleppmannTreeOpPTemp message) {
return SerializationHelper.deserialize(message.getSerialized().toByteArray());
}
@Override
public JKleppmannTreeOpPTemp serialize(JKleppmannTreeOpWrapper object) {
return JKleppmannTreeOpPTemp.newBuilder()
.setSerialized(SerializationHelper.serialize(object))
.build();
}
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
@ProtoMirror(OpPushPayload.class)
public interface Op {
}

View File

@@ -0,0 +1,27 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class OpHandler {
@Inject
PushOpHandler pushOpHandler;
@Inject
Transaction curTx;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
public void handleOp(PeerId from, Op op) {
if (op instanceof IndexUpdateOp iu) {
pushOpHandler.handlePush(from, iu);
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
var tree = jKleppmannTreeManager.getTree(jk.treeName());
tree.acceptExternalOp(from, jk);
}
}
}

View File

@@ -0,0 +1,52 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
@ApplicationScoped
public class OpPusher {
@Inject
Transaction curTx;
@Inject
TransactionManager txm;
@Inject
RemoteTransaction remoteTransaction;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
public void doPush(PeerId op, JObjectKey key) {
Op info = txm.run(() -> {
var obj = curTx.get(JData.class, key).orElse(null);
switch (obj) {
case RemoteObject<?> remote -> {
return new IndexUpdateOp(key, remote.meta().changelog());
}
case JKleppmannTreePersistentData pd -> {
var ret = new JKleppmannTreeOpWrapper(key, pd.queues().get(op).firstEntry().getValue());
var newPd = pd.withQueues(pd.queues().plus(op, pd.queues().get(op).minus(ret.op().timestamp())));
curTx.put(newPd);
if (!newPd.queues().get(op).isEmpty())
invalidationQueueService.pushInvalidationToOne(op, pd.key());
return ret;
}
case null,
default -> {
return null;
}
}
});
if (info == null) {
return;
}
remoteObjectServiceClient.pushOps(op, List.of(info));
}
}
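The queue slicing in doPush relies on pcollections being persistent: plus and minus return new maps while the original snapshot stays readable, which is what lets it stash newPd in the transaction while still reading pd. A small sketch, assuming the TreePMap type used elsewhere in this codebase backs the pd.queues() values (only calls that appear above are used):
import org.pcollections.TreePMap;

class PersistentQueueSketch {
    public static void main(String[] args) {
        TreePMap<Long, String> q = TreePMap.<Long, String>empty()
                .plus(1L, "op1")
                .plus(2L, "op2");
        var head = q.firstEntry();          // 1=op1 (lowest timestamp first)
        var rest = q.minus(head.getKey());  // {2=op2}
        System.out.println(q.size() + " " + rest.size()); // 2 1 -> q unchanged
    }
}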

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class PushOpHandler {
@Inject
Transaction curTx;
@Inject
SyncHandler syncHandler;
@Inject
RemoteTransaction remoteTransaction;
public void handlePush(PeerId peer, IndexUpdateOp obj) {
syncHandler.handleRemoteUpdate(peer, obj.key(),
remoteTransaction.get(JDataRemote.class, obj.key()).orElse(null),
obj.changelog());
}
}

View File

@@ -1,30 +1,19 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
import com.usatiuk.dhfs.objects.repository.CertificateTools;
import org.pcollections.HashTreePSet;
import org.pcollections.PCollection;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
public record PeerInfo(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen, PeerId id,
byte[] cert) implements JDataRefcounted, JDataRemote {
public record PeerInfo(JObjectKey key, PeerId id, byte[] cert) implements JDataRemote {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), HashTreePSet.empty(), false, id, cert);
this(id.toJObjectKey(), id, cert);
}
@Override
public JDataRefcounted withRefsFrom(PCollection<JObjectKey> refs) {
return new PeerInfo(key, refs, frozen, id, cert);
}
@Override
public JDataRefcounted withFrozen(boolean frozen) {
return new PeerInfo(key, refsFrom, frozen, id, cert);
}
public X509Certificate parsedCert() {

View File

@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.google.protobuf.ByteString;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PeerInfoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
@Override
public PeerInfo deserialize(PeerInfoP message) {
return new PeerInfo(PeerId.of(message.getUuid()), message.getCert().toByteArray());
}
@Override
public PeerInfoP serialize(PeerInfo object) {
return PeerInfoP.newBuilder()
.setUuid(object.key().toString())
.setCert(ByteString.copyFrom(object.cert()))
.build();
}
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
@@ -24,6 +25,8 @@ public class PeerInfoService {
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
RemoteTransaction remoteTx;
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("peers"));
@@ -37,7 +40,7 @@ public class PeerInfoService {
}
return curTx.get(JKleppmannTreeNode.class, gotKey).flatMap(node -> {
var meta = (JKleppmannTreeNodeMetaPeer) node.meta();
return curTx.get(PeerInfo.class, meta.getPeerId());
return remoteTx.getData(PeerInfo.class, meta.getPeerId());
});
});
}
@@ -69,7 +72,7 @@ public class PeerInfoService {
jObjectTxManager.run(() -> {
var parent = getTree().traverse(List.of());
var newPeerInfo = new PeerInfo(id, cert);
curTx.put(newPeerInfo);
remoteTx.put(newPeerInfo);
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId());
});
}

View File

@@ -33,10 +33,13 @@ message FilePChunksEntry {
}
message FileP {
FsNodeP fsNode = 1;
repeated FilePChunksEntry chunks = 2;
bool symlink = 3;
int64 size = 4;
string uuid = 1;
int64 mode = 2;
int64 ctime = 3;
int64 mtime = 4;
repeated FilePChunksEntry chunks = 5;
bool symlink = 6;
int64 size = 7;
}
message DirectoryP {
@@ -53,7 +56,7 @@ message PeerDirectoryP {
repeated string peers = 1;
}
message PersistentPeerInfoP {
message PeerInfoP {
string uuid = 1;
bytes cert = 2;
}
@@ -141,15 +144,26 @@ message PeerDirectoryLocalP {
repeated string initialObjSyncDonePeers = 2;
}
message RemoteObjectP {
oneof obj {
FileP file = 2;
ChunkDataP chunkData = 5;
// PeerDirectoryP peerDirectory = 6;
PeerInfoP peerInfo = 7;
// JKleppmannTreeNodeP jKleppmannTreeNode = 8;
// JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
// PeerDirectoryLocalP peerDirectoryLocal = 10;
}
}
message JObjectDataP {
oneof obj {
FileP file = 2;
DirectoryP directory = 3;
ChunkDataP chunkData = 5;
PeerDirectoryP peerDirectory = 6;
PersistentPeerInfoP persistentPeerInfo = 7;
JKleppmannTreeNodeP jKleppmannTreeNode = 8;
JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
PeerDirectoryLocalP peerDirectoryLocal = 10;
// PeerDirectoryP peerDirectory = 6;
PeerInfoP peerInfo = 7;
// JKleppmannTreeNodeP jKleppmannTreeNode = 8;
// JKleppmannTreePersistentDataP jKleppmannTreePersistentData = 9;
// PeerDirectoryLocalP peerDirectoryLocal = 10;
}
}
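As a reminder of how this oneof surfaces in the generated Java (standard protobuf codegen, nothing specific to this commit): exactly one field can be set at a time, and the generated ObjCase enum tells the dispatcher which.
// Sketch against the generated RemoteObjectP API:
RemoteObjectP msg = RemoteObjectP.newBuilder()
        .setPeerInfo(PeerInfoP.newBuilder().setUuid("peer-a").build())
        .build();
switch (msg.getObjCase()) {         // one enum value per oneof field
    case FILE -> { /* msg.getFile() */ }
    case CHUNKDATA -> { /* msg.getChunkData() */ }
    case PEERINFO -> { /* msg.getPeerInfo() */ }
    case OBJ_NOT_SET -> { /* nothing was set */ }
}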

View File

@@ -11,8 +11,7 @@ package dhfs.objects.sync;
service DhfsObjectSyncGrpc {
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
rpc OpPush (OpPushMsg) returns (OpPushReply) {}
rpc OpPush (OpPushRequest) returns (OpPushReply) {}
rpc Ping (PingRequest) returns (PingReply) {}
}
@@ -37,63 +36,53 @@ message ObjectChangelog {
message ObjectHeader {
string name = 2;
ObjectChangelog changelog = 5;
optional dhfs.objects.persistence.JObjectDataP pushedData = 6;
}
message ApiObject {
ObjectHeader header = 1;
dhfs.objects.persistence.JObjectDataP content = 2;
optional dhfs.objects.persistence.RemoteObjectP pushedData = 6;
}
message GetObjectRequest {
string selfUuid = 10;
string name = 2;
}
message GetObjectReply {
string selfUuid = 10;
ApiObject object = 1;
ObjectHeader header = 1;
dhfs.objects.persistence.RemoteObjectP content = 2;
}
message CanDeleteRequest {
string selfUuid = 10;
string name = 2;
repeated string ourReferrers = 3;
}
message CanDeleteReply {
string selfUuid = 10;
string objName = 1;
bool deletionCandidate = 2;
repeated string referrers = 3;
}
message IndexUpdatePush {
string selfUuid = 10;
message IndexUpdateOpP {
ObjectHeader header = 1;
}
message IndexUpdateReply {}
message JKleppmannTreePeriodicPushOpP {
string fromUuid = 1;
int64 timestamp = 2;
}
message JKleppmannTreeOpPTemp {
bytes serialized = 2;
}
message OpPushPayload {
oneof payload {
dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOpWrapper = 1;
JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2;
JKleppmannTreeOpPTemp jKleppmannTreeOpWrapper = 1;
// dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOpWrapper = 1;
// JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2;
IndexUpdateOpP indexUpdateOp = 3;
}
}
message OpPushMsg {
string selfUuid = 10;
string queueId = 1;
message OpPushRequest {
repeated OpPushPayload msg = 2;
}

View File

@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=5s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.threads=1
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
@@ -45,6 +46,8 @@ public class DhfsFileServiceSimpleTestImpl {
Transaction curTx;
@Inject
TransactionManager jObjectTxManager;
@Inject
RemoteTransaction remoteTx;
// @Test
// void readTest() {
@@ -207,9 +210,9 @@ public class DhfsFileServiceSimpleTestImpl {
jObjectTxManager.run(() -> {
var oldfile = curTx.get(File.class, ret2.get()).orElseThrow(IllegalStateException::new);
var oldfile = remoteTx.getData(File.class, ret2.get()).orElseThrow(IllegalStateException::new);
var chunk = oldfile.chunks().get(0L);
var chunkObj = curTx.get(ChunkData.class, chunk).orElseThrow(IllegalStateException::new);
var chunkObj = remoteTx.getData(ChunkData.class, chunk).orElseThrow(IllegalStateException::new);
});
Assertions.assertTrue(fileService.rename("/moveOverTest1", "/moveOverTest2"));