Server: seemingly working file sync

This commit is contained in:
2025-03-16 00:13:42 +01:00
parent be1f5d12c9
commit 8fbdf50732
27 changed files with 560 additions and 110 deletions

View File

@@ -1,11 +1,11 @@
package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote {
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
@Override
public int estimateSize() {
return data.size();

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
import jakarta.inject.Singleton;
@Singleton
public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(
JObjectKey.of(message.getKey().getName()),
message.getData()
);
}
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setKey(JObjectKeyP.newBuilder().setName(object.key().name()).build())
.setData(object.data())
.build();
}
}

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.jmap.JMapHolder;
import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import java.util.Collection;
import java.util.Set;
@@ -38,4 +39,9 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
return 64;
// return chunks.size() * 64;
}
@Override
public Class<? extends JDataRemoteDto> dtoClass() {
return FileDto.class;
}
}

View File

@@ -0,0 +1,16 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.Map;
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
@Override
public Class<? extends JDataRemote> objClass() {
return File.class;
}
}

View File

@@ -0,0 +1,35 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.Collections;
@ApplicationScoped
public class FileDtoMapper implements DtoMapper<File, FileDto> {
@Inject
JMapHelper jMapHelper;
@Override
public FileDto toDto(File obj) {
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
try (var it = jMapHelper.getIterator(obj)) {
while (it.hasNext()) {
var cur = it.next();
chunks.add(Pair.of(cur.getKey().key(), cur.getValue().ref()));
}
}
return new FileDto(obj, Collections.unmodifiableList(chunks));
}
@Override
public File fromDto(FileDto dto) {
throw new UnsupportedOperationException();
}
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.FileDtoP;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class FileProtoSerializer implements ProtoSerializer<FileDtoP, FileDto> {
@Override
public FileDto deserialize(FileDtoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public FileDtoP serialize(FileDto object) {
return FileDtoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -0,0 +1,98 @@
package com.usatiuk.dhfs.files.objects;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
import com.usatiuk.dhfs.objects.repository.ObjSyncHandler;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.SyncHelper;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import javax.annotation.Nullable;
@ApplicationScoped
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
JMapHelper jMapHelper;
@Override
public void handleRemoteUpdate(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
if (current == null) {
current = new RemoteObjectMeta(key, HashTreePMap.empty());
curTx.put(current);
}
var changelogCompare = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog);
switch (changelogCompare) {
case EQUAL -> {
Log.debug("No action on update: " + key + " from " + from);
if (!current.hasLocalData() && receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData.file())).orElse(new RemoteObjectDataWrapper<>(receivedData.file())));
jMapHelper.deleteAll(receivedData.file());
for (var f : receivedData.chunks()) {
jMapHelper.put(receivedData.file(), JMapLongKey.of(f.getLeft()), f.getRight());
}
}
}
case NEWER -> {
Log.debug("Received newer index update than known: " + key + " from " + from);
var newChangelog = receivedChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
receivedChangelog : receivedChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
current = current.withChangelog(newChangelog);
if (receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData.file())).orElse(new RemoteObjectDataWrapper<>(receivedData.file())));
jMapHelper.deleteAll(receivedData.file());
for (var f : receivedData.chunks()) {
jMapHelper.put(receivedData.file(), JMapLongKey.of(f.getLeft()), f.getRight());
}
} else {
current = current.withHaveLocal(false);
curTx.put(current);
}
}
case OLDER -> {
Log.debug("Received older index update than known: " + key + " from " + from);
return;
}
case CONFLICT -> {
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
// TODO:
return;
}
}
var curKnownRemoteVersion = current.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(current);
}
}
}

View File

@@ -1,5 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -14,4 +16,9 @@ public interface JDataRemote extends Serializable {
default Collection<JObjectKey> collectRefsTo() {
return List.of();
}
default Class<? extends JDataRemoteDto> dtoClass() {
assert JDataRemoteDto.class.isAssignableFrom(getClass());
return (Class<? extends JDataRemoteDto>) this.getClass();
}
}

View File

