6 Commits

Author SHA1 Message Date
a6a4101bb0 Objects: use bytebuffer to read
a little less GC pressure
2025-04-18 13:21:04 +02:00
59fa5dcf28 Fixie for HashSetDelayedBlockingQueueTest 2025-04-18 13:08:55 +02:00
0f5fb8b8b6 Objects: PBT MergingIterator test 2025-04-18 13:08:40 +02:00
c087dd8971 More microoptimizations 3! 2025-04-18 12:13:22 +02:00
14ddddd0ff Sync-base: use serialized certificate in self data
makes it easier to switch serialization
2025-04-18 11:06:40 +02:00
9859378627 Sync-base: move "_data" to suffix
makes cache much less bad
2025-04-18 11:06:15 +02:00
9 changed files with 292 additions and 16 deletions

View File

@@ -33,6 +33,7 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;
@@ -410,7 +411,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (existingEnd < offset) {
if (!pendingPrefix.isEmpty()) {
int diff = Math.toIntExact(offset - existingEnd);
pendingPrefix = pendingPrefix.concat(ByteString.copyFrom(new byte[diff]));
pendingPrefix = pendingPrefix.concat(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(diff)));
} else {
fillZeros(existingEnd, offset, newChunks);
start = offset;
@@ -572,7 +573,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (!zeroCache.containsKey(end - cur))
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)])));
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(ByteBuffer.allocateDirect(Math.toIntExact(end - cur)))));
ChunkData newChunkData = zeroCache.get(end - cur);
newChunks.put(start, newChunkData.key());

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>

View File

@@ -360,13 +360,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
// var val = _cursor.val();
// var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
// bbDirect.put(val);
// bbDirect.flip();
// var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
// var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), ByteString.copyFrom(_cursor.val()));
var val = _cursor.val();
var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
bbDirect.put(val);
bbDirect.flip();
var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
if (_goingForward)
_hasNext = _cursor.next();
else

View File

@@ -0,0 +1,262 @@
package com.usatiuk.objects.iterators;
import net.jqwik.api.*;
import net.jqwik.api.state.Action;
import net.jqwik.api.state.ActionChain;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import java.util.*;
public class MergingKvIteratorPbtTest {
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
private final CloseableKvIterator<Integer, Integer> mergedIterator;
private final CloseableKvIterator<Integer, Integer> mergingIterator;
private MergingIteratorModel(List<List<Map.Entry<Integer, Integer>>> pairs, IteratorStart startType, Integer startKey) {
TreeMap<Integer, Integer> perfectMerged = new TreeMap<>();
for (List<Map.Entry<Integer, Integer>> list : pairs) {
for (Map.Entry<Integer, Integer> pair : list) {
perfectMerged.putIfAbsent(pair.getKey(), pair.getValue());
}
}
mergedIterator = new NavigableMapKvIterator<>(perfectMerged, startType, startKey);
mergingIterator = new MergingKvIterator<>("test", startType, startKey, pairs.stream().<IterProdFn<Integer, Integer>>map(
list -> (IteratorStart start, Integer key) -> new NavigableMapKvIterator<>(new TreeMap<Integer, Integer>(Map.ofEntries(list.toArray(Map.Entry[]::new))), start, key)
).toList());
}
@Override
public Integer peekNextKey() {
var mergedKey = mergedIterator.peekNextKey();
var mergingKey = mergingIterator.peekNextKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skip() {
mergedIterator.skip();
mergingIterator.skip();
}
@Override
public Integer peekPrevKey() {
var mergedKey = mergedIterator.peekPrevKey();
var mergingKey = mergingIterator.peekPrevKey();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> prev() {
var mergedKey = mergedIterator.prev();
var mergingKey = mergingIterator.prev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public boolean hasPrev() {
var mergedKey = mergedIterator.hasPrev();
var mergingKey = mergingIterator.hasPrev();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public void skipPrev() {
mergedIterator.skipPrev();
mergingIterator.skipPrev();
}
@Override
public void close() {
mergedIterator.close();
mergingIterator.close();
}
@Override
public boolean hasNext() {
var mergedKey = mergedIterator.hasNext();
var mergingKey = mergingIterator.hasNext();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
@Override
public Pair<Integer, Integer> next() {
var mergedKey = mergedIterator.next();
var mergingKey = mergingIterator.next();
Assertions.assertEquals(mergedKey, mergingKey);
return mergedKey;
}
}
static class PeekNextKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekNextKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Peek next key";
}
}
static class SkipAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skip();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Skip next key";
}
}
static class PeekPrevKeyAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.peekPrevKey();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Peek prev key";
}
}
static class SkipPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.skipPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Skip prev key";
}
}
static class PrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.prev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasPrev();
}
@Override
public String description() {
return "Prev key";
}
}
static class NextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.next();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return state.hasNext();
}
@Override
public String description() {
return "Next key";
}
}
static class HasNextAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasNext();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has next key";
}
}
static class HasPrevAction extends Action.JustMutate<MergingIteratorModel> {
@Override
public void mutate(MergingIteratorModel state) {
state.hasPrev();
}
@Override
public boolean precondition(MergingIteratorModel state) {
return true;
}
@Override
public String description() {
return "Has prev key";
}
}
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
}

View File

@@ -52,6 +52,12 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>
<version>1.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>

View File

@@ -41,7 +41,7 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
}
public static JObjectKey ofDataKey(JObjectKey key) {
return JObjectKey.of("data_" + key.value());
return JObjectKey.of(key.value() + "_data");
}
@Override

View File

@@ -1,11 +1,13 @@
package com.usatiuk.dhfs.repository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.repository.peerdiscovery.PeerAddressType;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.repository.peertrust.PeerTrustManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -64,8 +66,8 @@ public class PersistentPeerDataService {
var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (selfData != null) {
_selfUuid = selfData.selfUuid();
_selfCertificate = selfData.selfCertificate();
_selfKeyPair = selfData.selfKeyPair();
_selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
_selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
return;
} else {
try {
@@ -74,7 +76,7 @@ public class PersistentPeerDataService {
_selfKeyPair = CertificateTools.generateKeyPair();
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair, HashTreePSet.empty(), HashTreePMap.empty()));
curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.repository;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.peerdiscovery.IpPeerAddress;
import com.usatiuk.objects.JData;
@@ -12,8 +13,8 @@ import java.security.KeyPair;
import java.security.cert.X509Certificate;
public record PersistentRemoteHostsData(PeerId selfUuid,
X509Certificate selfCertificate,
KeyPair selfKeyPair,
ByteString selfCertificate,
ByteString selfKeyPair,
PSet<PeerId> initialSyncDone,
PMap<PeerId, IpPeerAddress> persistentPeerAddress) implements JData, Serializable {
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");

View File

@@ -45,7 +45,7 @@ public class HashSetDelayedBlockingQueueTest {
queue.add("hello!");
Assertions.assertEquals("hello!", queue.get());
var gotTime = System.currentTimeMillis();
Assertions.assertTrue((gotTime - curTime) <= 10);
Assertions.assertTrue((gotTime - curTime) <= 50);
}
@Test