file sync not compleeeetely broken

This commit is contained in:
2025-02-15 22:12:10 +01:00
parent ecee392a39
commit 2065423376
18 changed files with 434 additions and 334 deletions

View File

@@ -75,7 +75,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.put(newChunk);
remoteTx.putData(newChunk);
return newChunk;
}
@@ -104,7 +104,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var ref = curTx.get(JData.class, uuid).orElse(null);
if (ref == null) return Optional.empty();
GetattrRes ret;
if (ref instanceof RemoteObject r) {
if (ref instanceof RemoteObjectMeta r) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
ret = new GetattrRes(f.mTime(), f.cTime(), f.mode(), f.symlink() ? GetattrType.SYMLINK : GetattrType.FILE);
@@ -157,7 +157,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0);
remoteTx.put(f);
remoteTx.putData(f);
try {
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
@@ -230,10 +230,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (dent instanceof JKleppmannTreeNode) {
return true;
} else if (dent instanceof RemoteObject) {
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
remoteTx.put(f.withMode(mode).withMTime(System.currentTimeMillis()));
remoteTx.putData(f.withMode(mode).withMTime(System.currentTimeMillis()));
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
@@ -502,7 +502,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
remoteTx.put(file);
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
@@ -526,7 +526,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var oldChunks = file.chunks();
file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
remoteTx.put(file);
remoteTx.putData(file);
cleanupChunks(file, oldChunks.values());
updateFileSize(file);
return true;
@@ -587,7 +587,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
remoteTx.put(file);
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
return true;
@@ -640,7 +640,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
"File not found for setTimes: " + fileUuid))
);
remoteTx.put(file.withCTime(atimeMs).withMTime(mtimeMs));
remoteTx.putData(file.withCTime(atimeMs).withMTime(mtimeMs));
return true;
});
}
@@ -657,7 +657,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (realSize != file.size()) {
remoteTx.put(file.withSize(realSize));
remoteTx.putData(file.withSize(realSize));
}
});
}

View File

@@ -1,5 +1,5 @@
package com.usatiuk.dhfs.objects;
public interface ConflictResolver {
void resolve(PeerId fromPeer, RemoteObject<?> ours, RemoteObject<?> theirs);
void resolve(PeerId fromPeer, RemoteObjectMeta ours, RemoteObjectMeta theirs);
}

View File

