From 29fdd3eb08adbe2e6e68c264518816a7269956a2 Mon Sep 17 00:00:00 2001 From: Stepan Usatiuk Date: Tue, 15 Apr 2025 17:02:26 +0200 Subject: [PATCH] Objects: don't calculate bundle size all the time from scratch --- .../WritebackObjectPersistentStore.java | 74 ++++++++++--------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java index f5ff0a03..9bf75cdb 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/WritebackObjectPersistentStore.java @@ -3,7 +3,6 @@ package com.usatiuk.objects.stores; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapperImpl; import com.usatiuk.objects.JObjectKey; -import com.usatiuk.objects.JObjectKeyImpl; import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.transaction.TxCommitException; @@ -115,10 +114,10 @@ public class WritebackObjectPersistentStore { long diff = 0; while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { var toCompress = _pendingBundles.poll(); - diff -= toCompress.calculateTotalSize(); + diff -= toCompress.size(); bundle.compress(toCompress); } - diff += bundle.calculateTotalSize(); + diff += bundle.size(); synchronized (_flushWaitSynchronizer) { currentSize += diff; } @@ -145,21 +144,21 @@ public class WritebackObjectPersistentStore { new TxManifestObj<>( Collections.unmodifiableList(toWrite), Collections.unmodifiableList(toDelete) - ), bundle.getId()); + ), bundle.id()); - Log.trace("Bundle " + bundle.getId() + " committed"); + Log.trace("Bundle " + bundle.id() + " committed"); while (true) { var curPw = _pendingWrites.get(); var curPwMap = curPw.pendingWrites(); for (var e : bundle._entries.values()) { var cur = curPwMap.get(e.key()); - if (cur.bundleId() <= bundle.getId()) + if (cur.bundleId() <= bundle.id()) curPwMap = curPwMap.minus(e.key()); } var newCurPw = new PendingWriteData( curPwMap, - bundle.getId(), + bundle.id(), curPw.lastCommittedId() ); if (_pendingWrites.compareAndSet(curPw, newCurPw)) @@ -168,15 +167,15 @@ public class WritebackObjectPersistentStore { List> callbacks = new ArrayList<>(); synchronized (_notFlushedBundles) { - _lastWrittenId.set(bundle.getId()); - while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) { + _lastWrittenId.set(bundle.id()); + while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) { callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted()); } } callbacks.forEach(l -> l.forEach(Runnable::run)); synchronized (_flushWaitSynchronizer) { - currentSize -= bundle.calculateTotalSize(); + currentSize -= bundle.size(); // FIXME: if (currentSize <= sizeLimit || !_ready) _flushWaitSynchronizer.notifyAll(); @@ -219,13 +218,13 @@ public class WritebackObjectPersistentStore { if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { var target = _pendingBundles.poll(); - long diff = -target.calculateTotalSize(); + long diff = -target.size(); while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { var toCompress = _pendingBundles.poll(); - diff -= toCompress.calculateTotalSize(); + diff -= toCompress.size(); target.compress(toCompress); } - diff += target.calculateTotalSize(); + diff += target.size(); currentSize += diff; _pendingBundles.addFirst(target); } @@ -241,14 +240,14 @@ public class WritebackObjectPersistentStore { synchronized (_notFlushedBundles) { bundle = new TxBundle(_lastCommittedId.incrementAndGet()); _pendingBundles.addLast(bundle); - _notFlushedBundles.put(bundle.getId(), bundle); + _notFlushedBundles.put(bundle.id(), bundle); } for (var action : writes) { switch (action) { case TxRecord.TxObjectRecordWrite write -> { Log.trace("Flushing object " + write.key()); - bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.getId())); + bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id())); } case TxRecord.TxObjectRecordDeleted deleted -> { Log.trace("Deleting object " + deleted.key()); @@ -266,10 +265,10 @@ public class WritebackObjectPersistentStore { for (var e : ((TxBundle) bundle)._entries.values()) { switch (e) { case TxBundle.CommittedEntry c -> { - curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.getId())); + curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id())); } case TxBundle.DeletedEntry d -> { - curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.getId())); + curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id())); } default -> throw new IllegalStateException("Unexpected value: " + e); } @@ -278,7 +277,7 @@ public class WritebackObjectPersistentStore { var newCurPw = new PendingWriteData( curPwMap, curPw.lastFlushedId(), - bundle.getId() + bundle.id() ); if (!_pendingWrites.compareAndSet(curPw, newCurPw)) @@ -288,10 +287,10 @@ public class WritebackObjectPersistentStore { if (_pendingBundles.peek() == bundle) _pendingBundles.notify(); synchronized (_flushWaitSynchronizer) { - currentSize += ((TxBundle) bundle).calculateTotalSize(); + currentSize += ((TxBundle) bundle).size(); } - return bundle.getId(); + return bundle.id(); } } } @@ -404,14 +403,14 @@ public class WritebackObjectPersistentStore { private final ArrayList _callbacks = new ArrayList<>(); private long _txId; private volatile boolean _ready = false; - private long _size = -1; + private long _size = 0; private boolean _wasCommitted = false; private TxBundle(long txId) { _txId = txId; } - public long getId() { + public long id() { return _txId; } @@ -433,21 +432,23 @@ public class WritebackObjectPersistentStore { } } - public void commit(JDataVersionedWrapper obj) { - synchronized (_entries) { - _entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize())); + private void putEntry(BundleEntry entry) { + var old = _entries.put(entry.key(), entry); + if (old != null) { + _size -= old.size(); } + _size += entry.size(); + } + + public void commit(JDataVersionedWrapper obj) { + putEntry(new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize())); } public void delete(JObjectKey obj) { - synchronized (_entries) { - _entries.put(obj, new DeletedEntry(obj)); - } + putEntry(new DeletedEntry(obj)); } - public long calculateTotalSize() { - if (_size >= 0) return _size; - _size = _entries.values().stream().mapToInt(BundleEntry::size).sum(); + public long size() { return _size; } @@ -456,9 +457,16 @@ public class WritebackObjectPersistentStore { throw new IllegalArgumentException("Compressing an older bundle into newer"); _txId = other._txId; - _size = -1; - _entries.putAll(other._entries); + for (var entry : other._entries.values()) { + putEntry(entry); + } + + synchronized (_callbacks) { + assert !_wasCommitted; + assert !other._wasCommitted; + _callbacks.addAll(other._callbacks); + } } private interface BundleEntry {