From 58de85c07890831baa24ce8e4f7c9f0527416769 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 29 Apr 2025 12:45:20 +0200 Subject: [PATCH] Sync-base: WritebackObjectPersistentStore cleanup --- .../WritebackObjectPersistentStore.java | 208 +++++++----------- 1 file changed, 79 insertions(+), 129 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java index 41a449e9..d02816f2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java @@ -27,23 +27,33 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; @ApplicationScoped public class WritebackObjectPersistentStore { - private final LinkedList _pendingBundles = new LinkedList<>(); - private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); - private final AtomicReference _pendingWrites = new AtomicReference<>(null); - private final Object _flushWaitSynchronizer = new Object(); - private final AtomicLong _lastWrittenId = new AtomicLong(-1); - private final AtomicLong _lastCommittedId = new AtomicLong(); - private final AtomicLong _waitedTotal = new AtomicLong(0); @Inject CachingObjectPersistentStore cachedStore; + @ConfigProperty(name = "dhfs.objects.writeback.limit") long sizeLimit; + + private final AtomicReference _pendingBundle = new AtomicReference<>(null); + private final AtomicReference _pendingWrites = new AtomicReference<>(null); + + private final Object _flushWaitSynchronizer = new Object(); + + private final AtomicLong _lastFlushedId = new AtomicLong(-1); + private final AtomicLong _lastCommittedId = new AtomicLong(-1); + + private final AtomicLong _waitedTotal = new AtomicLong(0); private long currentSize = 0; + private ExecutorService _writebackExecutor; private ExecutorService _statusExecutor; + + @Inject + ExecutorService _callbackExecutor; + private volatile boolean _ready = false; void init(@Observes @Priority(120) StartupEvent event) { @@ -72,7 +82,7 @@ public class WritebackObjectPersistentStore { lastTxId = s.id(); } _lastCommittedId.set(lastTxId); - _lastWrittenId.set(lastTxId); + _lastFlushedId.set(lastTxId); _pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId)); _ready = true; } @@ -98,21 +108,10 @@ public class WritebackObjectPersistentStore { private void writeback() { while (!Thread.interrupted()) { try { - TxBundle bundle = new TxBundle(0); - synchronized (_pendingBundles) { - while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready) - _pendingBundles.wait(); - - long diff = 0; - while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var toCompress = _pendingBundles.poll(); - diff -= toCompress.size(); - bundle.compress(toCompress); - } - diff += bundle.size(); - synchronized (_flushWaitSynchronizer) { - currentSize += diff; - } + TxBundle bundle; + synchronized (_pendingBundle) { + while ((bundle = _pendingBundle.getAndSet(null)) == null) + _pendingBundle.wait(); } var toWrite = new ArrayList>(); @@ -132,15 +131,11 @@ public class WritebackObjectPersistentStore { } } - cachedStore.commitTx( - new TxManifestObj<>( - Collections.unmodifiableList(toWrite), - Collections.unmodifiableList(toDelete) - ), bundle.id()); + cachedStore.commitTx(new TxManifestObj<>(toWrite, toDelete), bundle.id()); Log.tracev("Bundle {0} committed", bundle.id()); - while (true) { + synchronized (_pendingWrites) { var curPw = _pendingWrites.get(); var curPwMap = curPw.pendingWrites(); for (var e : bundle._entries.values()) { @@ -153,22 +148,17 @@ public class WritebackObjectPersistentStore { bundle.id(), curPw.lastCommittedId() ); - if (_pendingWrites.compareAndSet(curPw, newCurPw)) - break; + _pendingWrites.compareAndSet(curPw, newCurPw); } - List> callbacks = new ArrayList<>(); - synchronized (_notFlushedBundles) { - _lastWrittenId.set(bundle.id()); - while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) { - callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted()); - } - } - callbacks.forEach(l -> l.forEach(Runnable::run)); + _lastFlushedId.set(bundle.id()); + var callbacks = bundle.getCallbacks(); + _callbackExecutor.submit(() -> { + callbacks.forEach(Runnable::run); + }); synchronized (_flushWaitSynchronizer) { currentSize -= bundle.size(); - // FIXME: if (currentSize <= sizeLimit || !_ready) _flushWaitSynchronizer.notifyAll(); } @@ -184,65 +174,39 @@ public class WritebackObjectPersistentStore { public long commitBundle(Collection> writes) { verifyReady(); - boolean wait = false; while (true) { - if (wait) { - synchronized (_flushWaitSynchronizer) { - long started = System.currentTimeMillis(); - while (currentSize > sizeLimit) { - try { - _flushWaitSynchronizer.wait(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + synchronized (_flushWaitSynchronizer) { + long started = System.currentTimeMillis(); + while (currentSize > sizeLimit) { + try { + _flushWaitSynchronizer.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - long waited = System.currentTimeMillis() - started; - _waitedTotal.addAndGet(waited); - if (Log.isTraceEnabled()) - Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited); - wait = false; } + long waited = System.currentTimeMillis() - started; + _waitedTotal.addAndGet(waited); + if (Log.isTraceEnabled()) + Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited); } - synchronized (_pendingBundles) { + synchronized (_pendingBundle) { synchronized (_flushWaitSynchronizer) { if (currentSize > sizeLimit) { - if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var target = _pendingBundles.poll(); - - long diff = -target.size(); - while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var toCompress = _pendingBundles.poll(); - diff -= toCompress.size(); - target.compress(toCompress); - } - diff += target.size(); - currentSize += diff; - _pendingBundles.addFirst(target); - } - } - - if (currentSize > sizeLimit) { - wait = true; continue; } } - TxBundle bundle; - synchronized (_notFlushedBundles) { - bundle = new TxBundle(_lastCommittedId.incrementAndGet()); - _pendingBundles.addLast(bundle); - _notFlushedBundles.put(bundle.id(), bundle); - } + TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet()); for (var action : writes) { switch (action) { case TxRecord.TxObjectRecordWrite write -> { - Log.tracev("Flushing object {0}", write.key()); +// Log.tracev("Flushing object {0}", write.key()); bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id())); } case TxRecord.TxObjectRecordDeleted deleted -> { - Log.tracev("Deleting object {0}", deleted.key()); +// Log.tracev("Deleting object {0}", deleted.key()); bundle.delete(deleted.key()); } default -> { @@ -251,7 +215,7 @@ public class WritebackObjectPersistentStore { } } - while (true) { + synchronized (_pendingWrites) { var curPw = _pendingWrites.get(); var curPwMap = curPw.pendingWrites(); for (var e : ((TxBundle) bundle)._entries.values()) { @@ -272,18 +236,24 @@ public class WritebackObjectPersistentStore { bundle.id() ); - if (!_pendingWrites.compareAndSet(curPw, newCurPw)) - continue; - - ((TxBundle) bundle).setReady(); - if (_pendingBundles.peek() == bundle) - _pendingBundles.notify(); - synchronized (_flushWaitSynchronizer) { - currentSize += ((TxBundle) bundle).size(); - } - - return bundle.id(); + _pendingWrites.compareAndSet(curPw, newCurPw); } + + var curBundle = _pendingBundle.get(); + long oldSize = 0; + if (curBundle != null) { + oldSize = curBundle.size(); + curBundle.compress(bundle); + } else { + curBundle = bundle; + } + _pendingBundle.set(curBundle); + _pendingBundle.notifyAll(); + synchronized (_flushWaitSynchronizer) { + currentSize += (curBundle.size() - oldSize); + } + + return bundle.id(); } } } @@ -291,16 +261,21 @@ public class WritebackObjectPersistentStore { public void asyncFence(long bundleId, Runnable fn) { verifyReady(); if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); - if (_lastWrittenId.get() >= bundleId) { + if (_lastFlushedId.get() >= bundleId) { fn.run(); return; } - synchronized (_notFlushedBundles) { - if (_lastWrittenId.get() >= bundleId) { + synchronized (_pendingBundle) { + if (_lastFlushedId.get() >= bundleId) { fn.run(); return; } - _notFlushedBundles.get(bundleId).addCallback(fn); + var pendingBundle = _pendingBundle.get(); + if (pendingBundle == null) { + fn.run(); + return; + } + pendingBundle.addCallback(fn); } } @@ -381,21 +356,20 @@ public class WritebackObjectPersistentStore { } } - public interface VerboseReadResult { - } - private record PendingWriteData(TreePMap pendingWrites, long lastFlushedId, long lastCommittedId) { } private static class TxBundle { - private final LinkedHashMap _entries = new LinkedHashMap<>(); + private final HashMap _entries = new HashMap<>(); private final ArrayList _callbacks = new ArrayList<>(); private long _txId; - private volatile boolean _ready = false; private long _size = 0; - private boolean _wasCommitted = false; + + ArrayList getCallbacks() { + return _callbacks; + } private TxBundle(long txId) { _txId = txId; @@ -405,22 +379,8 @@ public class WritebackObjectPersistentStore { return _txId; } - public void setReady() { - _ready = true; - } - public void addCallback(Runnable callback) { - synchronized (_callbacks) { - if (_wasCommitted) throw new IllegalStateException(); - _callbacks.add(callback); - } - } - - public List setCommitted() { - synchronized (_callbacks) { - _wasCommitted = true; - return Collections.unmodifiableList(_callbacks); - } + _callbacks.add(callback); } private void putEntry(BundleEntry entry) { @@ -453,11 +413,7 @@ public class WritebackObjectPersistentStore { putEntry(entry); } - synchronized (_callbacks) { - assert !_wasCommitted; - assert !other._wasCommitted; - _callbacks.addAll(other._callbacks); - } + _callbacks.addAll(other._callbacks); } private interface BundleEntry { @@ -478,10 +434,4 @@ public class WritebackObjectPersistentStore { } } } - - public record VerboseReadResultPersisted(Optional data) implements VerboseReadResult { - } - - public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult { - } }