mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
2 Commits
a6a4101bb0
...
59e8f6a6b4
| Author | SHA1 | Date | |
|---|---|---|---|
| 59e8f6a6b4 | |||
| 0292df7f0e |
@@ -6,6 +6,7 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
private final long _version;
|
||||
private final int _estimatedSize;
|
||||
private Supplier<JData> _producer;
|
||||
private Runnable _cacheCallback;
|
||||
private JData _data;
|
||||
|
||||
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {
|
||||
@@ -14,6 +15,14 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
_producer = producer;
|
||||
}
|
||||
|
||||
public void setCacheCallback(Runnable cacheCallback) {
|
||||
if (_data != null) {
|
||||
throw new IllegalStateException("Cache callback can be set only before data is loaded");
|
||||
}
|
||||
|
||||
_cacheCallback = cacheCallback;
|
||||
}
|
||||
|
||||
public JData data() {
|
||||
if (_data != null)
|
||||
return _data;
|
||||
@@ -23,6 +32,10 @@ public class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
|
||||
return _data;
|
||||
|
||||
_data = _producer.get();
|
||||
if (_cacheCallback != null) {
|
||||
_cacheCallback.run();
|
||||
_cacheCallback = null;
|
||||
}
|
||||
_producer = null;
|
||||
return _data;
|
||||
}
|
||||
|
||||
@@ -2,12 +2,12 @@ package com.usatiuk.objects;
|
||||
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
@ApplicationScoped
|
||||
@Singleton
|
||||
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
|
||||
@Inject
|
||||
ObjectSerializer<JData> dataSerializer;
|
||||
|
||||
@@ -30,7 +30,9 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
}
|
||||
|
||||
static JObjectKey fromByteBuffer(ByteBuffer buff) {
|
||||
return new JObjectKeyImpl(StandardCharsets.ISO_8859_1.decode(buff).toString());
|
||||
byte[] bytes = new byte[buff.remaining()];
|
||||
buff.get(bytes);
|
||||
return new JObjectKeyImpl(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -39,8 +41,6 @@ public sealed interface JObjectKey extends Serializable, Comparable<JObjectKey>
|
||||
@Override
|
||||
String toString();
|
||||
|
||||
byte[] bytes();
|
||||
|
||||
ByteBuffer toByteBuffer();
|
||||
|
||||
String value();
|
||||
|
||||
@@ -2,10 +2,25 @@ package com.usatiuk.objects;
|
||||
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Objects;
|
||||
|
||||
public final class JObjectKeyImpl implements JObjectKey {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 0L;
|
||||
private final String value;
|
||||
private transient ByteBuffer _bb = null;
|
||||
|
||||
public JObjectKeyImpl(String value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public JObjectKeyImpl(byte[] bytes) {
|
||||
this.value = new String(bytes, StandardCharsets.ISO_8859_1);
|
||||
}
|
||||
|
||||
public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
@Override
|
||||
public int compareTo(JObjectKey o) {
|
||||
switch (o) {
|
||||
@@ -27,17 +42,36 @@ public record JObjectKeyImpl(String value) implements JObjectKey {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
return value.getBytes(StandardCharsets.ISO_8859_1);
|
||||
public ByteBuffer toByteBuffer() {
|
||||
if (_bb != null) return _bb;
|
||||
|
||||
synchronized (this) {
|
||||
if (_bb != null) return _bb;
|
||||
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(bytes.length);
|
||||
directBb.put(bytes);
|
||||
directBb.flip();
|
||||
_bb = directBb;
|
||||
return directBb;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
var heapBb = StandardCharsets.ISO_8859_1.encode(value);
|
||||
if (heapBb.isDirect()) return heapBb;
|
||||
var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
|
||||
directBb.put(heapBb);
|
||||
directBb.flip();
|
||||
return directBb;
|
||||
public String value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) return true;
|
||||
if (obj == null || obj.getClass() != this.getClass()) return false;
|
||||
var that = (JObjectKeyImpl) obj;
|
||||
return Objects.equals(this.value, that.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(value);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -18,11 +18,6 @@ public record JObjectKeyMax() implements JObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -18,11 +18,6 @@ public record JObjectKeyMin() implements JObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] bytes() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer toByteBuffer() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.objects.stores;
|
||||
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JDataVersionedWrapperLazy;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
@@ -141,10 +142,11 @@ public class CachingObjectPersistentStore {
|
||||
Cache finalCurCache = curCache;
|
||||
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
|
||||
private boolean _invalid = false;
|
||||
private boolean _closed = false;
|
||||
private final Cache _curCache = finalCurCache;
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
_cacheTries.incrementAndGet();
|
||||
if (_invalid)
|
||||
return;
|
||||
@@ -160,6 +162,29 @@ public class CachingObjectPersistentStore {
|
||||
_cached.incrementAndGet();
|
||||
}
|
||||
|
||||
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
|
||||
if (obj.isEmpty()) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
var wrapper = obj.get();
|
||||
|
||||
if (!(wrapper instanceof JDataVersionedWrapperLazy lazy)) {
|
||||
doCache(key, obj);
|
||||
return;
|
||||
}
|
||||
|
||||
lazy.setCacheCallback(() -> {
|
||||
if (_closed) {
|
||||
Log.error("Cache callback called after close");
|
||||
System.exit(-1);
|
||||
}
|
||||
doCache(key, obj);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
|
||||
return new TombstoneMergingKvIterator<>("cache", start, key,
|
||||
@@ -199,6 +224,7 @@ public class CachingObjectPersistentStore {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_closed = true;
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
|
||||
import com.usatiuk.dhfs.utils.RefcountedCloseable;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyMax;
|
||||
@@ -132,12 +131,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
public Optional<ByteString> readObject(JObjectKey name) {
|
||||
assert !_closed;
|
||||
var got = _db.get(_txn.get(), name.toByteBuffer());
|
||||
var ret = Optional.ofNullable(got).map(read -> {
|
||||
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
|
||||
uninitBb.put(got);
|
||||
uninitBb.flip();
|
||||
return UnsafeByteOperations.unsafeWrap(uninitBb);
|
||||
});
|
||||
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -361,10 +355,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
|
||||
var val = _cursor.val();
|
||||
var bbDirect = UninitializedByteBuffer.allocateUninitialized(val.remaining());
|
||||
bbDirect.put(val);
|
||||
bbDirect.flip();
|
||||
var bs = UnsafeByteOperations.unsafeWrap(bbDirect);
|
||||
var bs = UnsafeByteOperations.unsafeWrap(val);
|
||||
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
|
||||
if (_goingForward)
|
||||
_hasNext = _cursor.next();
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.JObjectKeyImpl;
|
||||
import com.usatiuk.objects.ObjectSerializer;
|
||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
|
||||
Reference in New Issue
Block a user