mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Objects: one less copy when serializing
only cache what was really read, otherwise its lifetime is the same as transaction
This commit is contained in:
@@ -6,6 +6,7 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
private final long _version;
|
||||
private final int _estimatedSize;
|
||||
private Supplier<JData> _producer;
|
||||
private Runnable _cacheCallback;
|
||||
private JData _data;
|
||||
|
||||
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
|
||||
@@ -14,6 +15,14 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
_producer = producer;
|
||||
}
|
||||
|
||||
public void setCacheCallback(Runnable cacheCallback) {
|
||||
if (_data != null) {
|
||||
throw new IllegalStateException("Cache callback can be set only before data is loaded");
|
||||
}
|
||||
|
||||
_cacheCallback = cacheCallback;
|
||||
}
|
||||
|
||||
public JData data() {
|
||||
if (_data != null)
|
||||
return _data;
|
||||
@@ -23,6 +32,10 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
return _data;
|
||||
|
||||
_data = _producer.get();
|
||||
if (_cacheCallback != null) {
|
||||
_cacheCallback.run();
|
||||
_cacheCallback = null;
|
||||
}
|
||||
_producer = null;
|
||||
return _data;
|
||||
}
|
||||
|
||||
@@ -2,12 +2,12 @@ package com.usatiuk.objects;
|
||||
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
|
||||
@Inject
|
||||
ObjectSerializer<JData> dataSerializer;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JDataVersionedWrapperLazy;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
@@ -141,10 +142,11 @@ public class CachingObjectPersistentStore {
|
||||
Cache finalCurCache = curCache;
|
||||
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
|
||||
private boolean _invalid = false;
|
||||
private boolean _closed = false;
|
||||
private final Cache _curCache = finalCurCache;
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
_cacheTries.incrementAndGet();
|
||||
if (_invalid)
|
||||
return;
|
||||
@@ -160,6 +162,29 @@ public class CachingObjectPersistentStore {
|
||||
_cached.incrementAndGet();
|
||||
}
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
if (obj.isEmpty()) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
var wrapper = obj.get();
|
||||
|
||||
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
lazy.setCacheCallback(() -> {
|
||||
if (_closed) {
|
||||
Log.error("Cache callback called after close");
|
||||
System.exit(-1);
|
||||
}
|
||||
doCache(key, obj);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("cache", start, key,
|
||||
@@ -199,6 +224,7 @@ public class CachingObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_closed = true;
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
@@ -132,12 +131,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
assert !_closed;
|
||||
var got = _db.get(_txn.get(), name.toByteBuffer());
|
||||
var ret = Optional.ofNullable(got).map(read -> {
|
||||
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
|
||||
uninitBb.put(got);
|
||||
uninitBb.flip();
|
||||
return UnsafeByteOperations.unsafeWrap(uninitBb);
|
||||
});
|
||||
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -361,10 +355,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
|
||||
var val = _cursor.val();
|
||||
var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
|
||||
bbDirect.put(val);
|
||||
bbDirect.flip();
|
||||
var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
|
||||
var bs = UnsafeByteOperations.unsafeWrap(val);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
|
||||
if (_goingForward)
|
||||
_hasNext = _cursor.next();
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import com.usatiuk.objects.ObjectSerializer;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
|
||||
Reference in New Issue
Block a user