diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java index 8d7ae3d1..8bb20802 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java @@ -2,13 +2,21 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.dhfs.utils.DataLocker; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; @ApplicationScoped public class LockManager { private final DataLocker _objLocker = new DataLocker(); + @Nonnull public AutoCloseableNoThrow lockObject(JObjectKey key) { return _objLocker.lock(key); } + + @Nullable + public AutoCloseableNoThrow tryLockObject(JObjectKey key) { + return _objLocker.tryLock(key); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java index ad378606..b52fe6bc 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java @@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects.persistence; import com.usatiuk.dhfs.objects.*; import com.usatiuk.dhfs.objects.snapshot.Snapshot; -import com.usatiuk.dhfs.utils.DataLocker; import io.quarkus.logging.Log; import io.quarkus.runtime.Startup; import jakarta.enterprise.context.ApplicationScoped; @@ -25,7 +24,9 @@ public class CachingObjectPersistentStore { private long _cacheVersion = 0; private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); - private final DataLocker _readerLocker = new DataLocker(); + + @Inject + LockManager lockManager; @Inject SerializingObjectPersistentStore delegate; @@ -93,11 +94,22 @@ public class CachingObjectPersistentStore { } finally { _lock.readLock().unlock(); } - try (var lock = _readerLocker.lock(name)) { - // TODO: This is possibly racy -// var got = delegate.readObject(name); -// put(name, got); - return delegate.readObject(name); + // Global object lock, prevent putting the object into cache + // if its being written right now by another thread + try (var lock = lockManager.tryLockObject(name)) { + var got = delegate.readObject(name); + + if (lock == null) + return got; + + _lock.writeLock().lock(); + try { + put(name, got); + // No need to increase cache version, the objects didn't change + } finally { + _lock.writeLock().unlock(); + } + return got; } } @@ -125,79 +137,6 @@ public class CachingObjectPersistentStore { Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size()); } - private class CachingKvIterator implements CloseableKvIterator { - private final CloseableKvIterator _delegate; - private final long _curCacheVersion; - - private CachingKvIterator(CloseableKvIterator delegate, long cacheVersion) { - _delegate = delegate; - _curCacheVersion = cacheVersion; - } - - @Override - public JObjectKey peekNextKey() { - return _delegate.peekNextKey(); - } - - @Override - public void skip() { - _delegate.skip(); - } - - @Override - public void close() { - _delegate.close(); - } - - @Override - public boolean hasNext() { - return _delegate.hasNext(); - } - - @Override - public JObjectKey peekPrevKey() { - return _delegate.peekPrevKey(); - } - - private void maybeCache(Pair prev) { - _lock.writeLock().lock(); - try { - if (_cacheVersion != _curCacheVersion) { - Log.tracev("Not caching: {0}", prev); - } else { - Log.tracev("Caching: {0}", prev); - put(prev.getKey(), Optional.of(prev.getValue())); - } - } finally { - _lock.writeLock().unlock(); - } - } - - @Override - public Pair prev() { - var prev = _delegate.prev(); - maybeCache(prev); - return prev; - } - - @Override - public boolean hasPrev() { - return _delegate.hasPrev(); - } - - @Override - public void skipPrev() { - _delegate.skipPrev(); - } - - @Override - public Pair next() { - var next = _delegate.next(); - maybeCache(next); - return next; - } - } - public Snapshot getSnapshot() { TreePMap curSortedCache; Snapshot backing = null; @@ -210,6 +149,7 @@ public class CachingObjectPersistentStore { try { curSortedCache = _sortedCache; cacheVersion = _cacheVersion; + // TODO: Could this be done without lock? backing = delegate.getSnapshot(); } finally { _lock.readLock().unlock(); @@ -219,7 +159,81 @@ public class CachingObjectPersistentStore { return new Snapshot() { private final TreePMap _curSortedCache = curSortedCache; private final Snapshot _backing = finalBacking; - private final long _cacheVersion = cacheVersion; + private final long _snapshotCacheVersion = cacheVersion; + + private void maybeCache(JObjectKey key, Optional obj) { + if (_snapshotCacheVersion != _cacheVersion) + return; + + _lock.writeLock().lock(); + try { + if (_snapshotCacheVersion != _cacheVersion) { + Log.tracev("Not caching: {0}", key); + } else { + Log.tracev("Caching: {0}", key); + put(key, obj); + } + } finally { + _lock.writeLock().unlock(); + } + } + + private class CachingKvIterator implements CloseableKvIterator { + private final CloseableKvIterator _delegate; + + private CachingKvIterator(CloseableKvIterator delegate) { + _delegate = delegate; + } + + @Override + public JObjectKey peekNextKey() { + return _delegate.peekNextKey(); + } + + @Override + public void skip() { + _delegate.skip(); + } + + @Override + public void close() { + _delegate.close(); + } + + @Override + public boolean hasNext() { + return _delegate.hasNext(); + } + + @Override + public JObjectKey peekPrevKey() { + return _delegate.peekPrevKey(); + } + + @Override + public Pair prev() { + var prev = _delegate.prev(); + maybeCache(prev.getKey(), Optional.of(prev.getValue())); + return prev; + } + + @Override + public boolean hasPrev() { + return _delegate.hasPrev(); + } + + @Override + public void skipPrev() { + _delegate.skipPrev(); + } + + @Override + public Pair next() { + var next = _delegate.next(); + maybeCache(next.getKey(), Optional.of(next.getValue())); + return next; + } + } @Override public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { @@ -232,7 +246,7 @@ public class CachingObjectPersistentStore { return e.object(); } ), - (mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new)); + (mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new)); } @Nonnull @@ -248,6 +262,8 @@ public class CachingObjectPersistentStore { default -> throw new IllegalStateException("Unexpected value: " + cached.object()); }; } + var read = _backing.readObject(name); + maybeCache(name, read); return _backing.readObject(name); } diff --git a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java index 1292e235..f4b79ecf 100644 --- a/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java +++ b/dhfs-parent/utils/src/main/java/com/usatiuk/dhfs/utils/DataLocker.java @@ -1,6 +1,8 @@ package com.usatiuk.dhfs.utils; import io.quarkus.logging.Log; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import java.lang.ref.Cleaner; import java.util.concurrent.ConcurrentHashMap; @@ -10,6 +12,7 @@ public class DataLocker { }; private final ConcurrentHashMap _locks = new ConcurrentHashMap<>(); + @Nonnull public AutoCloseableNoThrow lock(Object data) { while (true) { try { @@ -36,6 +39,30 @@ public class DataLocker { } } + @Nullable + public AutoCloseableNoThrow tryLock(Object data) { + while (true) { + var tag = _locks.get(data); + if (tag != null) { + synchronized (tag) { + if (!tag.released) { + if (tag.owner == Thread.currentThread()) { + return DUMMY_LOCK; + } + return null; + } + continue; + } + } + + var newTag = new LockTag(); + var oldTag = _locks.putIfAbsent(data, newTag); + if (oldTag == null) { + return new Lock(data, newTag); + } + } + } + private static class LockTag { final Thread owner = Thread.currentThread(); // final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();