@@ -22,7 +22,7 @@ public class DeleterTxHook implements PreCommitTxHook {
return;
}
if (canDelete(refCur)) {
if (refCur instanceof RemoteObject<?> ro) {
if (refCur instanceof RemoteObjectMeta ro) {
remoteObjectDeleter.putDeletionCandidate(ro);
return;
}
@@ -38,7 +38,7 @@ public class DeleterTxHook implements PreCommitTxHook {
}
if (canDelete(refCur)) {
if (refCur instanceof RemoteObject<?> ro) {
if (refCur instanceof RemoteObjectMeta ro) {
remoteObjectDeleter.putDeletionCandidate(ro);
return;
}

View File

@@ -1,8 +1,6 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -19,8 +17,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
return found;
}
if (cur instanceof RemoteObject<?> || cur instanceof JKleppmannTreeNode) {
return new RemoteObject<>(key);
if (cur instanceof RemoteObjectMeta || cur instanceof JKleppmannTreeNode) {
return new RemoteObjectMeta(key);
} else {
return found;
}

View File

@@ -16,7 +16,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
boolean invalidate = switch (cur) {
case RemoteObject<?> remote -> !remote.meta().changelog().equals(((RemoteObject) old).meta().changelog());
case RemoteObjectMeta remote -> !remote.changelog().equals(((RemoteObjectMeta) old).changelog());
case JKleppmannTreePersistentData pd -> !pd.queues().equals(((JKleppmannTreePersistentData) old).queues());
default -> false;
};
@@ -28,7 +28,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Override
public void onCreate(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObject remote)) {
if (!(cur instanceof RemoteObjectMeta remote)) {
return;
}
@@ -37,7 +37,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Override
public void onDelete(JObjectKey key, JData cur) {
if (!(cur instanceof RemoteObject remote)) {
if (!(cur instanceof RemoteObjectMeta remote)) {
return;
}
}

View File

@@ -1,72 +0,0 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.HashTreePSet;
import org.pcollections.PCollection;
import org.pcollections.PMap;
import org.pcollections.TreePMap;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
public record RemoteObject<T extends JDataRemote>(PCollection<JObjectKey> refsFrom, boolean frozen,
RemoteObjectMeta meta, @Nullable T data) implements JDataRefcounted {
// Self put
public RemoteObject(T data, PeerId initialPeer) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(data.key(), data.getClass(), false, initialPeer), data);
}
public RemoteObject(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, remoteChangelog), null);
}
public RemoteObject(JObjectKey key) {
this(HashTreePSet.empty(), false, new RemoteObjectMeta(key, TreePMap.empty()), null);
}
@Override
public JObjectKey key() {
if (data != null && !data.key().equals(meta.key()))
throw new IllegalStateException("Corrupted object, key mismatch: " + meta.key() + " vs " + data.key());
return meta.key();
}
@Override
public RemoteObject<T> withRefsFrom(PCollection<JObjectKey> refs) {
return new RemoteObject<>(refs, frozen, meta, data);
}
@Override
public RemoteObject<T> withFrozen(boolean frozen) {
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withMeta(RemoteObjectMeta meta) {
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withData(T data) {
return new RemoteObject<>(refsFrom, frozen, meta, data);
}
public RemoteObject<T> withRefsFrom(PCollection<JObjectKey> refs, boolean frozen) {
return new RemoteObject<>(refs, frozen, meta, data);
}
public ReceivedObject toReceivedObject() {
if (data == null)
throw new IllegalStateException("Cannot convert to ReceivedObject without data: " + meta.key());
return new ReceivedObject(meta.key(), meta.changelog(), data);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
if (data != null) return data.collectRefsTo();
return List.of();
}
@Override
public int estimateSize() {
return data == null ? 1000 : data.estimateSize();
}
}

View File

@@ -0,0 +1,43 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.HashTreePSet;
import org.pcollections.PCollection;
import java.util.Collection;
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JObjectKey> refsFrom,
boolean frozen,
T data) implements JDataRefcounted {
public RemoteObjectDataWrapper(T data) {
this(HashTreePSet.empty(), false, data);
}
@Override
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JObjectKey> refs) {
return new RemoteObjectDataWrapper<>(refs, frozen, data);
}
@Override
public RemoteObjectDataWrapper<T> withFrozen(boolean frozen) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
}
public RemoteObjectDataWrapper<T> withData(T data) {
return new RemoteObjectDataWrapper<>(refsFrom, frozen, data);
}
@Override
public JObjectKey key() {
return RemoteObjectMeta.ofDataKey(data.key());
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return data.collectRefsTo();
}
@Override
public int estimateSize() {
return data.estimateSize();
}
}

View File

