mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: separate ref types
as we need to get the real "parent" object when sending some requests
This commit is contained in:
@@ -5,7 +5,7 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
|
||||
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
|
||||
K peekNextKey();
|
||||
|
||||
void skip();
|
||||
@@ -19,6 +19,6 @@ public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterato
|
||||
void skipPrev();
|
||||
|
||||
default CloseableKvIterator<K, V> reversed() {
|
||||
return new ReversedKvIterator<>(this);
|
||||
return new ReversedKvIterator<K, V>(this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.objects.iterators;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class ReversedKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
|
||||
public class ReversedKvIterator<K extends Comparable<? super K>, V> implements CloseableKvIterator<K, V> {
|
||||
private final CloseableKvIterator<K, V> _backing;
|
||||
|
||||
public ReversedKvIterator(CloseableKvIterator<K, V> backing) {
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.jmap.JMapRef;
|
||||
|
||||
public record JDataNormalRef(JObjectKey obj) implements JDataRef {
|
||||
@Override
|
||||
public int compareTo(JDataRef o) {
|
||||
if (o instanceof JDataNormalRef) {
|
||||
return obj.compareTo(((JDataNormalRef) o).obj);
|
||||
} else if (o instanceof JMapRef) {
|
||||
// TODO: Prettier?
|
||||
return -1;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown type of JDataRef");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public interface JDataRef extends Comparable<JDataRef>, Serializable {
|
||||
JObjectKey obj();
|
||||
}
|
||||
@@ -6,15 +6,15 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public interface JDataRefcounted extends JData {
|
||||
PCollection<JObjectKey> refsFrom();
|
||||
PCollection<JDataRef> refsFrom();
|
||||
|
||||
JDataRefcounted withRefsFrom(PCollection<JObjectKey> refs);
|
||||
JDataRefcounted withRefsFrom(PCollection<JDataRef> refs);
|
||||
|
||||
boolean frozen();
|
||||
|
||||
JDataRefcounted withFrozen(boolean frozen);
|
||||
|
||||
default Collection<JObjectKey> collectRefsTo() {
|
||||
default Collection<JDataRef> collectRefsTo() {
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,16 +33,16 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
|
||||
for (var curRef : curRefs) {
|
||||
if (!oldRefs.contains(curRef)) {
|
||||
var referenced = getRef(curRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
|
||||
var referenced = getRef(curRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(curRef.obj()))));
|
||||
Log.tracev("Added ref from {0} to {1}", key, curRef);
|
||||
}
|
||||
}
|
||||
|
||||
for (var oldRef : oldRefs) {
|
||||
if (!curRefs.contains(oldRef)) {
|
||||
var referenced = getRef(oldRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
|
||||
var referenced = getRef(oldRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(oldRef.obj()))));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, oldRef);
|
||||
}
|
||||
}
|
||||
@@ -55,8 +55,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
}
|
||||
|
||||
for (var newRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(newRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
|
||||
var referenced = getRef(newRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(new JDataNormalRef(newRef.obj()))));
|
||||
Log.tracev("Added ref from {0} to {1}", key, newRef);
|
||||
}
|
||||
}
|
||||
@@ -68,8 +68,8 @@ public class RefcounterTxHook implements PreCommitTxHook {
|
||||
}
|
||||
|
||||
for (var removedRef : refCur.collectRefsTo()) {
|
||||
var referenced = getRef(removedRef);
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
|
||||
var referenced = getRef(removedRef.obj());
|
||||
curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(new JDataNormalRef(removedRef.obj()))));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, removedRef);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import org.pcollections.PCollection;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JObjectKey> refsFrom,
|
||||
public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JDataRef> refsFrom,
|
||||
boolean frozen,
|
||||
T data) implements JDataRefcounted {
|
||||
public RemoteObjectDataWrapper(T data) {
|
||||
@@ -13,7 +13,7 @@ public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JObject
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JObjectKey> refs) {
|
||||
public RemoteObjectDataWrapper<T> withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new RemoteObjectDataWrapper<>(refs, frozen, data);
|
||||
}
|
||||
|
||||
@@ -32,8 +32,8 @@ public record RemoteObjectDataWrapper<T extends JDataRemote>(PCollection<JObject
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return data.collectRefsTo();
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return data.collectRefsTo().stream().<JDataRef>map(JDataNormalRef::new).toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -5,7 +5,7 @@ import org.pcollections.*;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public record RemoteObjectMeta(PCollection<JObjectKey> refsFrom, boolean frozen,
|
||||
public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
JObjectKey key,
|
||||
PMap<PeerId, Long> knownRemoteVersions,
|
||||
Class<? extends JDataRemote> knownType,
|
||||
@@ -53,7 +53,7 @@ public record RemoteObjectMeta(PCollection<JObjectKey> refsFrom, boolean frozen,
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteObjectMeta withRefsFrom(PCollection<JObjectKey> refs) {
|
||||
public RemoteObjectMeta withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new RemoteObjectMeta(refs, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
|
||||
}
|
||||
|
||||
@@ -91,8 +91,8 @@ public record RemoteObjectMeta(PCollection<JObjectKey> refsFrom, boolean frozen,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
if (hasLocalData) return List.of(dataKey());
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
if (hasLocalData) return List.of(new JDataNormalRef(dataKey()));
|
||||
return List.of();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
import com.usatiuk.kleppmanntree.TreeNode;
|
||||
@@ -13,13 +11,11 @@ import org.pcollections.TreePSet;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
// FIXME: Ideally this is two classes?
|
||||
public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen, JObjectKey parent,
|
||||
public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
|
||||
OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
|
||||
JKleppmannTreeNodeMeta meta,
|
||||
PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
|
||||
@@ -49,7 +45,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFro
|
||||
}
|
||||
|
||||
@Override
|
||||
public JKleppmannTreeNode withRefsFrom(PCollection<JObjectKey> refs) {
|
||||
public JKleppmannTreeNode withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new JKleppmannTreeNode(key, refs, frozen, parent, lastEffectiveOp, meta, children);
|
||||
}
|
||||
|
||||
@@ -59,12 +55,12 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFro
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return Stream.concat(children().values().stream(),
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return Stream.<JDataRef>concat(children().values().stream().map(JDataNormalRef::new),
|
||||
switch (meta()) {
|
||||
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>of();
|
||||
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
|
||||
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
|
||||
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JDataNormalRef>of();
|
||||
case JKleppmannTreeNodeMetaFile file -> Stream.of(new JDataNormalRef(file.getFileIno()));
|
||||
case JKleppmannTreeNodeMetaPeer peer -> Stream.of(new JDataNormalRef(peer.getPeerId()));
|
||||
default -> throw new IllegalStateException("Unexpected value: " + meta());
|
||||
}
|
||||
).collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
@@ -1,27 +1,25 @@
|
||||
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.kleppmanntree.CombinedTimestamp;
|
||||
import com.usatiuk.kleppmanntree.LogRecord;
|
||||
import com.usatiuk.kleppmanntree.OpMove;
|
||||
import org.pcollections.PCollection;
|
||||
import org.pcollections.PMap;
|
||||
import org.pcollections.PSortedMap;
|
||||
import org.pcollections.TreePMap;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public record JKleppmannTreePersistentData(
|
||||
JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
|
||||
JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
|
||||
long clock,
|
||||
PMap<PeerId, PSortedMap<CombinedTimestamp<Long, PeerId>, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
|
||||
PMap<PeerId, Long> peerTimestampLog,
|
||||
PSortedMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log
|
||||
) implements JDataRefcounted {
|
||||
@Override
|
||||
public JKleppmannTreePersistentData withRefsFrom(PCollection<JObjectKey> refs) {
|
||||
public JKleppmannTreePersistentData withRefsFrom(PCollection<JDataRef> refs) {
|
||||
return new JKleppmannTreePersistentData(key, refs, frozen, clock, queues, peerTimestampLog, log);
|
||||
}
|
||||
|
||||
@@ -47,7 +45,8 @@ public record JKleppmannTreePersistentData(
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObjectKey> collectRefsTo() {
|
||||
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"));
|
||||
public Collection<JDataRef> collectRefsTo() {
|
||||
return List.of(new JObjectKey(key().name() + "_jt_trash"), new JObjectKey(key().name() + "_jt_root"))
|
||||
.stream().<JDataRef>map(JDataNormalRef::new).toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,9 @@ package com.usatiuk.dhfs.objects.jmap;
|
||||
import com.usatiuk.dhfs.objects.JData;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
|
||||
public record JMapEntry<K extends JMapKey & Comparable<K>>(JObjectKey holder,
|
||||
K selfKey,
|
||||
JObjectKey ref) implements JData {
|
||||
public record JMapEntry<K extends JMapKey>(JObjectKey holder,
|
||||
K selfKey,
|
||||
JObjectKey ref) implements JData {
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return JMapHelper.makeKey(holder, selfKey);
|
||||
|
||||
@@ -16,40 +16,40 @@ public class JMapHelper {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
|
||||
static <K extends JMapKey & Comparable<K>> JObjectKey makePrefix(JObjectKey holder) {
|
||||
static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
|
||||
return JObjectKey.of(holder.name() + "/");
|
||||
}
|
||||
|
||||
static <K extends JMapKey & Comparable<K>> JObjectKey makeKey(JObjectKey holder, K key) {
|
||||
static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
|
||||
return JObjectKey.of(makePrefix(holder).name() + key.toString());
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
|
||||
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
|
||||
return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, K key) {
|
||||
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, K key) {
|
||||
return getIterator(holder, IteratorStart.GE, key);
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
|
||||
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
|
||||
return new JMapIterator<>(curTx.getIterator(IteratorStart.GE, makePrefix(holder.key())), holder);
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
|
||||
public <K extends JMapKey> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
|
||||
curTx.put(new JMapEntry<>(holder.key(), key, ref));
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
|
||||
public <K extends JMapKey> Optional<JMapEntry<K>> get(JMapHolder<K> holder, K key) {
|
||||
// TODO:
|
||||
return curTx.get(JMapEntry.class, makeKey(holder.key(), key)).map(e -> (JMapEntry<K>) e);
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> void delete(JMapHolder<K> holder, K key) {
|
||||
public <K extends JMapKey> void delete(JMapHolder<K> holder, K key) {
|
||||
curTx.delete(makeKey(holder.key(), key));
|
||||
}
|
||||
|
||||
public <K extends JMapKey & Comparable<K>> void deleteAll(JMapHolder<K> he) {
|
||||
public <K extends JMapKey> void deleteAll(JMapHolder<K> he) {
|
||||
ArrayList<K> collectedKeys = new ArrayList<>();
|
||||
try (var it = getIterator(he)) {
|
||||
while (it.hasNext()) {
|
||||
|
||||
@@ -2,5 +2,5 @@ package com.usatiuk.dhfs.objects.jmap;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JData;
|
||||
|
||||
public interface JMapHolder<K extends JMapKey & Comparable<K>> extends JData {
|
||||
public interface JMapHolder<K extends JMapKey> extends JData {
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
public class JMapIterator<K extends JMapKey & Comparable<K>> implements CloseableKvIterator<K, JMapEntry<K>> {
|
||||
public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, JMapEntry<K>> {
|
||||
private final CloseableKvIterator<JObjectKey, JData> _backing;
|
||||
private final JObjectKey _prefix;
|
||||
private boolean _hasNext = true;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.usatiuk.dhfs.objects.jmap;
|
||||
|
||||
public interface JMapKey {
|
||||
public interface JMapKey extends Comparable<JMapKey> {
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jmap;
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.Serializable;
|
||||
|
||||
public record JMapLongKey(long key) implements JMapKey, Comparable<JMapLongKey>, Serializable {
|
||||
public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Serializable {
|
||||
public static JMapLongKey of(long key) {
|
||||
return new JMapLongKey(key);
|
||||
}
|
||||
@@ -18,7 +18,10 @@ public record JMapLongKey(long key) implements JMapKey, Comparable<JMapLongKey>,
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@Nonnull JMapLongKey o) {
|
||||
return Long.compare(key, o.key);
|
||||
public int compareTo(@Nonnull JMapKey o) {
|
||||
if (!(o instanceof JMapLongKey lk)) {
|
||||
throw new IllegalArgumentException("Unknown type of JMapKey");
|
||||
}
|
||||
return Long.compare(key, lk.key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.usatiuk.dhfs.objects.jmap;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JDataNormalRef;
|
||||
import com.usatiuk.dhfs.objects.JDataRef;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
public record JMapRef(JObjectKey holder, JMapKey mapKey) implements JDataRef {
|
||||
@Override
|
||||
public JObjectKey obj() {
|
||||
return holder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(JDataRef o) {
|
||||
if (o instanceof JMapRef mr) {
|
||||
return Comparator.comparing(JMapRef::obj).thenComparing(JMapRef::mapKey).compare(this, mr);
|
||||
} else if (o instanceof JDataNormalRef) {
|
||||
// TODO: Prettier?
|
||||
return 1;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown type of JDataRef");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects.jmap;
|
||||
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.JData;
|
||||
import com.usatiuk.dhfs.objects.JDataRefcounted;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
|
||||
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -32,9 +35,9 @@ public class JMapRefcounterTxHook implements PreCommitTxHook {
|
||||
var oldRef = oldMe.ref();
|
||||
var curRef = me.ref();
|
||||
var referencedOld = getRef(oldRef);
|
||||
curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(key)));
|
||||
curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(new JMapRef(me.holder(), me.selfKey()))));
|
||||
var referencedCur = getRef(curRef);
|
||||
curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(key)));
|
||||
curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(new JMapRef(me.holder(), me.selfKey()))));
|
||||
Log.tracev("Removed ref from {0} to {1}, added ref to {2}", key, oldRef, curRef);
|
||||
}
|
||||
|
||||
@@ -46,7 +49,7 @@ public class JMapRefcounterTxHook implements PreCommitTxHook {
|
||||
|
||||
var curRef = me.ref();
|
||||
var referencedCur = getRef(curRef);
|
||||
curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(key)));
|
||||
curTx.put(referencedCur.withRefsFrom(referencedCur.refsFrom().plus(new JMapRef(me.holder(), me.selfKey()))));
|
||||
Log.tracev("Added ref from {0} to {1}", key, curRef);
|
||||
}
|
||||
|
||||
@@ -58,7 +61,7 @@ public class JMapRefcounterTxHook implements PreCommitTxHook {
|
||||
|
||||
var oldRef = me.ref();
|
||||
var referencedOld = getRef(oldRef);
|
||||
curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(key)));
|
||||
curTx.put(referencedOld.withRefsFrom(referencedOld.refsFrom().minus(new JMapRef(me.holder(), me.selfKey()))));
|
||||
Log.tracev("Removed ref from {0} to {1}", key, oldRef);
|
||||
}
|
||||
|
||||
|
||||
@@ -152,13 +152,13 @@ public class RemoteObjectServiceClient {
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey objKey, Collection<JObjectKey> ourReferrers) {
|
||||
public Collection<Pair<PeerId, CanDeleteReply>> canDelete(Collection<PeerId> targets, JObjectKey objKey, Collection<JDataRef> ourReferrers) {
|
||||
Log.trace("Asking canDelete for " + objKey + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
|
||||
try {
|
||||
return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
|
||||
var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
|
||||
for (var ref : ourReferrers) {
|
||||
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.toString()).build());
|
||||
req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().toString()).build());
|
||||
}
|
||||
return Pair.of(h, rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build())));
|
||||
}).toList()).stream().map(f -> {
|
||||
|
||||
@@ -141,8 +141,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
if (!builder.getDeletionCandidate()) {
|
||||
for (var r : obj.refsFrom()) {
|
||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.toString()).build());
|
||||
curTx.onCommit(() -> autosyncProcessor.add(r));
|
||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
|
||||
curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -4,8 +4,8 @@ import com.usatiuk.dhfs.TempDataProfile;
|
||||
import com.usatiuk.dhfs.files.objects.File;
|
||||
import com.usatiuk.dhfs.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.objects.RemoteTransaction;
|
||||
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
|
||||
Reference in New Issue
Block a user