diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java new file mode 100644 index 00000000..fa2bb988 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidIteratorException.java @@ -0,0 +1,11 @@ +package com.usatiuk.dhfs.objects; + +public class InvalidIteratorException extends RuntimeException { + public InvalidIteratorException() { + super(); + } + + public InvalidIteratorException(String message) { + super(message); + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java new file mode 100644 index 00000000..712499c3 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/InvalidatableKvIterator.java @@ -0,0 +1,66 @@ +package com.usatiuk.dhfs.objects; + +import io.quarkus.logging.Log; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +public class InvalidatableKvIterator, V> implements CloseableKvIterator { + private final CloseableKvIterator _backing; + private final Supplier _versionSupplier; + private final long _version; + private final Lock _lock; + + public InvalidatableKvIterator(CloseableKvIterator backing, Supplier versionSupplier, Lock lock) { + _backing = backing; + _versionSupplier = versionSupplier; + _lock = lock; + _version = _versionSupplier.get(); + } + + private void checkVersion() { + if (_versionSupplier.get() != _version) { + Log.errorv("Version mismatch: {0} != {1}", _versionSupplier.get(), _version); + throw new InvalidIteratorException(); + } + } + + @Override + public K peekNextKey() { + _lock.lock(); + try { + checkVersion(); + return _backing.peekNextKey(); + } finally { + _lock.unlock(); + } + } + + @Override + public void close() { + _backing.close(); + } + + @Override + public boolean hasNext() { + _lock.lock(); + try { + checkVersion(); + return _backing.hasNext(); + } finally { + _lock.unlock(); + } + } + + @Override + public Pair next() { + _lock.lock(); + try { + checkVersion(); + return _backing.next(); + } finally { + _lock.unlock(); + } + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java index 400ce735..4b4f6df2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SelfRefreshingKvIterator.java @@ -1,69 +1,84 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.persistence.IteratorStart; import io.quarkus.logging.Log; import org.apache.commons.lang3.tuple.Pair; import java.util.NoSuchElementException; +import java.util.concurrent.locks.Lock; +import java.util.function.Function; import java.util.function.Supplier; +// Also checks that the next provided item is always consistent after a refresh public class SelfRefreshingKvIterator, V> implements CloseableKvIterator { private CloseableKvIterator _backing; - private long _lastRefreshed = -1L; - private Pair _next; - private final Object _synchronizer; - private final Supplier> _iteratorSupplier; + private long _curVersion = -1L; + private final Lock _lock; + private final Function, CloseableKvIterator> _iteratorSupplier; private final Supplier _versionSupplier; + private Pair _next; - public SelfRefreshingKvIterator(Supplier> iteratorSupplier, Supplier versionSupplier, Object synchronizer) { + public SelfRefreshingKvIterator(Function, CloseableKvIterator> iteratorSupplier, Supplier versionSupplier, Lock lock, + IteratorStart start, K key) { _iteratorSupplier = iteratorSupplier; _versionSupplier = versionSupplier; - _synchronizer = synchronizer; + _lock = lock; - synchronized (_synchronizer) { + _lock.lock(); + try { long curVersion = _versionSupplier.get(); - _backing = _iteratorSupplier.get(); + _backing = _iteratorSupplier.apply(Pair.of(start, key)); _next = _backing.hasNext() ? _backing.next() : null; -// if (_next != null) -// assert _next.getValue().version() <= _id; - _lastRefreshed = curVersion; + _curVersion = curVersion; + } finally { + _lock.unlock(); } } - private void doRefresh() { - long curVersion = _versionSupplier.get(); - if (curVersion == _lastRefreshed) { - return; - } - if (_next == null) return; - synchronized (_synchronizer) { - curVersion = _versionSupplier.get(); - Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _lastRefreshed, curVersion); - _backing.close(); - _backing = _iteratorSupplier.get(); + private void maybeRefresh() { + _lock.lock(); + CloseableKvIterator oldBacking = null; + try { + if (_versionSupplier.get() == _curVersion) { + return; + } + long newVersion = _versionSupplier.get(); + Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _curVersion, newVersion); + oldBacking = _backing; + _backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey())); var next = _backing.hasNext() ? _backing.next() : null; if (next == null) { Log.errorv("Failed to refresh iterator, null last refreshed {0}," + - " current version {1}, current value {2}", _lastRefreshed, curVersion, next); + " current version {1}, current value {2}", _curVersion, newVersion, next); assert false; } else if (!next.equals(_next)) { Log.errorv("Failed to refresh iterator, mismatch last refreshed {0}," + - " current version {1}, current value {2}, read value {3}", _lastRefreshed, curVersion, _next, next); + " current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next); assert false; } _next = next; - _lastRefreshed = curVersion; + _curVersion = newVersion; + } finally { + _lock.unlock(); + if (oldBacking != null) { + oldBacking.close(); + } } } // _next should always be valid, so it's ok to do the refresh "lazily" private void prepareNext() { - doRefresh(); - if (_backing.hasNext()) { - _next = _backing.next(); -// assert _next.getValue().version() <= _id; - } else { - _next = null; + _lock.lock(); + try { + maybeRefresh(); + if (_backing.hasNext()) { + _next = _backing.next(); + } else { + _next = null; + } + } finally { + _lock.unlock(); } } @@ -91,7 +106,6 @@ public class SelfRefreshingKvIterator, V> implements Clo throw new NoSuchElementException("No more elements"); } var ret = _next; -// assert ret.getValue().version() <= _id; prepareNext(); Log.tracev("Read: {0}, next: {1}", ret, _next); return ret; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java index d0ef2b65..16bd6146 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/SnapshotManager.java @@ -14,6 +14,7 @@ import java.lang.ref.Cleaner; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @ApplicationScoped @@ -21,6 +22,8 @@ public class SnapshotManager { @Inject WritebackObjectPersistentStore delegateStore; + private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); + private interface SnapshotEntry { long whenToRemove(); } @@ -55,7 +58,8 @@ public class SnapshotManager { } Consumer commitTx(Collection> writes, long id) { - synchronized (this) { + _lock.writeLock().lock(); + try { assert id > _lastSnapshotId; if (!_snapshotIds.isEmpty()) { verify(); @@ -67,10 +71,9 @@ public class SnapshotManager { Pair newSnapshotEntry = switch (current) { case WritebackObjectPersistentStore.VerboseReadResultPersisted( Optional data - ) -> { - yield Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))), - data.map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id))); - } + ) -> + Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))), + data.map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id))); case WritebackObjectPersistentStore.VerboseReadResultPending( TxWriteback.PendingWriteEntry pending ) -> { @@ -108,13 +111,19 @@ public class SnapshotManager { } verify(); + // Commit under lock, iterators will see new version after the lock is released and writeback + // cache is updated + // TODO: Maybe writeback iterator being invalidated wouldn't be a problem? return delegateStore.commitTx(writes, id); + } finally { + _lock.writeLock().unlock(); } } private void unrefSnapshot(long id) { Log.tracev("Unref snapshot {0}", id); - synchronized (this) { + _lock.writeLock().lock(); + try { verify(); var refCount = _snapshotRefCounts.merge(id, -1L, (a, b) -> a + b == 0 ? null : a + b); if (!(refCount == null && id == _lastAliveSnapshotId)) { @@ -176,6 +185,8 @@ public class SnapshotManager { curCount = _snapshotRefCounts.getOrDefault(curId, 0L); } while (curCount == 0); verify(); + } finally { + _lock.writeLock().unlock(); } } @@ -201,7 +212,8 @@ public class SnapshotManager { private Snapshot(long id) { _id = id; - synchronized (SnapshotManager.this) { + _lock.writeLock().lock(); + try { verify(); if (_lastSnapshotId > id) throw new IllegalSnapshotIdException("Snapshot id " + id + " is less than last snapshot id " + _lastSnapshotId); @@ -212,6 +224,8 @@ public class SnapshotManager { _snapshotIds.add(id); } verify(); + } finally { + _lock.writeLock().unlock(); } var closedRef = _closed; var idRef = _id; @@ -295,9 +309,6 @@ public class SnapshotManager { } - // In case something was added to the snapshot, it is not guaranteed that the iterators will see it, - // so refresh them manually. Otherwise, it could be possible that something from the writeback cache will - // be served instead. public class CheckingSnapshotKvIterator implements CloseableKvIterator { private final CloseableKvIterator _backing; @@ -329,14 +340,18 @@ public class SnapshotManager { } public CloseableKvIterator getIterator(IteratorStart start, JObjectKey key) { - return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>(() -> - new TombstoneMergingKvIterator<>(new SnapshotKvIterator(start, key), - new MappingKvIterator<>(delegateStore.getIterator(start, key), d -> switch (d) { + // In case something was added to the snapshot, it is not guaranteed that the iterators will see it, + // so refresh them manually. Otherwise, it could be possible that something from the writeback cache will + // be served instead. Note that refreshing the iterator will also refresh the writeback iterator, + // so it also should be consistent. + return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>((params) -> + new TombstoneMergingKvIterator<>(new SnapshotKvIterator(params.getLeft(), params.getRight()), + new MappingKvIterator<>(delegateStore.getIterator(params.getLeft(), params.getRight()), d -> switch (d) { case TombstoneMergingKvIterator.Tombstone() -> d; case TombstoneMergingKvIterator.Data data -> data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>(); default -> throw new IllegalStateException("Unexpected value: " + d); - })), _snapshotVersion::get, SnapshotManager.this)); + })), _snapshotVersion::get, _lock.readLock(), start, key)); } public CloseableKvIterator getIterator(JObjectKey key) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java index 3fd67c60..1926ca95 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java @@ -20,12 +20,15 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; @ApplicationScoped public class TxWritebackImpl implements TxWriteback { private final LinkedList _pendingBundles = new LinkedList<>(); + private final ReentrantReadWriteLock _pendingBundlesVersionLock = new ReentrantReadWriteLock(); private final ConcurrentSkipListMap _pendingWrites = new ConcurrentSkipListMap<>(); + private final AtomicLong _pendingWritesVersion = new AtomicLong(); private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); private final Object _flushWaitSynchronizer = new Object(); @@ -128,12 +131,14 @@ public class TxWritebackImpl implements TxWriteback { Log.trace("Bundle " + bundle.getId() + " committed"); + // Remove from pending writes, after real commit synchronized (_pendingBundles) { bundle._entries.values().forEach(e -> { var cur = _pendingWrites.get(e.key()); if (cur.bundleId() <= bundle.getId()) _pendingWrites.remove(e.key(), cur); }); + // No need to increment version } List> callbacks = new ArrayList<>(); @@ -219,22 +224,28 @@ public class TxWritebackImpl implements TxWriteback { @Override public void commitBundle(TxBundle bundle) { verifyReady(); - synchronized (_pendingBundles) { - ((TxBundleImpl) bundle).setReady(); - ((TxBundleImpl) bundle)._entries.values().forEach(e -> { - switch (e) { - case TxBundleImpl.CommittedEntry c -> - _pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId())); - case TxBundleImpl.DeletedEntry d -> - _pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId())); - default -> throw new IllegalStateException("Unexpected value: " + e); + _pendingBundlesVersionLock.writeLock().lock(); + try { + synchronized (_pendingBundles) { + ((TxBundleImpl) bundle).setReady(); + ((TxBundleImpl) bundle)._entries.values().forEach(e -> { + switch (e) { + case TxBundleImpl.CommittedEntry c -> + _pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId())); + case TxBundleImpl.DeletedEntry d -> + _pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId())); + default -> throw new IllegalStateException("Unexpected value: " + e); + } + }); + _pendingWritesVersion.incrementAndGet(); + if (_pendingBundles.peek() == bundle) + _pendingBundles.notify(); + synchronized (_flushWaitSynchronizer) { + currentSize += ((TxBundleImpl) bundle).calculateTotalSize(); } - }); - if (_pendingBundles.peek() == bundle) - _pendingBundles.notify(); - synchronized (_flushWaitSynchronizer) { - currentSize += ((TxBundleImpl) bundle).calculateTotalSize(); } + } finally { + _pendingBundlesVersionLock.writeLock().unlock(); } } @@ -378,14 +389,20 @@ public class TxWritebackImpl implements TxWriteback { // Returns an iterator with a view of all commited objects // Does not have to guarantee consistent view, snapshots are handled by upper layers + // Invalidated by commitBundle, but might return data after it has been really committed @Override public CloseableKvIterator> getIterator(IteratorStart start, JObjectKey key) { - return new MappingKvIterator<>( - new NavigableMapKvIterator<>(_pendingWrites, start, key), - e -> switch (e) { - case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data()); - case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>(); - default -> throw new IllegalStateException("Unexpected value: " + e); - }); + _pendingBundlesVersionLock.readLock().lock(); + try { + return new InvalidatableKvIterator<>(new MappingKvIterator<>( + new NavigableMapKvIterator<>(_pendingWrites, start, key), + e -> switch (e) { + case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data()); + case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>(); + default -> throw new IllegalStateException("Unexpected value: " + e); + }), _pendingWritesVersion::get, _pendingBundlesVersionLock.readLock()); + } finally { + _pendingBundlesVersionLock.readLock().unlock(); + } } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java index a373750b..4adc4489 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java @@ -11,6 +11,8 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.HashSet; import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; @ApplicationScoped @@ -19,6 +21,8 @@ public class WritebackObjectPersistentStore { CachingObjectPersistentStore delegate; @Inject TxWriteback txWriteback; + private final AtomicLong _commitCounter = new AtomicLong(0); + private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock(); @Nonnull public Collection findAllObjects() { @@ -88,6 +92,7 @@ public class WritebackObjectPersistentStore { Log.tracef("Committing transaction %d to storage", id); txWriteback.commitBundle(bundle); + _commitCounter.incrementAndGet(); long bundleId = bundle.getId(); @@ -96,9 +101,16 @@ public class WritebackObjectPersistentStore { // Returns an iterator with a view of all commited objects // Does not have to guarantee consistent view, snapshots are handled by upper layers + // Should be refreshed after each commit public CloseableKvIterator> getIterator(IteratorStart start, JObjectKey key) { - return new MergingKvIterator<>(txWriteback.getIterator(start, key), - new MappingKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new)); + _lock.readLock().lock(); + try { + return new InvalidatableKvIterator<>(new MergingKvIterator<>(txWriteback.getIterator(start, key), + new MappingKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new)), + _commitCounter::get, _lock.readLock()); + } finally { + _lock.readLock().unlock(); + } } public CloseableKvIterator> getIterator(JObjectKey key) { diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java index 34f900f6..807887c2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/CachingObjectPersistentStore.java @@ -113,6 +113,7 @@ public class CachingObjectPersistentStore { } } delegate.commitTx(names); + // Now, reading from the backing store should return the new data synchronized (_cache) { for (var key : Stream.concat(names.written().stream().map(Pair::getLeft), names.deleted().stream()).toList()) {