Objects: fewer locks in writeback

This commit is contained in:
2025-04-02 16:14:50 +02:00
parent 0e12a59f23
commit 3c37638db2
2 changed files with 100 additions and 141 deletions

View File

@@ -23,11 +23,6 @@ public class SnapshotManager {
// This should not be called for the same objects concurrently
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
// TODO: FIXME:
synchronized (this) {
return writebackStore.commitTx(writes, (id, commit) -> {
commit.run();
});
}
return writebackStore.commitTx(writes);
}
}

View File

@@ -4,7 +4,6 @@ import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
@@ -23,27 +22,30 @@ import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ApplicationScoped
public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final AtomicReference<PSortedMap<JObjectKey, PendingWriteEntry>> _pendingWrites = new AtomicReference<>(TreePMap.empty());
private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _lastCommittedTx = new AtomicLong(-1);
private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@@ -79,8 +81,8 @@ public class WritebackObjectPersistentStore {
try (var s = cachedStore.getSnapshot()) {
lastTxId = s.id();
}
_counter.set(lastTxId);
_lastCommittedTx.set(lastTxId);
_lastCommittedId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}
@@ -147,23 +149,26 @@ public class WritebackObjectPersistentStore {
Log.trace("Bundle " + bundle.getId() + " committed");
// Remove from pending writes, after real commit
// As we are the only writers to _pendingWrites, no need to synchronize with iterator creation
// if they get the older version, as it will still contain all the new changes
synchronized (_pendingBundles) {
while (true) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
var cur = curPw.get(e.key());
var cur = curPwMap.get(e.key());
if (cur.bundleId() <= bundle.getId())
curPw = curPw.minus(e.key());
curPwMap = curPwMap.minus(e.key());
}
_pendingWrites.set(curPw);
// No need to increment version
var newCurPw = new PendingWriteData(
curPwMap,
bundle.getId(),
curPw.lastCommittedId()
);
if (_pendingWrites.compareAndSet(curPw, newCurPw))
break;
}
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
_lastWrittenId.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
@@ -186,8 +191,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting");
}
public TxBundle createBundle() {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
boolean wait = false;
while (true) {
@@ -208,6 +212,7 @@ public class WritebackObjectPersistentStore {
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
@@ -231,79 +236,76 @@ public class WritebackObjectPersistentStore {
continue;
}
}
TxBundle bundle;
synchronized (_notFlushedBundles) {
var bundle = new TxBundle(_counter.incrementAndGet());
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
public void commitBundle(TxBundle bundle) {
verifyReady();
_pendingWritesVersionLock.writeLock().lock();
try {
synchronized (_pendingBundles) {
var curPw = _pendingWrites.get();
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPw = curPw.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.getId()));
}
case TxBundle.DeletedEntry d -> {
curPw = curPw.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
// Now, make the changes visible to new iterators
_pendingWrites.set(curPw);
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
while (true) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
// Now, make the changes visible to new iterators
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.getId()
);
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
continue;
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
}
return bundle.getId();
}
}
assert bundle.getId() > _lastCommittedTx.get();
_lastCommittedTx.set(bundle.getId());
} finally {
_pendingWritesVersionLock.writeLock().unlock();
}
}
/**
 * Discards a bundle: logs a warning, removes the bundle from {@code _pendingBundles}
 * and subtracts its total size from {@code currentSize}.
 *
 * @param bundle the bundle to drop
 */
public void dropBundle(TxBundle bundle) {
verifyReady();
// The queue of pending bundles is modified while holding its own monitor
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundle) bundle);
// The size accounting is adjusted under _flushWaitSynchronizer, nested inside the queue monitor
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
}
}
}
/**
 * Blocks the calling thread until the callback handed to
 * {@code asyncFence(bundleId, ...)} has been invoked.
 *
 * @param bundleId id of the bundle to wait for; passed unchanged to {@code asyncFence}
 * @throws RuntimeException if the thread is interrupted while waiting; the thread's
 *                          interrupt status is restored before the exception is thrown
 */
public void fence(long bundleId) {
    var latch = new CountDownLatch(1);
    // asyncFence runs the callback once the fence condition is met, releasing the latch
    asyncFence(bundleId, latch::countDown);
    try {
        latch.await();
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still observe it
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
if (_lastWrittenId.get() >= bundleId) {
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
if (_lastWrittenId.get() >= bundleId) {
fn.run();
return;
}
@@ -311,70 +313,41 @@ public class WritebackObjectPersistentStore {
}
}
/**
 * Looks up the entry stored for {@code key} in the current value of {@code _pendingWrites}.
 *
 * @param key the object key to look up
 * @return the entry for the key, or an empty Optional when the lookup yields null
 */
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
// The read is performed while holding the _pendingBundles monitor
synchronized (_pendingBundles) {
return Optional.ofNullable(_pendingWrites.get().get(key));
}
}
/**
* @param commitLocked - a function that will be called with the new transaction id and a Runnable
* that commits the transaction; the changes in the store will be visible to new transactions
* only after the runnable is called
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, BiConsumer<Long, Runnable> commitLocked) {
var bundle = createBundle();
long bundleId = bundle.getId();
try {
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundleId));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
} catch (Throwable t) {
dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", bundleId);
commitLocked.accept(bundleId, () -> {
commitBundle(bundle);
});
/**
 * Commits the given writes through {@code commitBundle} and returns a hook for fencing
 * on the resulting bundle id.
 *
 * @param writes the transaction's object records, passed unchanged to {@code commitBundle}
 * @return a consumer that forwards every runnable it receives to {@code asyncFence},
 *         together with the id returned by {@code commitBundle}
 */
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
    final long committedId = commitBundle(writes);
    return callback -> asyncFence(committedId, callback);
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
PSortedMap<JObjectKey, PendingWriteEntry> pendingWrites;
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
long lastTxId;
PendingWriteData pw = null;
try {
_pendingWritesVersionLock.readLock().lock();
try {
pendingWrites = _pendingWrites.get();
while (true) {
pw = _pendingWrites.get();
cache = cachedStore.getSnapshot();
lastTxId = getLastTxId();
} finally {
_pendingWritesVersionLock.readLock().unlock();
if (cache.id() >= pw.lastCommittedId())
return cache;
// TODO: Can this really happen?
if (cache.id() < pw.lastFlushedId()) {
assert false;
cache.close();
cache = null;
continue;
}
break;
}
PendingWriteData finalPw = pw;
Snapshot<JObjectKey, JDataVersionedWrapper> finalCache = cache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final PSortedMap<JObjectKey, PendingWriteEntry> _pendingWrites = pendingWrites;
private final PSortedMap<JObjectKey, PendingWriteEntry> _pendingWrites = finalPw.pendingWrites();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _cache = finalCache;
private final long txId = lastTxId;
private final long txId = finalPw.lastCommittedId();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
@@ -407,8 +380,8 @@ public class WritebackObjectPersistentStore {
@Override
public long id() {
assert lastTxId >= _cache.id();
return lastTxId;
assert txId >= _cache.id();
return txId;
}
@Override
@@ -423,15 +396,6 @@ public class WritebackObjectPersistentStore {
}
}
/**
 * Returns the current value of {@code _lastCommittedTx}, read while holding the read lock
 * of {@code _pendingWritesVersionLock}.
 *
 * @return the value of {@code _lastCommittedTx} at the time of the call
 */
public long getLastTxId() {
_pendingWritesVersionLock.readLock().lock();
try {
return _lastCommittedTx.get();
} finally {
// Always release the read lock, even if the read throws
_pendingWritesVersionLock.readLock().unlock();
}
}
public interface VerboseReadResult {
}