@@ -0,0 +1,18 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
import jakarta.inject.Singleton;
@Singleton
public class JObjectKeyProtoSerializer implements ProtoSerializer<JObjectKeyP, JObjectKey> {
@Override
public JObjectKey deserialize(JObjectKeyP message) {
return JObjectKey.of(message.getName());
}
@Override
public JObjectKeyP serialize(JObjectKey object) {
return JObjectKeyP.newBuilder().setName(object.name()).build();
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import org.pcollections.PMap;
public record ReceivedObject(PMap<PeerId, Long> changelog, JDataRemote data) {
public record ReceivedObject(PMap<PeerId, Long> changelog, JDataRemoteDto data) {
}

View File

@@ -28,7 +28,7 @@ public class RemoteTransaction {
try {
remoteObjectServiceClient.getObject(obj.key(), rcv -> {
if (!obj.knownType().isInstance(rcv.getRight().data()))
if (!obj.knownType().isAssignableFrom(rcv.getRight().data().objClass()))
throw new IllegalStateException("Object type mismatch: " + obj.knownType() + " vs " + rcv.getRight().data().getClass());
syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), rcv.getRight().changelog(), rcv.getRight().data());

View File

@@ -1,21 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.JDataRemoteP;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.inject.Singleton;
@Singleton
public class TemporaryJDataRemoteSerializer implements ProtoSerializer<JDataRemoteP, JDataRemote> {
@Override
public JDataRemote deserialize(JDataRemoteP message) {
return SerializationHelper.deserialize(message.getSerializedData().toByteArray());
}
@Override
public JDataRemoteP serialize(JDataRemote object) {
return JDataRemoteP.newBuilder()
.setSerializedData(SerializationHelper.serialize(object))
.build();
}
}

View File

@@ -1,12 +1,14 @@
package com.usatiuk.dhfs.objects.jmap;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.Optional;
@ApplicationScoped
@@ -30,8 +32,8 @@ public class JMapHelper {
return getIterator(holder, IteratorStart.GE, key);
}
public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start) {
return new JMapIterator<>(curTx.getIterator(start, makePrefix(holder.key())), holder);
public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
return new JMapIterator<>(curTx.getIterator(IteratorStart.GE, makePrefix(holder.key())), holder);
}
public <K extends JMapKey & Comparable<K>> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
@@ -46,4 +48,21 @@ public class JMapHelper {
public <K extends JMapKey & Comparable<K>> void delete(JMapHolder<K> holder, K key) {
curTx.delete(makeKey(holder.key(), key));
}
public <K extends JMapKey & Comparable<K>> void deleteAll(JMapHolder<K> he) {
ArrayList<K> collectedKeys = new ArrayList<>();
try (var it = getIterator(he)) {
while (it.hasNext()) {
var curKey = it.peekNextKey();
collectedKeys.add(curKey);
it.skip();
}
}
for (var curKey : collectedKeys) {
delete(he, curKey);
Log.tracev("Removed map entry {0} to {1}", he.key(), curKey);
}
}
}

View File

@@ -1,15 +1,11 @@
package com.usatiuk.dhfs.objects.jmap;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
@ApplicationScoped
public class JMapHolderRefcounterTxHook implements PreCommitTxHook {
@Inject
@@ -27,33 +23,17 @@ public class JMapHolderRefcounterTxHook implements PreCommitTxHook {
return new RemoteObjectMeta(key);
}
private <K extends JMapKey & Comparable<K>> void onDeleteImpl(JMapHolder<K> he) {
ArrayList<K> collectedKeys = new ArrayList<>();
try (var it = helper.getIterator(he, IteratorStart.GE)) {
while (it.hasNext()) {
var curKey = it.peekNextKey();
collectedKeys.add(curKey);
it.skip();
}
}
for (var curKey : collectedKeys) {
helper.delete(he, curKey);
Log.tracev("Removed map entry {0} to {1}", he.key(), curKey);
}
}
@Override
public void onDelete(JObjectKey key, JData cur) {
if (cur instanceof RemoteObjectDataWrapper dw) {
if (dw.data() instanceof JMapHolder he) {
onDeleteImpl(he);
helper.deleteAll(he);
return;
}
}
if (cur instanceof JMapHolder he) {
onDeleteImpl(he);
helper.deleteAll(he);
return;
}
}

View File

@@ -0,0 +1,76 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import javax.annotation.Nullable;
@ApplicationScoped
public class DefaultObjSyncHandler {
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
public <T extends JDataRemote> void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable JDataRemote receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
if (current == null) {
current = new RemoteObjectMeta(key, HashTreePMap.empty());
curTx.put(current);
}
var changelogCompare = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog);
switch (changelogCompare) {
case EQUAL -> {
Log.debug("No action on update: " + key + " from " + from);
if (!current.hasLocalData() && receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
}
}
case NEWER -> {
Log.debug("Received newer index update than known: " + key + " from " + from);
var newChangelog = receivedChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
receivedChangelog : receivedChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
current = current.withChangelog(newChangelog);
if (receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
} else {
current = current.withHaveLocal(false);
curTx.put(current);
}
}
case OLDER -> {
Log.debug("Received older index update than known: " + key + " from " + from);
return;
}
case CONFLICT -> {
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
// TODO:
return;
}
}
var curKnownRemoteVersion = current.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(current);
}
}
}

View File

@@ -0,0 +1,15 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.persistence.JDataRemoteDtoP;
import java.io.Serializable;
@ProtoMirror(JDataRemoteDtoP.class)
public interface JDataRemoteDto extends Serializable {
default Class<? extends JDataRemote> objClass() {
assert JDataRemote.class.isAssignableFrom(getClass());
return (Class<? extends JDataRemote>) this.getClass();
}
}

View File

@@ -0,0 +1,14 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import org.pcollections.PMap;
import javax.annotation.Nullable;
public interface ObjSyncHandler<T extends JDataRemote, D extends JDataRemoteDto> {
void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable D receivedData);
}

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.invalidation.OpHandler;
import com.usatiuk.dhfs.objects.repository.syncmap.DtoMapperService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.grpc.Status;
@@ -47,13 +48,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
RemoteTransaction remoteTx;
@Inject
OpHandler opHandler;
@Inject
DtoMapperService dtoMapperService;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
Pair<RemoteObjectMeta, JDataRemote> got = txm.run(() -> {
Pair<RemoteObjectMeta, JDataRemoteDto> got = txm.run(() -> {
var meta = remoteTx.getMeta(JObjectKey.of(request.getName().getName())).orElse(null);
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName().getName())).orElse(null);
if (meta != null && !meta.seen())
@@ -64,7 +67,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
if (refMeta != null && !refMeta.seen())
curTx.put(refMeta.withSeen(true));
}
return Pair.of(meta, obj);
return Pair.of(meta, dtoMapperService.toDto(obj, obj.dtoClass()));
});
if ((got.getValue() != null) && (got.getKey() == null)) {
@@ -77,7 +80,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
throw new StatusRuntimeException(Status.NOT_FOUND);
}
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getValue()));
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().changelog(), got.getRight()));
return Uni.createFrom().item(serialized);
// // Does @Blocking break this?
// return Uni.createFrom().emitter(emitter -> {

