Objects: don't calculate bundle size all the time from scratch

This commit is contained in:
2025-04-15 17:02:26 +02:00
parent e6ead10e7f
commit 29fdd3eb08

View File

@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
@@ -115,10 +114,10 @@ public class WritebackObjectPersistentStore {
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
diff -= toCompress.size();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
diff += bundle.size();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
@@ -145,21 +144,21 @@ public class WritebackObjectPersistentStore {
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
), bundle.getId());
), bundle.id());
Log.trace("Bundle " + bundle.getId() + " committed");
Log.trace("Bundle " + bundle.id() + " committed");
while (true) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
var cur = curPwMap.get(e.key());
if (cur.bundleId() <= bundle.getId())
if (cur.bundleId() <= bundle.id())
curPwMap = curPwMap.minus(e.key());
}
var newCurPw = new PendingWriteData(
curPwMap,
bundle.getId(),
bundle.id(),
curPw.lastCommittedId()
);
if (_pendingWrites.compareAndSet(curPw, newCurPw))
@@ -168,15 +167,15 @@ public class WritebackObjectPersistentStore {
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenId.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
_lastWrittenId.set(bundle.id());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.calculateTotalSize();
currentSize -= bundle.size();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
@@ -219,13 +218,13 @@ public class WritebackObjectPersistentStore {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
long diff = -target.size();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
diff -= toCompress.size();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
diff += target.size();
currentSize += diff;
_pendingBundles.addFirst(target);
}
@@ -241,14 +240,14 @@ public class WritebackObjectPersistentStore {
synchronized (_notFlushedBundles) {
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
_notFlushedBundles.put(bundle.id(), bundle);
}
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.getId()));
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
@@ -266,10 +265,10 @@ public class WritebackObjectPersistentStore {
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
@@ -278,7 +277,7 @@ public class WritebackObjectPersistentStore {
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.getId()
bundle.id()
);
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
@@ -288,10 +287,10 @@ public class WritebackObjectPersistentStore {
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
currentSize += ((TxBundle) bundle).size();
}
return bundle.getId();
return bundle.id();
}
}
}
@@ -404,14 +403,14 @@ public class WritebackObjectPersistentStore {
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = -1;
private long _size = 0;
private boolean _wasCommitted = false;
private TxBundle(long txId) {
_txId = txId;
}
public long getId() {
public long id() {
return _txId;
}
@@ -433,21 +432,23 @@ public class WritebackObjectPersistentStore {
}
}
public void commit(JDataVersionedWrapper obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
private void putEntry(BundleEntry entry) {
var old = _entries.put(entry.key(), entry);
if (old != null) {
_size -= old.size();
}
_size += entry.size();
}
public void commit(JDataVersionedWrapper obj) {
putEntry(new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
public void delete(JObjectKey obj) {
synchronized (_entries) {
_entries.put(obj, new DeletedEntry(obj));
}
putEntry(new DeletedEntry(obj));
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
_size = _entries.values().stream().mapToInt(BundleEntry::size).sum();
public long size() {
return _size;
}
@@ -456,9 +457,16 @@ public class WritebackObjectPersistentStore {
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
_entries.putAll(other._entries);
for (var entry : other._entries.values()) {
putEntry(entry);
}
synchronized (_callbacks) {
assert !_wasCommitted;
assert !other._wasCommitted;
_callbacks.addAll(other._callbacks);
}
}
private interface BundleEntry {