Objects: simplify WritebackObjectPersistentStore

This commit is contained in:
2025-05-01 10:29:11 +02:00
parent 46bc9fa810
commit b506ced9d5
2 changed files with 114 additions and 120 deletions

View File

@@ -25,36 +25,41 @@ import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@Inject
CachingObjectPersistentStore cachedStore;
@Inject
ExecutorService _callbackExecutor;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
int sizeLimit;
private TxBundle _pendingBundle = null;
private int _curSize = 0;
private final AtomicReference<TxBundle> _pendingBundle = new AtomicReference<>(null);
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final ReentrantLock _pendingBundleLock = new ReentrantLock();
private final Condition _newBundleCondition = _pendingBundleLock.newCondition();
private final Condition _flushCondition = _pendingBundleLock.newCondition();
private final AtomicLong _lastFlushedId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0);
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
@Inject
ExecutorService _callbackExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(120) StartupEvent event) {
@@ -72,8 +77,8 @@ public class WritebackObjectPersistentStore {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
if (_curSize > 0)
Log.info("Tx commit status: size=" + _curSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
@@ -91,11 +96,14 @@ public class WritebackObjectPersistentStore {
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
_ready = false;
_pendingBundleLock.lock();
try {
while (_curSize > 0) {
_flushCondition.await();
}
} finally {
_pendingBundleLock.unlock();
}
_writebackExecutor.shutdownNow();
@@ -110,9 +118,18 @@ public class WritebackObjectPersistentStore {
while (!Thread.interrupted()) {
try {
TxBundle bundle;
synchronized (_pendingBundle) {
while ((bundle = _pendingBundle.getAndSet(null)) == null)
_pendingBundle.wait();
_pendingBundleLock.lock();
try {
while (_pendingBundle == null)
_newBundleCondition.await();
bundle = _pendingBundle;
_pendingBundle = null;
_curSize -= bundle.size();
assert _curSize == 0;
_flushCondition.signal();
} finally {
_pendingBundleLock.unlock();
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
@@ -136,7 +153,8 @@ public class WritebackObjectPersistentStore {
Log.tracev("Bundle {0} committed", bundle.id());
synchronized (_pendingWrites) {
_pendingBundleLock.lock();
try {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
@@ -150,19 +168,15 @@ public class WritebackObjectPersistentStore {
curPw.lastCommittedId()
);
_pendingWrites.compareAndSet(curPw, newCurPw);
} finally {
_pendingBundleLock.unlock();
}
_lastFlushedId.set(bundle.id());
var callbacks = bundle.getCallbacks();
var callbacks = bundle.callbacks();
_callbackExecutor.submit(() -> {
callbacks.forEach(Runnable::run);
});
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.size();
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
@@ -175,87 +189,72 @@ public class WritebackObjectPersistentStore {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
while (true) {
synchronized (_flushWaitSynchronizer) {
_pendingBundleLock.lock();
try {
boolean shouldWake = false;
if (_curSize > sizeLimit) {
shouldWake = true;
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
while (_curSize > sizeLimit)
_flushCondition.await();
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
}
synchronized (_pendingBundle) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
continue;
}
}
var oursId = _lastCommittedId.incrementAndGet();
TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet());
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.id()
);
_pendingWrites.compareAndSet(curPw, newCurPw);
}
var curBundle = _pendingBundle.get();
long oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.compress(bundle);
} else {
curBundle = bundle;
}
_pendingBundle.set(curBundle);
_pendingBundle.notifyAll();
synchronized (_flushWaitSynchronizer) {
currentSize += (curBundle.size() - oldSize);
}
return bundle.id();
var curBundle = _pendingBundle;
int oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.setId(oursId);
} else {
curBundle = new TxBundle(oursId);
}
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
curBundle.delete(deleted.key());
}
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
oursId
);
_pendingWrites.compareAndSet(curPw, newCurPw);
_pendingBundle = curBundle;
_newBundleCondition.signalAll();
_curSize += (curBundle.size() - oldSize);
if (shouldWake && _curSize < sizeLimit) {
_flushCondition.signal();
}
return oursId;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
_pendingBundleLock.unlock();
}
}
@@ -266,17 +265,20 @@ public class WritebackObjectPersistentStore {
fn.run();
return;
}
synchronized (_pendingBundle) {
_pendingBundleLock.lock();
try {
if (_lastFlushedId.get() >= bundleId) {
fn.run();
return;
}
var pendingBundle = _pendingBundle.get();
var pendingBundle = _pendingBundle;
if (pendingBundle == null) {
fn.run();
return;
}
pendingBundle.addCallback(fn);
} finally {
_pendingBundleLock.unlock();
}
}
@@ -363,10 +365,10 @@ public class WritebackObjectPersistentStore {
private static class TxBundle {
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private int _size = 0;
private long _txId;
private long _size = 0;
ArrayList<Runnable> getCallbacks() {
ArrayList<Runnable> callbacks() {
return _callbacks;
}
@@ -374,14 +376,23 @@ public class WritebackObjectPersistentStore {
_txId = txId;
}
public void setId(long id) {
_txId = id;
}
public long id() {
return _txId;
}
public void addCallback(Runnable callback) {
_callbacks.add(callback);
}
public int size() {
return _size;
}
private void putEntry(BundleEntry entry) {
var old = _entries.put(entry.key(), entry);
if (old != null) {
@@ -398,24 +409,7 @@ public class WritebackObjectPersistentStore {
putEntry(new DeletedEntry(obj));
}
public long size() {
return _size;
}
public void compress(TxBundle other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
for (var entry : other._entries.values()) {
putEntry(entry);
}
_callbacks.addAll(other._callbacks);
}
private interface BundleEntry {
private sealed interface BundleEntry permits CommittedEntry, DeletedEntry {
JObjectKey key();
int size();

View File

@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
public class TxRecord {
public interface TxObjectRecord<T> {
public sealed interface TxObjectRecord<T> permits TxObjectRecordWrite, TxObjectRecordDeleted {
JObjectKey key();
}