Mirror of https://github.com/usatiuk/dhfs.git
Synced 2025-10-28 20:47:49 +01:00

Compare commits: 3bf4784c0d...bb-keys (1 commit: d2fdf03d19)
@@ -2,7 +2,7 @@
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
<module name="dhfs-app" />
-<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
+<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />

@@ -28,7 +28,7 @@ dhfs.objects.opsender.batch-size=100
dhfs.objects.lock_timeout_secs=2
dhfs.local-discovery=true
dhfs.peerdiscovery.timeout=10000
-quarkus.log.category."com.usatiuk".min-level=TRACE
-quarkus.log.category."com.usatiuk".level=TRACE
+quarkus.log.category."com.usatiuk".min-level=INFO
+quarkus.log.category."com.usatiuk".level=INFO
quarkus.http.insecure-requests=enabled
quarkus.http.ssl.client-auth=required

@@ -1,5 +1,8 @@
package com.usatiuk.objects;

+import com.google.protobuf.ByteString;
+import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
+
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -10,11 +13,19 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
JObjectKeyMax MAX = new JObjectKeyMax();

static JObjectKey of(String value) {
-    return new JObjectKeyImpl(value);
+    var heapBb = StandardCharsets.UTF_8.encode(value);
+    if (heapBb.isDirect()) return new JObjectKeyImpl(heapBb);
+    return fromByteBuffer(heapBb);
}

+static JObjectKey of(ByteString value) {
+    var heapBb = value.asReadOnlyByteBuffer();
+    if (heapBb.isDirect()) return new JObjectKeyImpl(heapBb);
+    return fromByteBuffer(heapBb);
+}
+
static JObjectKey random() {
-    return new JObjectKeyImpl(UUID.randomUUID().toString());
+    return JObjectKey.of(UUID.randomUUID().toString());
}

static JObjectKey first() {
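Illustration (not part of the diff): of(String), of(ByteString) and fromByteBuffer now funnel every key onto a direct buffer. A minimal standalone sketch of that normalization, with plain ByteBuffer.allocateDirect standing in for the project's UninitializedByteBuffer.allocateUninitialized helper:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DirectKeyBufferSketch {
    // Copy an arbitrary (usually heap-backed) buffer into a direct buffer,
    // mirroring what the new JObjectKey.fromByteBuffer does.
    static ByteBuffer toDirect(ByteBuffer src) {
        if (src.isDirect()) return src;
        ByteBuffer direct = ByteBuffer.allocateDirect(src.remaining());
        direct.put(src.duplicate()); // duplicate() so the caller's position is untouched
        direct.flip();
        return direct;
    }

    public static void main(String[] args) {
        ByteBuffer heap = StandardCharsets.UTF_8.encode("some-key"); // heap-backed
        ByteBuffer direct = toDirect(heap);
        System.out.println(direct.isDirect());                                  // true
        System.out.println(StandardCharsets.UTF_8.decode(direct.duplicate()));  // some-key
    }
}
```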
@@ -25,12 +36,11 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
return MAX;
}

-static JObjectKey fromBytes(byte[] bytes) {
-    return new JObjectKeyImpl(new String(bytes, StandardCharsets.UTF_8));
-}
-
static JObjectKey fromByteBuffer(ByteBuffer buff) {
-    return new JObjectKeyImpl(StandardCharsets.UTF_8.decode(buff).toString());
+    var directBb = UninitializedByteBuffer.allocateUninitialized(buff.remaining());
+    directBb.put(buff);
+    directBb.flip();
+    return new JObjectKeyImpl(directBb);
}

@Override

@@ -39,9 +49,7 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
@Override
String toString();

-byte[] bytes();
-
ByteBuffer toByteBuffer();

-String value();
+ByteString value();
}
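Illustration (not part of the diff): the interface now hands out raw bytes rather than a String. A small usage sketch, assuming the com.usatiuk.objects classes from this commit are on the classpath:

```java
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;

import java.nio.ByteBuffer;

public class JObjectKeyUsageSketch {
    public static void main(String[] args) {
        JObjectKey key = JObjectKey.of("hello");

        // value() now returns the key bytes as a ByteString instead of a String.
        ByteString raw = key.value();

        // toByteBuffer() exposes a read-only view of the backing direct buffer.
        ByteBuffer view = key.toByteBuffer();

        // toString() is only a Base64 rendering for logging, not the key contents.
        System.out.println(key + " (" + raw.size() + " bytes, direct=" + view.isDirect() + ")");

        // Keys built from the same bytes compare equal regardless of construction path.
        System.out.println(key.equals(JObjectKey.of(ByteString.copyFromUtf8("hello")))); // true
    }
}
```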
@@ -1,16 +1,71 @@
package com.usatiuk.objects;

+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
+
+import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Objects;

+public record JObjectKeyImpl(SerializableByteBufferWrapper rawValue) implements JObjectKey {
+    private static class SerializableByteBufferWrapper implements Serializable {
+        @Serial
+        private static final long serialVersionUID = 1L;
+        private ByteBuffer byteBuffer;
+
+        public SerializableByteBufferWrapper(ByteBuffer byteBuffer) {
+            this.byteBuffer = byteBuffer;
+        }
+
+        public ByteBuffer getByteBuffer() {
+            return byteBuffer;
+        }
+
+        @Serial
+        private void writeObject(ObjectOutputStream out) throws IOException {
+            var readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
+            var barr = new byte[readOnlyBuffer.remaining()];
+            readOnlyBuffer.get(barr);
+            out.writeInt(barr.length);
+            out.write(barr);
+        }
+
+        @Serial
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+            int length = in.readInt();
+            ByteBuffer byteBuffer = UninitializedByteBuffer.allocateUninitialized(length);
+            var bytes = new byte[length];
+            in.readFully(bytes);
+            byteBuffer.put(bytes);
+            byteBuffer.flip();
+            this.byteBuffer = byteBuffer;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) return false;
+            SerializableByteBufferWrapper that = (SerializableByteBufferWrapper) o;
+            return Objects.equals(byteBuffer, that.byteBuffer);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(byteBuffer);
+        }
+    }
+
+    JObjectKeyImpl(ByteBuffer byteBuffer) {
+        this(new SerializableByteBufferWrapper(byteBuffer));
+    }
+
-public record JObjectKeyImpl(String value) implements JObjectKey {
    @Override
    public int compareTo(JObjectKey o) {
        switch (o) {
            case JObjectKeyImpl jObjectKeyImpl -> {
-                return value.compareTo(jObjectKeyImpl.value());
+                return rawValue.getByteBuffer().compareTo(jObjectKeyImpl.rawValue().getByteBuffer());
            }
            case JObjectKeyMax jObjectKeyMax -> {
                return -1;
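Illustration (not part of the diff): ByteBuffer is not Serializable, so the wrapper writes a length-prefixed byte array and rebuilds a buffer on read. A standalone sketch of that round trip, again with ByteBuffer.allocateDirect as a stand-in for UninitializedByteBuffer.allocateUninitialized:

```java
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferWrapperSerializationSketch implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    private ByteBuffer byteBuffer; // never default-serialized: writeObject below handles it

    BufferWrapperSerializationSketch(ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
    }

    @Serial
    private void writeObject(ObjectOutputStream out) throws IOException {
        var ro = byteBuffer.asReadOnlyBuffer();
        var arr = new byte[ro.remaining()];
        ro.get(arr);
        out.writeInt(arr.length); // length prefix, then the raw bytes
        out.write(arr);
    }

    @Serial
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        int length = in.readInt();
        var bytes = new byte[length];
        in.readFully(bytes);
        var bb = ByteBuffer.allocateDirect(length); // stand-in for UninitializedByteBuffer
        bb.put(bytes);
        bb.flip();
        this.byteBuffer = bb;
    }

    public static void main(String[] args) throws Exception {
        var original = new BufferWrapperSerializationSketch(StandardCharsets.UTF_8.encode("some-key"));
        var bos = new ByteArrayOutputStream();
        try (var oos = new ObjectOutputStream(bos)) {
            oos.writeObject(original);
        }
        try (var ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            var copy = (BufferWrapperSerializationSketch) ois.readObject();
            // ByteBuffer.equals compares remaining content, so the round trip preserves equality.
            System.out.println(original.byteBuffer.equals(copy.byteBuffer)); // true
        }
    }
}
```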
@@ -23,21 +78,18 @@ public record JObjectKeyImpl(String value) implements JObjectKey {

    @Override
    public String toString() {
-        return value;
+        var encoded = Base64.getEncoder().encode(toByteBuffer());
+        return StandardCharsets.US_ASCII.decode(encoded).toString();
    }

    @Override
-    public byte[] bytes() {
-        return value.getBytes(StandardCharsets.UTF_8);
+    public ByteString value() {
+        return UnsafeByteOperations.unsafeWrap(toByteBuffer());
    }

    @Override
    public ByteBuffer toByteBuffer() {
-        var heapBb = StandardCharsets.UTF_8.encode(value);
-        if (heapBb.isDirect()) return heapBb;
-        var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
-        directBb.put(heapBb);
-        directBb.flip();
-        return directBb;
+        return rawValue.getByteBuffer().asReadOnlyBuffer();
    }

}
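Illustration (not part of the diff): since a key is no longer guaranteed to be printable text, toString() renders the bytes as Base64. The rendering itself needs only the JDK:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class KeyToStringSketch {
    // Same shape as the new JObjectKeyImpl.toString(): Base64-encode the key bytes,
    // then decode the ASCII result into a String for logging.
    static String render(ByteBuffer keyBytes) {
        ByteBuffer encoded = Base64.getEncoder().encode(keyBytes.duplicate());
        return StandardCharsets.US_ASCII.decode(encoded).toString();
    }

    public static void main(String[] args) {
        System.out.println(render(StandardCharsets.UTF_8.encode("some-key"))); // c29tZS1rZXk=
    }
}
```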
@@ -1,5 +1,7 @@
package com.usatiuk.objects;

+import com.google.protobuf.ByteString;
+
import java.nio.ByteBuffer;

public record JObjectKeyMax() implements JObjectKey {

@@ -18,18 +20,13 @@ public record JObjectKeyMax() implements JObjectKey {
        }
    }

-    @Override
-    public byte[] bytes() {
-        throw new UnsupportedOperationException();
-    }
-
    @Override
    public ByteBuffer toByteBuffer() {
        throw new UnsupportedOperationException();
    }

    @Override
-    public String value() {
+    public ByteString value() {
        throw new UnsupportedOperationException();
    }
}

@@ -1,5 +1,7 @@
package com.usatiuk.objects;

+import com.google.protobuf.ByteString;
+
import java.nio.ByteBuffer;

public record JObjectKeyMin() implements JObjectKey {

@@ -18,18 +20,13 @@ public record JObjectKeyMin() implements JObjectKey {
        }
    }

-    @Override
-    public byte[] bytes() {
-        throw new UnsupportedOperationException();
-    }
-
    @Override
    public ByteBuffer toByteBuffer() {
        throw new UnsupportedOperationException();
    }

    @Override
-    public String value() {
+    public ByteString value() {
        throw new UnsupportedOperationException();
    }
}

@@ -30,7 +30,6 @@ import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Optional;
@@ -118,10 +117,19 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
    private final long _id = commitId;
    private boolean _closed = false;

+    private final boolean byteStringAndByteArrEquals(ByteString bs, byte[] arr) {
+        if (bs.size() != arr.length) return false;
+        for (int i = 0; i < arr.length; i++) {
+            if (bs.byteAt(i) != arr[i]) return false;
+        }
+        return true;
+    }
+
    @Override
    public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
        assert !_closed;
-        return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.value().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
+        return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key,
+                (k) -> !byteStringAndByteArrEquals(k.value(), DB_VER_OBJ_NAME));
    }

    @Nonnull
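Illustration (not part of the diff): the iterator predicate used to turn the key string back into bytes with getBytes(UTF_8) and Arrays.equals; it now compares the raw key bytes against the DB_VER_OBJ_NAME byte array without copying. A sketch of the same check; the "db_ver_obj" marker name below is made up for the example:

```java
import com.google.protobuf.ByteString;

import java.nio.charset.StandardCharsets;

public class ByteStringArrayEqualsSketch {
    // Same idea as byteStringAndByteArrEquals: compare a ByteString against a
    // byte[] element by element, without allocating a copy of either side.
    static boolean contentEquals(ByteString bs, byte[] arr) {
        if (bs.size() != arr.length) return false;
        for (int i = 0; i < arr.length; i++) {
            if (bs.byteAt(i) != arr[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] marker = "db_ver_obj".getBytes(StandardCharsets.UTF_8); // hypothetical marker name
        System.out.println(contentEquals(ByteString.copyFromUtf8("db_ver_obj"), marker)); // true
        System.out.println(contentEquals(ByteString.copyFromUtf8("some-key"), marker));   // false
        // A copying alternative would be ByteString.copyFrom(marker).equals(bs).
    }
}
```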
@@ -153,7 +153,7 @@ public class JObjectManager {

        if (!writes.isEmpty()) {
            Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
-                    .sorted(Comparator.comparing(JObjectKey::toString))
+                    .sorted()
                    .forEach(k -> {
                        var lock = lockManager.lockObject(k);
                        toUnlock.add(lock);

@@ -10,13 +10,17 @@ public record PeerId(JObjectKey id) implements Serializable, Comparable<PeerId>
        return new PeerId(JObjectKey.of(id));
    }

+    public static PeerId of(JObjectKey id) {
+        return new PeerId(id);
+    }
+
    @Override
    public String toString() {
        return id.toString();
    }

    public JObjectKey toJObjectKey() {
-        return JObjectKey.of(id.toString());
+        return id();
    }

    @Override

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs;

+import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.*;

@@ -14,6 +15,9 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
                               boolean seen,
                               PMap<PeerId, Long> changelog,
                               boolean hasLocalData) implements JDataRefcounted {
+
+    public static final ByteString DATA_SUFFIX = ByteString.copyFromUtf8(".data");
+
    // Self put
    public RemoteObjectMeta(JDataRemote data, PeerId initialPeer) {
        this(HashTreePSet.empty(), false,
@@ -41,7 +45,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
    }

    public static JObjectKey ofDataKey(JObjectKey key) {
-        return JObjectKey.of("data_" + key.value());
+        return JObjectKey.of(key.value().concat(DATA_SUFFIX));
    }

    @Override
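Illustration (not part of the diff): derived keys are now built by appending a ByteString suffix to the raw key bytes instead of concatenating strings. The pattern in isolation, using only protobuf's ByteString:

```java
import com.google.protobuf.ByteString;

public class KeySuffixSketch {
    static final ByteString DATA_SUFFIX = ByteString.copyFromUtf8(".data");

    // Same shape as the new RemoteObjectMeta.ofDataKey: append the suffix to the key bytes.
    static ByteString dataKeyBytes(ByteString key) {
        return key.concat(DATA_SUFFIX);
    }

    public static void main(String[] args) {
        ByteString key = ByteString.copyFromUtf8("obj-123"); // example key, not from the repo
        ByteString dataKey = dataKeyBytes(key);
        System.out.println(dataKey.toStringUtf8());        // obj-123.data
        System.out.println(dataKey.endsWith(DATA_SUFFIX)); // true
    }
}
```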
@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.jkleppmanntree;

+import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.*;

@@ -35,6 +36,10 @@ public class JKleppmannTreeManager {
    @Inject
    PersistentPeerDataService persistentPeerDataService;

+    public static final ByteString ROOT_SUFFIX = ByteString.copyFromUtf8("_jt_root");
+    public static final ByteString TRASH_SUFFIX = ByteString.copyFromUtf8("_jt_trash");
+    public static final ByteString LF_SUFFIX = ByteString.copyFromUtf8("_jt_lf");
+
    public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
        return txManager.executeTx(() -> {
            var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);

@@ -49,11 +54,11 @@ public class JKleppmannTreeManager {
                        TreePMap.empty()
                );
                curTx.put(data);
-                var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
+                var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value().concat(ROOT_SUFFIX)), null, new JKleppmannTreeNodeMetaDirectory(""));
                curTx.put(rootNode);
-                var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
+                var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value().concat(TRASH_SUFFIX)), null, new JKleppmannTreeNodeMetaDirectory(""));
                curTx.put(trashNode);
-                var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
+                var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value().concat(LF_SUFFIX)), null, new JKleppmannTreeNodeMetaDirectory(""));
                curTx.put(lf_node);
            }
            return new JKleppmannTree(data);

@@ -258,17 +263,17 @@ public class JKleppmannTreeManager {

        @Override
        public JObjectKey getRootId() {
-            return JObjectKey.of(_treeName.value() + "_jt_root");
+            return JObjectKey.of(_treeName.value().concat(ROOT_SUFFIX));
        }

        @Override
        public JObjectKey getTrashId() {
-            return JObjectKey.of(_treeName.value() + "_jt_trash");
+            return JObjectKey.of(_treeName.value().concat(TRASH_SUFFIX));
        }

        @Override
        public JObjectKey getLostFoundId() {
-            return JObjectKey.of(_treeName.value() + "_jt_lf");
+            return JObjectKey.of(_treeName.value().concat(LF_SUFFIX));
        }

        @Override

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.jkleppmanntree.structs;

import com.usatiuk.dhfs.JDataRef;
import com.usatiuk.dhfs.JDataRefcounted;
+import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.kleppmanntree.CombinedTimestamp;

@@ -49,6 +50,6 @@ public record JKleppmannTreePersistentData(

    @Override
    public Collection<JObjectKey> collectRefsTo() {
-        return List.of(JObjectKey.of(key().value() + "_jt_trash"), JObjectKey.of(key().value() + "_jt_root"), JObjectKey.of(key().value() + "_jt_lf"));
+        return List.of(JObjectKey.of(key().value().concat(JKleppmannTreeManager.ROOT_SUFFIX)), JObjectKey.of(key().value().concat(JKleppmannTreeManager.TRASH_SUFFIX)), JObjectKey.of(key().value().concat(JKleppmannTreeManager.LF_SUFFIX)));
    }
}

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.jmap;

+import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;

@@ -16,20 +17,24 @@ public class JMapHelper {
    @Inject
    Transaction curTx;

+    public static final ByteString DATA_SUFFIX = ByteString.copyFromUtf8("=");
+    public static final ByteString FIRST_SUFFIX = ByteString.copyFromUtf8("<");
+    public static final ByteString LAST_SUFFIX = ByteString.copyFromUtf8(">");
+
    static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
-        return JObjectKey.of(holder.value() + "=");
+        return JObjectKey.of(holder.value().concat(DATA_SUFFIX));
    }

    static <K extends JMapKey> JObjectKey makeKeyFirst(JObjectKey holder) {
-        return JObjectKey.of(holder.value() + "<");
+        return JObjectKey.of(holder.value().concat(FIRST_SUFFIX));
    }

    static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
-        return JObjectKey.of(makePrefix(holder).value() + key.toString());
+        return JObjectKey.of(makePrefix(holder).value().concat(key.value()));
    }

    static <K extends JMapKey> JObjectKey makeKeyLast(JObjectKey holder) {
-        return JObjectKey.of(holder.value() + ">");
+        return JObjectKey.of(holder.value().concat(LAST_SUFFIX));
    }

    public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
@@ -32,8 +32,8 @@ public class JMapIterator<K extends JMapKey> implements CloseableKvIterator<K, J
    }

    public K keyToKey(JObjectKey key) {
-        var keyPart = key.value().substring(_prefix.value().length());
-        return (K) JMapLongKey.of(Long.parseLong(keyPart));
+        var keyPart = key.value().substring(_prefix.value().size());
+        return (K) JMapLongKey.of(keyPart.asReadOnlyByteBuffer().getLong());
    }

    @Override

@@ -1,4 +1,7 @@
package com.usatiuk.dhfs.jmap;

+import com.google.protobuf.ByteString;
+
public interface JMapKey extends Comparable<JMapKey> {
+    public ByteString value();
}

@@ -1,9 +1,13 @@
package com.usatiuk.dhfs.jmap;

+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
+import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nonnull;
import java.io.Serializable;
+import java.nio.ByteBuffer;

public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Serializable {
    public static JMapLongKey of(long key) {
@@ -14,9 +18,11 @@ public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Ser
        return new JMapLongKey(Long.MAX_VALUE);
    }

    @Override
-    public String toString() {
-        return StringUtils.leftPad(String.valueOf(key), 20, '0');
+    public ByteString value() {
+        var newByteBuffer = ByteBuffer.allocate(Long.BYTES);
+        newByteBuffer.putLong(key);
+        newByteBuffer.flip();
+        return UnsafeByteOperations.unsafeWrap(newByteBuffer);
    }

    @Override
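Illustration (not part of the diff): JMapLongKey is now encoded as 8 big-endian bytes rather than a zero-padded decimal string, and JMapIterator reads it back with getLong(). A round-trip sketch:

```java
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;

import java.nio.ByteBuffer;

public class LongKeyCodecSketch {
    // Encode as in the new JMapLongKey.value(): 8 big-endian bytes.
    static ByteString encode(long key) {
        var bb = ByteBuffer.allocate(Long.BYTES);
        bb.putLong(key);
        bb.flip();
        return UnsafeByteOperations.unsafeWrap(bb);
    }

    // Decode as in the new JMapIterator.keyToKey(): read the long back out.
    static long decode(ByteString bytes) {
        return bytes.asReadOnlyByteBuffer().getLong();
    }

    public static void main(String[] args) {
        ByteString encoded = encode(42L);
        System.out.println(encoded.size());  // 8
        System.out.println(decode(encoded)); // 42
        // Like the old zero-padded decimal form, byte-wise ordering of big-endian
        // longs matches numeric ordering only for non-negative values.
    }
}
```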
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {

    public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
        return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
-            var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());
+            var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.value()).build()).build());
            var deserialized = receivedObjectProtoSerializer.deserialize(reply);
            return Pair.of(peer, deserialized);
        });

@@ -77,7 +77,7 @@ public class RemoteObjectServiceClient {
        Log.info("Downloading object " + key + " from " + targets);

        rpcClientFactory.withObjSyncClient(targets, (peer, client) -> {
-            var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());
+            var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.value()).build()).build());

            var deserialized = receivedObjectProtoSerializer.deserialize(reply);

@@ -110,9 +110,9 @@ public class RemoteObjectServiceClient {
        Log.trace("Asking canDelete for " + objKey + " from " + targets.stream().map(PeerId::toString).collect(Collectors.joining(", ")));
        try {
            return _batchExecutor.invokeAll(targets.stream().<Callable<Pair<PeerId, CanDeleteReply>>>map(h -> () -> {
-                var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.toString()).build());
+                var req = CanDeleteRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(objKey.value()).build());
                for (var ref : ourReferrers) {
-                    req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().toString()).build());
+                    req.addOurReferrers(JObjectKeyP.newBuilder().setName(ref.obj().value()).build());
                }
                return Pair.of(h, rpcClientFactory.withObjSyncClient(h, (p, client) -> client.canDelete(req.build())));
            }).toList()).stream().map(f -> {

@@ -88,7 +88,7 @@ public class RemoteObjectServiceServerImpl {

        if (!builder.getDeletionCandidate()) {
            for (var r : obj.refsFrom()) {
-                builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().toString()).build());
+                builder.addReferrers(JObjectKeyP.newBuilder().setName(r.obj().value()).build());
                curTx.onCommit(() -> autosyncProcessor.add(r.obj()));
            }
        }

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.ReceivedObject;
import com.usatiuk.dhfs.persistence.JDataRemoteDtoP;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.persistence.PeerIdP;
+import com.usatiuk.objects.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.pcollections.HashTreePMap;

@@ -20,7 +21,7 @@ public class TemporaryReceivedObjectSerializer implements ProtoSerializer<GetObj
    public ReceivedObject deserialize(GetObjectReply message) {
        PMap<PeerId, Long> changelog = HashTreePMap.empty();
        for (var entry : message.getChangelog().getEntriesList()) {
-            changelog = changelog.plus(PeerId.of(entry.getKey().getId().getName()), entry.getValue());
+            changelog = changelog.plus(PeerId.of(JObjectKey.of(entry.getKey().getId().getName())), entry.getValue());
        }
        var data = remoteObjectSerializer.deserialize(message.getPushedData());
        return new ReceivedObject(changelog, data);

@@ -32,7 +33,7 @@ public class TemporaryReceivedObjectSerializer implements ProtoSerializer<GetObj
        var changelogBuilder = builder.getChangelogBuilder();
        object.changelog().forEach((peer, version) -> {
            changelogBuilder.addEntriesBuilder()
-                    .setKey(PeerIdP.newBuilder().setId(JObjectKeyP.newBuilder().setName(peer.id().toString()).build()).build())
+                    .setKey(PeerIdP.newBuilder().setId(JObjectKeyP.newBuilder().setName(peer.id().value()).build()).build())
                    .setValue(version);
        });
        builder.setPushedData(remoteObjectSerializer.serialize(object.data()));

@@ -51,7 +51,7 @@ public class PeerInfoService {

    public boolean existsPeer(PeerId peer) {
        return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
+            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).toString()));
            if (gotKey == null) {
                return false;
            }

@@ -61,7 +61,7 @@ public class PeerInfoService {

    public Optional<PeerInfo> getPeerInfo(PeerId peer) {
        return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
+            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).toString()));
            if (gotKey == null) {
                return Optional.empty();
            }

@@ -114,7 +114,7 @@ public class PeerInfoService {

    public void removePeer(PeerId id) {
        jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
+            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).toString()));
            if (gotKey == null) {
                return;
            }
@@ -1,29 +1,34 @@
package com.usatiuk.dhfs.repository.peersync.structs;

+import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;

+import java.nio.charset.StandardCharsets;
import java.util.Objects;

//@ProtoMirror(JKleppmannTreeNodeMetaFileP.class)
public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
    private final JObjectKey _peerId;
+    private static final byte[] SUFFIX = "_tree_node".getBytes(StandardCharsets.UTF_8);
+    private static final ByteString SUFFIX_BS = UnsafeByteOperations.unsafeWrap(SUFFIX);

    public JKleppmannTreeNodeMetaPeer(PeerId id) {
-        super(peerIdToNodeId(id).value());
+        super(peerIdToNodeId(id).toString());
        _peerId = id.toJObjectKey();
    }

    public static JObjectKey peerIdToNodeId(PeerId id) {
-        return JObjectKey.of(id.toJObjectKey().value() + "_tree_node");
+        return JObjectKey.of(id.toJObjectKey().value().concat(SUFFIX_BS));
    }

    public static PeerId nodeIdToPeerId(JObjectKey id) {
-        if (!id.value().endsWith("_tree_node")) {
+        if (!id.value().endsWith(SUFFIX_BS)) {
            throw new IllegalArgumentException("Not a tree node key: " + id);
        }
-        return PeerId.of(id.value().substring(0, id.value().length() - "_tree_node".length()));
+        return PeerId.of(JObjectKey.of(id.value().substring(0, id.value().size() - SUFFIX_BS.size())));
    }

    public JObjectKey getPeerId() {

@@ -32,7 +37,7 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {

    @Override
    public JKleppmannTreeNodeMeta withName(String name) {
-        assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().value())).toString());
+        assert name.equals(peerIdToNodeId(PeerId.of(getPeerId())).toString());
        assert getName().equals(name);
        return this;
    }
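Illustration (not part of the diff): the peer-node suffix round trip now happens on raw bytes: concat builds the node key, and endsWith plus substring recovers the peer id. A standalone sketch of that round trip:

```java
import com.google.protobuf.ByteString;

public class NodeIdSuffixSketch {
    static final ByteString SUFFIX = ByteString.copyFromUtf8("_tree_node");

    static ByteString peerIdToNodeId(ByteString peerId) {
        return peerId.concat(SUFFIX);
    }

    static ByteString nodeIdToPeerId(ByteString nodeId) {
        if (!nodeId.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("Not a tree node key: " + nodeId.toStringUtf8());
        }
        return nodeId.substring(0, nodeId.size() - SUFFIX.size());
    }

    public static void main(String[] args) {
        ByteString peer = ByteString.copyFromUtf8("22000000-0000-0000-0000-000000000000");
        ByteString node = peerIdToNodeId(peer);
        System.out.println(node.toStringUtf8());               // ends with _tree_node
        System.out.println(nodeIdToPeerId(node).equals(peer)); // true
    }
}
```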
@@ -7,7 +7,7 @@ option java_outer_classname = "DhfsObjectPersistence";
package dhfs.persistence;

message JObjectKeyP {
-    string name = 1;
+    bytes name = 1;
}

message PeerIdP {
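Illustration (not part of the diff): switching JObjectKeyP.name from string to bytes is what lets the call sites above pass key.value() directly, since proto3 bytes fields surface as ByteString in the generated Java builders. Shown here with protobuf's stock BytesValue well-known type rather than the project's generated JObjectKeyP class:

```java
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;

public class BytesFieldSketch {
    public static void main(String[] args) {
        // A proto3 `bytes` field accepts raw key bytes with no UTF-8 round trip.
        ByteString keyBytes = ByteString.copyFromUtf8("obj-123");
        BytesValue msg = BytesValue.newBuilder().setValue(keyBytes).build();
        System.out.println(msg.getValue().equals(keyBytes)); // true
    }
}
```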