View File

@@ -1,17 +1,24 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@ApplicationScoped
public class SyncHandler {
@@ -23,58 +30,46 @@ public class SyncHandler {
TransactionManager txm;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
DefaultObjSyncHandler defaultObjSyncHandler;
public <T extends JDataRemote> void handleRemoteUpdate(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog, @Nullable JDataRemote receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
if (current == null) {
current = new RemoteObjectMeta(key, HashTreePMap.empty());
curTx.put(current);
}
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
private final Map<Class<? extends JDataRemoteDto>, ObjSyncHandler> _dtoToSyncHandler;
var changelogCompare = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog);
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers) {
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
HashMap<Class<? extends JDataRemoteDto>, ObjSyncHandler> dtoToHandlerMap = new HashMap<>();
switch (changelogCompare) {
case EQUAL -> {
Log.debug("No action on update: " + key + " from " + from);
if (!current.hasLocalData() && receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
}
}
case NEWER -> {
Log.debug("Received newer index update than known: " + key + " from " + from);
var newChangelog = receivedChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
receivedChangelog : receivedChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
current = current.withChangelog(newChangelog);
if (receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
} else {
current = current.withHaveLocal(false);
curTx.put(current);
}
}
case OLDER -> {
Log.debug("Received older index update than known: " + key + " from " + from);
return;
}
case CONFLICT -> {
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
// TODO:
return;
for (var syncHandler : syncHandlers.handles()) {
for (var type : Arrays.stream(syncHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(ObjSyncHandler.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
var dto = type.getActualTypeArguments()[1];
assert JDataRemote.class.isAssignableFrom((Class<?>) orig);
assert JDataRemoteDto.class.isAssignableFrom((Class<?>) dto);
objToHandlerMap.put((Class<? extends JDataRemote>) orig, syncHandler.get());
dtoToHandlerMap.put((Class<? extends JDataRemoteDto>) dto, syncHandler.get());
}
}
var curKnownRemoteVersion = current.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(current);
_objToSyncHandler = Map.copyOf(objToHandlerMap);
_dtoToSyncHandler = Map.copyOf(dtoToHandlerMap);
}
public <D extends JDataRemoteDto> void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable D receivedData) {
var got = Optional.ofNullable(receivedData).flatMap(d -> Optional.ofNullable(_dtoToSyncHandler.get(d.getClass()))).orElse(null);
if (got == null) {
assert receivedData == null || receivedData.objClass().equals(receivedData.getClass());
defaultObjSyncHandler.handleRemoteUpdate(from, key, receivedChangelog, (JDataRemote) receivedData);
} else {
got.handleRemoteUpdate(from, key, receivedChangelog, receivedData);
}
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.pcollections.PMap;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class SyncHandlerService {
}

View File

@@ -1,21 +1,21 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.ReceivedObject;
import com.usatiuk.dhfs.objects.persistence.JDataRemoteP;
import com.usatiuk.dhfs.objects.persistence.JDataRemoteDtoP;
import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
import com.usatiuk.dhfs.objects.persistence.PeerIdP;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
@Singleton
@ApplicationScoped
public class TemporaryReceivedObjectSerializer implements ProtoSerializer<GetObjectReply, ReceivedObject> {
@Inject
ProtoSerializer<JDataRemoteP, JDataRemote> remoteObjectSerializer;
ProtoSerializer<JDataRemoteDtoP, JDataRemoteDto> remoteObjectSerializer;
@Override
public ReceivedObject deserialize(GetObjectReply message) {

View File

@@ -5,10 +5,11 @@ import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.CertificateTools;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import java.security.cert.X509Certificate;
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote {
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
}

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class PeerInfoProtoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
@Override
public PeerInfo deserialize(PeerInfoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public PeerInfoP serialize(PeerInfo object) {
return PeerInfoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.objects.repository.syncmap;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
public interface DtoMapper<F extends JDataRemote, D extends JDataRemoteDto> {
D toDto(F obj);
F fromDto(D dto);
}

View File

@@ -0,0 +1,61 @@
package com.usatiuk.dhfs.objects.repository.syncmap;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Singleton;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@Singleton
public class DtoMapperService {
private final Map<Class<? extends JDataRemote>, DtoMapper> _remoteToDtoMap;
private final Map<Class<? extends JDataRemoteDto>, DtoMapper> _dtoToRemoteMap;
public DtoMapperService(Instance<DtoMapper<?, ?>> dtoMappers) {
HashMap<Class<? extends JDataRemote>, DtoMapper> remoteToDtoMap = new HashMap<>();
HashMap<Class<? extends JDataRemoteDto>, DtoMapper> dtoToRemoteMap = new HashMap<>();
for (var dtoMapper : dtoMappers.handles()) {
for (var type : Arrays.stream(dtoMapper.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(DtoMapper.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
var dto = type.getActualTypeArguments()[1];
assert JDataRemote.class.isAssignableFrom((Class<?>) orig);
assert JDataRemoteDto.class.isAssignableFrom((Class<?>) dto);
remoteToDtoMap.put((Class<? extends JDataRemote>) orig, dtoMapper.get());
dtoToRemoteMap.put((Class<? extends JDataRemoteDto>) dto, dtoMapper.get());
}
}
_remoteToDtoMap = Map.copyOf(remoteToDtoMap);
_dtoToRemoteMap = Map.copyOf(dtoToRemoteMap);
}
public <F extends JDataRemote, D extends JDataRemoteDto> D toDto(F from, Class<D> to) {
if (to.equals(from.getClass())) {
return (D) from;
}
var got = _remoteToDtoMap.get(from.getClass()).toDto(from);
assert to.isInstance(got);
return to.cast(got);
}
public <F extends JDataRemote, D extends JDataRemoteDto> F fromDto(D from, Class<F> to) {
if (to.equals(from.getClass())) {
return (F) from;
}
var got = _dtoToRemoteMap.get(from.getClass()).fromDto(from);
assert to.isInstance(got);
return to.cast(got);
}
}

View File

@@ -23,11 +23,27 @@ message ObjectChangelog {
}
// TODO: Serialization
message JDataRemoteP {
message FileDtoP {
bytes serializedData = 1;
}
message ChunkDataP {
JObjectKeyP key = 1;
bytes data = 2;
}
message PeerInfoP {
bytes serializedData = 1;
}
message JDataRemoteDtoP {
oneof obj {
FileDtoP fileDto = 1;
ChunkDataP chunkData = 2;
PeerInfoP peerInfo = 3;
}
}
message JDataP {
bytes serializedData = 1;
}

View File

@@ -27,7 +27,7 @@ message GetObjectRequest {
message GetObjectReply {
dhfs.objects.persistence.ObjectChangelog changelog = 5;
dhfs.objects.persistence.JDataRemoteP pushedData = 6;
dhfs.objects.persistence.JDataRemoteDtoP pushedData = 6;
}
message CanDeleteRequest {