2 Commits

Author SHA1 Message Date
59e8f6a6b4 Objects: one less copy when serializing
only cache what was really read, otherwise its lifetime is the same as transaction
2025-04-19 11:03:26 +02:00
0292df7f0e Objects: faster JObjectKey 2025-04-19 11:02:30 +02:00
9 changed files with 91 additions and 38 deletions

View File

@@ -6,6 +6,7 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private final long _version;
private final int _estimatedSize;
private Supplier<JData> _producer;
private Runnable _cacheCallback;
private JData _data;
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
@@ -14,6 +15,14 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
_producer = producer;
}
public void setCacheCallback(Runnable cacheCallback) {
if (_data != null) {
throw new IllegalStateException("Cache callback can be set only before data is loaded");
}
_cacheCallback = cacheCallback;
}
public JData data() {
if (_data != null)
return _data;
@@ -23,6 +32,10 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
return _data;
_data = _producer.get();
if (_cacheCallback != null) {
_cacheCallback.run();
_cacheCallback = null;
}
_producer = null;
return _data;
}

View File

@@ -2,12 +2,12 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.nio.ByteBuffer;
@ApplicationScoped
@Singleton
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
@Inject
ObjectSerializer<JData> dataSerializer;

View File

@@ -30,7 +30,9 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
}
static JObjectKey fromByteBuffer(ByteBuffer buff) {
return new JObjectKeyImpl(StandardCharsets.ISO_8859_1.decode(buff).toString());
byte[] bytes = new byte[buff.remaining()];
buff.get(bytes);
return new JObjectKeyImpl(bytes);
}
@Override
@@ -39,8 +41,6 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
@Override
String toString();
byte[] bytes();
ByteBuffer toByteBuffer();
String value();

View File

@@ -2,10 +2,25 @@ package com.usatiuk.objects;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import java.io.Serial;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public final class JObjectKeyImpl implements JObjectKey {
@Serial
private static final long serialVersionUID = 0L;
private final String value;
private transient ByteBuffer _bb = null;
public JObjectKeyImpl(String value) {
this.value = value;
}
public JObjectKeyImpl(byte[] bytes) {
this.value = new String(bytes, StandardCharsets.ISO_8859_1);
}
public record JObjectKeyImpl(String value) implements JObjectKey {
@Override
public int compareTo(JObjectKey o) {
switch (o) {
@@ -27,17 +42,36 @@ public record JObjectKeyImpl(String value) implements JObjectKey {
}
@Override
public byte[] bytes() {
return value.getBytes(StandardCharsets.ISO_8859_1);
public ByteBuffer toByteBuffer() {
if (_bb != null) return _bb;
synchronized (this) {
if (_bb != null) return _bb;
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
var directBb = UninitializedByteBuffer.allocateUninitialized(bytes.length);
directBb.put(bytes);
directBb.flip();
_bb = directBb;
return directBb;
}
}
@Override
public ByteBuffer toByteBuffer() {
var heapBb = StandardCharsets.ISO_8859_1.encode(value);
if (heapBb.isDirect()) return heapBb;
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
directBb.put(heapBb);
directBb.flip();
return directBb;
public String value() {
return value;
}
@Override
public boolean equals(Object obj) {
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (JObjectKeyImpl) obj;
return Objects.equals(this.value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}

View File

@@ -18,11 +18,6 @@ public record JObjectKeyMax() implements JObjectKey {
}
}
@Override
public byte[] bytes() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuffer toByteBuffer() {
throw new UnsupportedOperationException();

View File

@@ -18,11 +18,6 @@ public record JObjectKeyMin() implements JObjectKey {
}
}
@Override
public byte[] bytes() {
throw new UnsupportedOperationException();
}
@Override
public ByteBuffer toByteBuffer() {
throw new UnsupportedOperationException();

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperLazy;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
@@ -141,10 +142,11 @@ public class CachingObjectPersistentStore {
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private boolean _invalid = false;
private boolean _closed = false;
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
if (_invalid)
return;
@@ -160,6 +162,29 @@ public class CachingObjectPersistentStore {
_cached.incrementAndGet();
}
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (obj.isEmpty()) {
doCache(key, obj);
return;
}
var wrapper = obj.get();
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
doCache(key, obj);
return;
}
lazy.setCacheCallback(() -> {
if (_closed) {
Log.error("Cache callback called after close");
System.exit(-1);
}
doCache(key, obj);
});
return;
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
@@ -199,6 +224,7 @@ public class CachingObjectPersistentStore {
@Override
public void close() {
_closed = true;
_backing.close();
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
@@ -132,12 +131,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(read -> {
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
uninitBb.put(got);
uninitBb.flip();
return UnsafeByteOperations.unsafeWrap(uninitBb);
});
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
return ret;
}
@@ -361,10 +355,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val();
var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
bbDirect.put(val);
bbDirect.flip();
var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
var bs = UnsafeByteOperations.unsafeWrap(val);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
if (_goingForward)
_hasNext = _cursor.next();

View File

@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;