@@ -93,17 +93,14 @@ public class RemoteObjectDeleter {
// _quickCandidates.add(obj);
// }
public void putDeletionCandidate(RemoteObject<?> obj) {
synchronized (_movablesInProcessing) {
if (_movablesInProcessing.contains(obj.key())) return;
if (!obj.meta().seen()) {
if (_quickCandidates.add(obj.key()))
Log.debug("Quick deletion candidate: " + obj.key());
return;
}
if (_candidates.add(obj.key()))
Log.debug("Deletion candidate: " + obj.key());
public void putDeletionCandidate(RemoteObjectMeta obj) {
if (!obj.seen()) {
if (_quickCandidates.add(obj.key()))
Log.debug("Quick deletion candidate: " + obj.key());
return;
}
if (_candidates.add(obj.key()))
Log.debug("Deletion candidate: " + obj.key());
}
private void asyncProcessMovable(JObjectKey objName) {
@@ -118,18 +115,20 @@ public class RemoteObjectDeleter {
try {
delay = txm.run(() -> {
Log.debugv("Starting async processing of remote obj del: {0}", objName);
RemoteObject<?> target = curTx.get(RemoteObject.class, objName).orElse(null);
RemoteObjectMeta target = curTx.get(RemoteObjectMeta.class, objName).orElse(null);
if (target == null) return true;
if (!canDelete(target)) return true;
if (canDelete(target)) {
if (canDeleteImmediately(target)) {
Log.debugv("Async processing of remote obj del: immediate {0}", objName);
curTx.delete(objName);
return true;
}
var knownHosts = peerInfoService.getPeersNoSelf();
List<PeerId> missing = knownHosts.stream()
.map(PeerInfo::id)
.filter(id -> !target.meta().confirmedDeletes().contains(id)).toList();
.filter(id -> !target.confirmedDeletes().contains(id)).toList();
var ret = remoteObjectServiceClient.canDelete(missing, objName, target.refsFrom());
@@ -148,7 +147,7 @@ public class RemoteObjectDeleter {
Log.debugv("Delaying deletion check of {0}", objName);
return true;
} else {
assert canDelete(target);
assert canDeleteImmediately(target);
Log.debugv("Async processing of remote obj del: after query {0}", objName);
curTx.delete(objName);
return false;
@@ -166,15 +165,20 @@ public class RemoteObjectDeleter {
});
}
// FIXME:
private boolean canDelete(JDataRefcounted obj) {
return obj.refsFrom().isEmpty() && !obj.frozen();
}
// Returns true if the object can be deleted
private boolean canDelete(RemoteObject<?> obj) {
if (!obj.meta().seen())
private boolean canDeleteImmediately(RemoteObjectMeta obj) {
if (!obj.seen())
return true;
var knownHosts = peerInfoService.getPeers();
boolean missing = false;
for (var x : knownHosts) {
if (!obj.meta().confirmedDeletes().contains(x.id())) {
if (!obj.confirmedDeletes().contains(x.id())) {
missing = true;
break;
}
@@ -204,10 +208,12 @@ public class RemoteObjectDeleter {
Stream.of(next, nextQuick).filter(Objects::nonNull).forEach(realNext -> {
Log.debugv("Processing remote object deletion candidate: {0}", realNext);
var deleted = txm.run(() -> {
RemoteObject<?> target = curTx.get(RemoteObject.class, realNext).orElse(null);
RemoteObjectMeta target = curTx.get(RemoteObjectMeta.class, realNext).orElse(null);
if (target == null) return true;
if (canDelete(target)) {
if (!canDelete(target)) return true;
if (canDeleteImmediately(target)) {
Log.debugv("Immediate deletion of: {0}", realNext);
curTx.delete(realNext);
return true;

View File

@@ -1,53 +1,103 @@
package com.usatiuk.dhfs.objects;
import org.pcollections.HashTreePMap;
import org.pcollections.HashTreePSet;
import org.pcollections.PMap;
import org.pcollections.PSet;
import org.pcollections.*;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
public record RemoteObjectMeta(
JObjectKey key,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JDataRemote> knownType,
PSet<PeerId> confirmedDeletes,
boolean seen,
PMap<PeerId, Long> changelog) implements Serializable {
public RemoteObjectMeta(JObjectKey key, Class<? extends JDataRemote> type, boolean seen, PeerId initialPeer) {
this(key, HashTreePMap.empty(), type, HashTreePSet.empty(), seen,
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L));
public record RemoteObjectMeta(PCollection<JObjectKey> refsFrom, boolean frozen,
JObjectKey key,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JDataRemote> knownType,
PSet<PeerId> confirmedDeletes,
boolean seen,
PMap<PeerId, Long> changelog,
boolean hasLocalData) implements JDataRefcounted {
// Self put
public RemoteObjectMeta(JDataRemote data, PeerId initialPeer) {
this(HashTreePSet.empty(), false,
data.key(), HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false,
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L),
true);
}
public RemoteObjectMeta(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
this(key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true, remoteChangelog);
this(HashTreePSet.empty(), false,
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
remoteChangelog,
false);
}
public RemoteObjectMeta(JObjectKey key) {
this(HashTreePSet.empty(), false,
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
TreePMap.empty(),
false);
}
@Override
public JObjectKey key() {
return ofMetaKey(key);
}
public static JObjectKey ofMetaKey(JObjectKey key) {
return key;
}
public static JObjectKey ofDataKey(JObjectKey key) {
return JObjectKey.of(key.name() + "_data");
}
public JObjectKey dataKey() {
return ofDataKey(key);
}
@Override
public RemoteObjectMeta withRefsFrom(PCollection<JObjectKey> refs) {
return new RemoteObjectMeta(refs, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
@Override
public RemoteObjectMeta withFrozen(boolean frozen) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withKnownType(Class<? extends JDataRemote> knownType) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withSeen(boolean seen) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withChangelog(PMap<PeerId, Long> changelog) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withHaveLocal(boolean haveLocal) {
return new RemoteObjectMeta(key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog);
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
}
public long versionSum() {
return changelog.values().stream().mapToLong(Long::longValue).sum();
}
@Override
public Collection<JObjectKey> collectRefsTo() {
if (hasLocalData) return List.of(dataKey());
return List.of();
}
@Override
public int estimateSize() {
return 1000;
}
}

View File

@@ -27,22 +27,26 @@ public class RemoteTransaction {
return curTx.getId();
}
private <T extends JDataRemote> Optional<RemoteObject<T>> tryDownloadRemote(RemoteObject<T> obj) {
MutableObject<RemoteObject<T>> success = new MutableObject<>(null);
private <T extends JDataRemote> Optional<RemoteObjectDataWrapper<T>> tryDownloadRemote(RemoteObjectMeta obj) {
MutableObject<RemoteObjectDataWrapper<T>> success = new MutableObject<>(null);
try {
remoteObjectServiceClient.getObject(obj.key(), rcv -> {
if (!obj.meta().knownType().isInstance(rcv.getRight().data()))
throw new IllegalStateException("Object type mismatch: " + obj.meta().knownType() + " vs " + rcv.getRight().data().getClass());
if (!obj.knownType().isInstance(rcv.getRight().data()))
throw new IllegalStateException("Object type mismatch: " + obj.knownType() + " vs " + rcv.getRight().data().getClass());
if (!rcv.getRight().changelog().equals(obj.meta().changelog())) {
var updated = syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), obj, rcv.getRight().changelog());
if (!rcv.getRight().changelog().equals(updated.meta().changelog()))
throw new IllegalStateException("Changelog mismatch, update failed?: " + rcv.getRight().changelog() + " vs " + updated.meta().changelog());
success.setValue(updated.withData((T) rcv.getRight().data()));
} else {
success.setValue(obj.withData((T) rcv.getRight().data()));
}
syncHandler.handleRemoteUpdate(rcv.getLeft(), obj.key(), rcv.getRight().changelog(), rcv.getRight().data());
var now = curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(obj.key())).orElse(null);
assert now != null;
if (!now.hasLocalData())
return false;
var gotData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key())).orElse(null);
assert gotData != null;
success.setValue(gotData);
return true;
});
} catch (Exception e) {
@@ -50,60 +54,51 @@ public class RemoteTransaction {
return Optional.empty();
}
curTx.put(success.getValue());
return Optional.of(success.getValue());
}
@SuppressWarnings("unchecked")
public <T extends JDataRemote> Optional<RemoteObject<T>> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObject.class, key, strategy)
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
.flatMap(obj -> {
if (obj.data() != null && !type.isInstance(obj.data()))
throw new IllegalStateException("Object (real) type mismatch: " + obj.data().getClass() + " vs " + type);
// FIXME:
// if (!type.isAssignableFrom(obj.meta().knownType()))
// throw new IllegalStateException("Object (meta) type mismatch: " + obj.meta().knownType() + " vs " + type);
if (obj.data() != null)
return Optional.of(obj);
else
return tryDownloadRemote(obj);
if (obj.hasLocalData()) {
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
if (realData == null)
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
if (!type.isInstance(realData.data()))
throw new IllegalStateException("Object type mismatch: " + realData.data().getClass() + " vs " + type);
return Optional.of((T) realData.data());
}
if (!tryRequest)
return Optional.empty();
return tryDownloadRemote(obj).map(wrapper -> (T) wrapper.data());
});
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObject.class, key, strategy).map(obj -> obj.meta());
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return get(type, key, strategy).map(RemoteObject::data);
}
public <T extends JDataRemote> void putData(T obj) {
var curMeta = getMeta(obj.key()).orElse(null);
public <T extends JDataRemote> void put(RemoteObject<T> obj) {
curTx.put(obj);
}
public <T extends JDataRemote> void put(T obj) {
var cur = get((Class<T>) obj.getClass(), obj.key()).orElse(null);
if (cur == null) {
curTx.put(new RemoteObject<>(obj, persistentPeerDataService.getSelfUuid()));
if (curMeta == null) {
curTx.put(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.put(new RemoteObjectDataWrapper<>(obj));
return;
}
if (cur.data() != null && cur.data().equals(obj))
return;
if (cur.data() != null && !cur.data().getClass().equals(obj.getClass()))
throw new IllegalStateException("Object type mismatch: " + cur.data().getClass() + " vs " + obj.getClass());
var newMeta = cur.meta();
// if (cur.data() != null && cur.data().equals(obj))
// return;
if (!curMeta.knownType().isAssignableFrom(obj.getClass()))
throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass());
var newMeta = curMeta;
newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(),
newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1));
var newObj = cur.withData(obj).withMeta(newMeta);
curTx.put(newObj);
}
public <T extends JDataRemote> Optional<RemoteObject<T>> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
curTx.put(newMeta);
var newData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key()))
.map(w -> w.withData(obj)).orElse(new RemoteObjectDataWrapper<>(obj));
curTx.put(newData);
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
@@ -111,6 +106,18 @@ public class RemoteTransaction {
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC);
return getData(type, key, LockingStrategy.OPTIMISTIC, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC, false);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, false);
}
}

View File

@@ -15,6 +15,7 @@ import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
@@ -52,16 +53,32 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
Log.info("<-- getObject: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
var obj = txm.run(() -> {
var got = remoteTx.get(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null);
if (got == null) {
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
throw new StatusRuntimeException(Status.NOT_FOUND);
}
return got;
Pair<RemoteObjectMeta, JDataRemote> got = txm.run(() -> {
var meta = remoteTx.getMeta(JObjectKey.of(request.getName())).orElse(null);
var obj = remoteTx.getDataLocal(JDataRemote.class, JObjectKey.of(request.getName())).orElse(null);
if (meta != null && !meta.seen())
curTx.put(meta.withSeen(true));
if (obj != null)
for (var ref : obj.collectRefsTo()) {
var refMeta = remoteTx.getMeta(ref).orElse(null);
if (refMeta != null && !refMeta.seen())
curTx.put(refMeta.withSeen(true));
}
return Pair.of(meta, obj);
});
var serialized = receivedObjectProtoSerializer.serialize(obj.toReceivedObject());
if ((got.getValue() != null) && (got.getKey() == null)) {
Log.error("Inconsistent state for object meta: " + request.getName());
throw new StatusRuntimeException(Status.INTERNAL);
}
if (got.getValue() == null) {
Log.info("<-- getObject NOT FOUND: " + request.getName() + " from " + identity.getPrincipal().getName().substring(3));
throw new StatusRuntimeException(Status.NOT_FOUND);
}
var serialized = receivedObjectProtoSerializer.serialize(new ReceivedObject(got.getKey().key(), got.getKey().changelog(), got.getValue()));
return Uni.createFrom().item(serialized);
// // Does @Blocking break this?
// return Uni.createFrom().emitter(emitter -> {
@@ -110,7 +127,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
builder.setObjName(request.getName());
txm.run(() -> {
var obj = curTx.get(RemoteObject.class, JObjectKey.of(request.getName())).orElse(null);
var obj = curTx.get(RemoteObjectMeta.class, JObjectKey.of(request.getName())).orElse(null);
if (obj == null) {
builder.setDeletionCandidate(true);

View File

@@ -1,17 +1,14 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObject;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
//
@@ -92,149 +89,163 @@ public class SyncHandler {
// }
//
public <T extends JDataRemote> RemoteObject<T> handleOneUpdate(PeerId from, RemoteObject<T> current, PMap<PeerId, Long> rcvChangelog) {
// if (!rcv.key().equals(current.key())) {
// Log.error("Received update for different object: " + rcv.key() + " from " + from);
// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from);
// public <T extends JDataRemote> RemoteObjectMeta<T> handleOneUpdate(PeerId from, RemoteObjectMeta<T> current, PMap<PeerId, Long> rcvChangelog) {
//// if (!rcv.key().equals(current.key())) {
//// Log.error("Received update for different object: " + rcv.key() + " from " + from);
//// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from);
//// }
//
// var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum();
//
// if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) {
// Log.error("Received older index update than was known for host: " + from + " " + current.key());
// throw new IllegalStateException(); // FIXME: OutdatedUpdateException
// }
var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum();
if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) {
Log.error("Received older index update than was known for host: " + from + " " + current.key());
throw new IllegalStateException(); // FIXME: OutdatedUpdateException
}
Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog);
boolean conflict = false;
boolean updatedRemoteVersion = false;
var newObj = current;
var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from);
if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer))
updatedRemoteVersion = true;
if (updatedRemoteVersion)
newObj = current.withMeta(current.meta().withKnownRemoteVersions(
current.meta().knownRemoteVersions().plus(from, receivedTotalVer)
));
boolean hasLower = false;
boolean hasHigher = false;
for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) {
if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L))
hasLower = true;
if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L))
hasHigher = true;
}
if (hasLower && hasHigher) {
Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from);
// Log.
//
// info("Trying conflict resolution: " + header.getName() + " from " + from);
// var found = foundExt.get();
// Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog);
//
// JObjectData theirsData;
// ObjectHeader theirsHeader;
// if (header. hasPushedData()) {
// theirsHeader = header;
// theirsData = dataProtoSerializer.
// boolean conflict = false;
// boolean updatedRemoteVersion = false;
//
// deserialize(header.getPushedData());
// } else {
// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
// theirsData = dataProtoSerializer.
// var newObj = current;
// var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from);
//
// deserialize(got.getRight());
// theirsHeader = got.
// if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer))
// updatedRemoteVersion = true;
//
// getLeft();
// }
// if (updatedRemoteVersion)
// newObj = current.withMeta(current.meta().withKnownRemoteVersions(
// current.meta().knownRemoteVersions().plus(from, receivedTotalVer)
// ));
//
// jObjectTxManager.
//
// executeTx(() -> {
// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// if (d == null)
// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
// return d.getConflictResolver();
// });
// var resolver = conflictResolvers.select(resolverClass);
// resolver.
//
// get().
//
// resolve(from, theirsHeader, theirsData, found);
// });
// Log. info("Resolved conflict for " + from + " " + header.getName());
// throw new NotImplementedException();
} else if (hasLower) {
Log.info("Received older index update than known: " + from + " " + current.key());
// throw new OutdatedUpdateException();
// throw new NotImplementedException();
} else if (hasHigher) {
var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog));
// if (header.hasPushedData())
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
}
// else if (data == null && header.hasPushedData()) {
// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (found.getData() == null)
// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// boolean hasLower = false;
// boolean hasHigher = false;
// for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) {
// if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L))
// hasLower = true;
// if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L))
// hasHigher = true;
// }
//
// if (hasLower && hasHigher) {
// Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from);
//// Log.
////
//// info("Trying conflict resolution: " + header.getName() + " from " + from);
//// var found = foundExt.get();
////
//// JObjectData theirsData;
//// ObjectHeader theirsHeader;
//// if (header. hasPushedData()) {
//// theirsHeader = header;
//// theirsData = dataProtoSerializer.
////
//// deserialize(header.getPushedData());
//// } else {
//// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
//// theirsData = dataProtoSerializer.
////
//// deserialize(got.getRight());
//// theirsHeader = got.
////
//// getLeft();
//// }
////
//// jObjectTxManager.
////
//// executeTx(() -> {
//// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
//// if (d == null)
//// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
//// return d.getConflictResolver();
//// });
//// var resolver = conflictResolvers.select(resolverClass);
//// resolver.
////
//// get().
////
//// resolve(from, theirsHeader, theirsData, found);
//// });
//// Log. info("Resolved conflict for " + from + " " + header.getName());
//// throw new NotImplementedException();
// } else if (hasLower) {
// Log.info("Received older index update than known: " + from + " " + current.key());
//// throw new OutdatedUpdateException();
//// throw new NotImplementedException();
// } else if (hasHigher) {
// var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
// rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
//
// newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog));
//// if (header.hasPushedData())
//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// }
//// else if (data == null && header.hasPushedData()) {
//// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
//// if (found.getData() == null)
//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
//// }
//
// assert Objects.equals(receivedTotalVer, md.getOurVersion());
/// / assert Objects.equals(receivedTotalVer, md.getOurVersion());
//
// if (!updatedRemoteVersion)
// Log.debug("No action on update: " + current.meta().key() + " from " + from);
//
// return newObj;
// }
public <T extends JDataRemote> void handleRemoteUpdate(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog, @Nullable JDataRemote receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
if (current == null) {
current = new RemoteObjectMeta(key, HashTreePMap.empty());
curTx.put(current);
}
if (!updatedRemoteVersion)
Log.debug("No action on update: " + current.meta().key() + " from " + from);
var changelogCompare = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog);
return newObj;
}
public <T extends JDataRemote> RemoteObject<T> handleRemoteUpdate(PeerId from, JObjectKey key, RemoteObject<T> current, PMap<PeerId, Long> rcv) {
// TODO: Dedup
try {
if (current == null) {
var obj = new RemoteObject<>(key, rcv);
curTx.put(obj);
current = (RemoteObject<T>) obj; // Will update known remote version too
switch (changelogCompare) {
case EQUAL -> {
Log.debug("No action on update: " + key + " from " + from);
if (!current.hasLocalData() && receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
}
}
case NEWER -> {
Log.debug("Received newer index update than known: " + key + " from " + from);
var newChangelog = receivedChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
receivedChangelog : receivedChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
current = current.withChangelog(newChangelog);
var newObj = handleOneUpdate(from, current, rcv);
if (newObj != current) {
curTx.put(newObj);
if (receivedData != null) {
current = current.withHaveLocal(true);
curTx.put(current);
curTx.put(curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(current.key()))
.map(w -> w.withData(receivedData)).orElse(new RemoteObjectDataWrapper<>(receivedData)));
} else {
current = current.withHaveLocal(false);
curTx.put(current);
}
}
case OLDER -> {
Log.debug("Received older index update than known: " + key + " from " + from);
return;
}
case CONFLICT -> {
Log.debug("Conflict on update (inconsistent version): " + key + " from " + from);
// TODO:
return;
}
return newObj;
// } catch (OutdatedUpdateException ignored) {
// Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid());
// invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName());
} catch (Exception ex) {
Log.info("Error when handling update from " + from + " of " + current.meta().key(), ex);
throw ex;
}
var curKnownRemoteVersion = current.knownRemoteVersions().get(from);
var receivedTotalVer = receivedChangelog.values().stream().mapToLong(Long::longValue).sum();
// return IndexUpdateReply.getDefaultInstance();
}
protected static class OutdatedUpdateException extends RuntimeException {
OutdatedUpdateException() {
super();
}
OutdatedUpdateException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
if (curKnownRemoteVersion == null || curKnownRemoteVersion < receivedTotalVer) {
current = current.withKnownRemoteVersions(current.knownRemoteVersions().plus(from, receivedTotalVer));
curTx.put(current);
}
}
}

View File

@@ -0,0 +1,42 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.PeerId;
import org.pcollections.PMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SyncHelper {
public enum ChangelogCmpResult {
EQUAL,
NEWER,
OLDER,
CONFLICT
}
public static ChangelogCmpResult compareChangelogs(PMap<PeerId, Long> current, PMap<PeerId, Long> other) {
boolean hasLower = false;
boolean hasHigher = false;
for (var e : Stream.concat(current.keySet().stream(), other.keySet().stream()).collect(Collectors.toUnmodifiableSet())) {
if (other.getOrDefault(e, 0L) < current.getOrDefault(e, 0L))
hasLower = true;
if (other.getOrDefault(e, 0L) > current.getOrDefault(e, 0L))
hasHigher = true;
}
if (hasLower && hasHigher)
return ChangelogCmpResult.CONFLICT;
if (hasLower)
return ChangelogCmpResult.OLDER;
if (hasHigher)
return ChangelogCmpResult.NEWER;
return ChangelogCmpResult.EQUAL;
}
// public static PMap<PeerId,Long> mergeChangelogs(PMap<PeerId, Long> current, PMap<PeerId, Long> other) {
// return current.plusAll(other);
// }
}

View File

@@ -2,11 +2,11 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteObject;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import org.pcollections.PMap;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
public IndexUpdateOp(RemoteObject<?> object) {
this(object.key(), object.meta().changelog());
public IndexUpdateOp(RemoteObjectMeta object) {
this(object.key(), object.changelog());
}
}

View File

@@ -27,8 +27,8 @@ public class OpPusher {
Op info = txm.run(() -> {
var obj = curTx.get(JData.class, key).orElse(null);
switch (obj) {
case RemoteObject<?> remote -> {
return new IndexUpdateOp(key, remote.meta().changelog());
case RemoteObjectMeta remote -> {
return new IndexUpdateOp(key, remote.changelog());
}
case JKleppmannTreePersistentData pd -> {
var maybeQueue = pd.queues().get(op);

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
@@ -18,8 +17,6 @@ public class PushOpHandler {
RemoteTransaction remoteTransaction;
public void handlePush(PeerId peer, IndexUpdateOp obj) {
syncHandler.handleRemoteUpdate(peer, obj.key(),
remoteTransaction.get(JDataRemote.class, obj.key()).orElse(null),
obj.changelog());
syncHandler.handleRemoteUpdate(peer, obj.key(), obj.changelog(), null);
}
}

View File

@@ -72,7 +72,7 @@ public class PeerInfoService {
jObjectTxManager.run(() -> {
var parent = getTree().traverse(List.of());
var newPeerInfo = new PeerInfo(id, cert);
remoteTx.put(newPeerInfo);
remoteTx.putData(newPeerInfo);
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), getTree().getNewNodeId());
});
}

View File

@@ -38,6 +38,7 @@ public class DataLocker {
private static class LockTag {
final Thread owner = Thread.currentThread();
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
boolean released = false;
}