Objects: working cache for object reads without iterator

This commit is contained in:
2025-03-14 00:01:21 +01:00
parent 4cbb4ce2be
commit d483eba20d
3 changed files with 133 additions and 82 deletions

View File

@@ -2,13 +2,21 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class LockManager {
private final DataLocker _objLocker = new DataLocker();
@Nonnull
public AutoCloseableNoThrow lockObject(JObjectKey key) {
return _objLocker.lock(key);
}
@Nullable
public AutoCloseableNoThrow tryLockObject(JObjectKey key) {
return _objLocker.tryLock(key);
}
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
@@ -25,7 +24,9 @@ public class CachingObjectPersistentStore {
private long _cacheVersion = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final DataLocker _readerLocker = new DataLocker();
@Inject
LockManager lockManager;
@Inject
SerializingObjectPersistentStore delegate;
@@ -93,11 +94,22 @@ public class CachingObjectPersistentStore {
} finally {
_lock.readLock().unlock();
}
try (var lock = _readerLocker.lock(name)) {
// TODO: This is possibly racy
// var got = delegate.readObject(name);
// put(name, got);
return delegate.readObject(name);
// Global object lock, prevent putting the object into cache
// if its being written right now by another thread
try (var lock = lockManager.tryLockObject(name)) {
var got = delegate.readObject(name);
if (lock == null)
return got;
_lock.writeLock().lock();
try {
put(name, got);
// No need to increase cache version, the objects didn't change
} finally {
_lock.writeLock().unlock();
}
return got;
}
}
@@ -125,79 +137,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private final long _curCacheVersion;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate, long cacheVersion) {
_delegate = delegate;
_curCacheVersion = cacheVersion;
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
}
@Override
public void skip() {
_delegate.skip();
}
@Override
public void close() {
_delegate.close();
}
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
private void maybeCache(Pair<JObjectKey, JDataVersionedWrapper> prev) {
_lock.writeLock().lock();
try {
if (_cacheVersion != _curCacheVersion) {
Log.tracev("Not caching: {0}", prev);
} else {
Log.tracev("Caching: {0}", prev);
put(prev.getKey(), Optional.of(prev.getValue()));
}
} finally {
_lock.writeLock().unlock();
}
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev);
return prev;
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next);
return next;
}
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
TreePMap<JObjectKey, CacheEntry> curSortedCache;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
@@ -210,6 +149,7 @@ public class CachingObjectPersistentStore {
try {
curSortedCache = _sortedCache;
cacheVersion = _cacheVersion;
// TODO: Could this be done without lock?
backing = delegate.getSnapshot();
} finally {
_lock.readLock().unlock();
@@ -219,7 +159,81 @@ public class CachingObjectPersistentStore {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final TreePMap<JObjectKey, CacheEntry> _curSortedCache = curSortedCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private final long _cacheVersion = cacheVersion;
private final long _snapshotCacheVersion = cacheVersion;
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (_snapshotCacheVersion != _cacheVersion)
return;
_lock.writeLock().lock();
try {
if (_snapshotCacheVersion != _cacheVersion) {
Log.tracev("Not caching: {0}", key);
} else {
Log.tracev("Caching: {0}", key);
put(key, obj);
}
} finally {
_lock.writeLock().unlock();
}
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
_delegate = delegate;
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
}
@Override
public void skip() {
_delegate.skip();
}
@Override
public void close() {
_delegate.close();
}
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
}
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
@@ -232,7 +246,7 @@ public class CachingObjectPersistentStore {
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new));
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
@Nonnull
@@ -248,6 +262,8 @@ public class CachingObjectPersistentStore {
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.utils;
import io.quarkus.logging.Log;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
@@ -10,6 +12,7 @@ public class DataLocker {
};
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
while (true) {
try {
@@ -36,6 +39,30 @@ public class DataLocker {
}
}
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
while (true) {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
continue;
}
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
}
}
private static class LockTag {
final Thread owner = Thread.currentThread();
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();