Sync-base: WritebackObjectPersistentStore cleanup

This commit is contained in:
2025-04-29 12:45:20 +02:00
parent cc9da86440
commit 58de85c078

View File

@@ -27,23 +27,33 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private final AtomicReference<TxBundle> _pendingBundle = new AtomicReference<>(null);
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastFlushedId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0);
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
@Inject
ExecutorService _callbackExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(120) StartupEvent event) {
@@ -72,7 +82,7 @@ public class WritebackObjectPersistentStore {
lastTxId = s.id();
}
_lastCommittedId.set(lastTxId);
_lastWrittenId.set(lastTxId);
_lastFlushedId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}
@@ -98,21 +108,10 @@ public class WritebackObjectPersistentStore {
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
bundle.compress(toCompress);
}
diff += bundle.size();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
TxBundle bundle;
synchronized (_pendingBundle) {
while ((bundle = _pendingBundle.getAndSet(null)) == null)
_pendingBundle.wait();
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
@@ -132,15 +131,11 @@ public class WritebackObjectPersistentStore {
}
}
cachedStore.commitTx(
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
), bundle.id());
cachedStore.commitTx(new TxManifestObj<>(toWrite, toDelete), bundle.id());
Log.tracev("Bundle {0} committed", bundle.id());
while (true) {
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
@@ -153,22 +148,17 @@ public class WritebackObjectPersistentStore {
bundle.id(),
curPw.lastCommittedId()
);
if (_pendingWrites.compareAndSet(curPw, newCurPw))
break;
_pendingWrites.compareAndSet(curPw, newCurPw);
}
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenId.set(bundle.id());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
_lastFlushedId.set(bundle.id());
var callbacks = bundle.getCallbacks();
_callbackExecutor.submit(() -> {
callbacks.forEach(Runnable::run);
});
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.size();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
@@ -184,65 +174,39 @@ public class WritebackObjectPersistentStore {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
wait = false;
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
}
synchronized (_pendingBundles) {
synchronized (_pendingBundle) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.size();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
target.compress(toCompress);
}
diff += target.size();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
TxBundle bundle;
synchronized (_notFlushedBundles) {
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.id(), bundle);
}
TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet());
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.tracev("Flushing object {0}", write.key());
// Log.tracev("Flushing object {0}", write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.tracev("Deleting object {0}", deleted.key());
// Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
@@ -251,7 +215,7 @@ public class WritebackObjectPersistentStore {
}
}
while (true) {
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
@@ -272,18 +236,24 @@ public class WritebackObjectPersistentStore {
bundle.id()
);
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
continue;
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).size();
}
return bundle.id();
_pendingWrites.compareAndSet(curPw, newCurPw);
}
var curBundle = _pendingBundle.get();
long oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.compress(bundle);
} else {
curBundle = bundle;
}
_pendingBundle.set(curBundle);
_pendingBundle.notifyAll();
synchronized (_flushWaitSynchronizer) {
currentSize += (curBundle.size() - oldSize);
}
return bundle.id();
}
}
}
@@ -291,16 +261,21 @@ public class WritebackObjectPersistentStore {
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenId.get() >= bundleId) {
if (_lastFlushedId.get() >= bundleId) {
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenId.get() >= bundleId) {
synchronized (_pendingBundle) {
if (_lastFlushedId.get() >= bundleId) {
fn.run();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
var pendingBundle = _pendingBundle.get();
if (pendingBundle == null) {
fn.run();
return;
}
pendingBundle.addCallback(fn);
}
}
@@ -381,21 +356,20 @@ public class WritebackObjectPersistentStore {
}
}
public interface VerboseReadResult {
}
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = 0;
private boolean _wasCommitted = false;
ArrayList<Runnable> getCallbacks() {
return _callbacks;
}
private TxBundle(long txId) {
_txId = txId;
@@ -405,22 +379,8 @@ public class WritebackObjectPersistentStore {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(Runnable callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
_callbacks.add(callback);
}
private void putEntry(BundleEntry entry) {
@@ -453,11 +413,7 @@ public class WritebackObjectPersistentStore {
putEntry(entry);
}
synchronized (_callbacks) {
assert !_wasCommitted;
assert !other._wasCommitted;
_callbacks.addAll(other._callbacks);
}
_callbacks.addAll(other._callbacks);
}
private interface BundleEntry {
@@ -478,10 +434,4 @@ public class WritebackObjectPersistentStore {
}
}
}
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult {
}
}