hacky read anomaly solution

This commit is contained in:
2025-01-02 18:51:26 +01:00
parent 6a4b2dd815
commit 57f865dafb
3 changed files with 61 additions and 6 deletions

View File

@@ -82,7 +82,25 @@ public class JObjectManager {
try (var readLock = _objLocker.lock(key)) {
if (_objects.containsKey(key)) continue;
var read = objectStorage.readObject(key)
var pending = txWriteback.getPendingWrite(key);
JDataVersionedWrapper<?> read;
switch (pending.orElse(null)) {
case TxWriteback.PendingWrite write -> {
read = write.data();
}
case TxWriteback.PendingDelete delete -> {
return null;
}
case null -> {
}
default -> {
throw new IllegalStateException("Unexpected value: " + pending);
}
}
read = objectStorage.readObject(key)
.map(objectSerializer::deserialize)
.orElse(null);
@@ -92,7 +110,7 @@ public class JObjectManager {
var wrapper = new JDataWrapper<>((JDataVersionedWrapper<T>) read);
var old = _objects.put(key, wrapper);
assert old == null;
return read;
return (JDataVersionedWrapper<T>) read;
} else {
throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type);
}

View File

@@ -2,6 +2,8 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.VoidFn;
import java.util.Optional;
public interface TxWriteback {
TxBundle createBundle();
@@ -11,6 +13,18 @@ public interface TxWriteback {
void fence(long bundleId);
interface PendingWriteEntry {
long bundleId();
}
record PendingWrite(JDataVersionedWrapper<?> data, long bundleId) implements PendingWriteEntry {
}
record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, VoidFn callback);

View File

@@ -14,15 +14,14 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
private final ConcurrentHashMap<JObjectKey, PendingWriteEntry> _pendingWrites = new ConcurrentHashMap<>();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
@@ -148,6 +147,14 @@ public class TxWritebackImpl implements TxWriteback {
Log.trace("Bundle " + bundle.getId() + " committed");
synchronized (_pendingBundles) {
bundle._entries.values().forEach(e -> {
var cur = _pendingWrites.get(e.key());
if (cur.bundleId() == bundle.getId())
_pendingWrites.remove(e.key(), cur);
});
}
List<List<VoidFn>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
@@ -233,6 +240,15 @@ public class TxWritebackImpl implements TxWriteback {
verifyReady();
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
((TxBundleImpl) bundle)._entries.values().forEach(e -> {
switch (e) {
case TxBundleImpl.CommittedEntry c ->
_pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId()));
case TxBundleImpl.DeletedEntry d ->
_pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId()));
default -> throw new IllegalStateException("Unexpected value: " + e);
}
});
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
@@ -264,6 +280,13 @@ public class TxWritebackImpl implements TxWriteback {
}
}
@Override
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
synchronized (_pendingWrites) {
return Optional.ofNullable(_pendingWrites.get(key));
}
}
@Override
public void asyncFence(long bundleId, VoidFn fn) {
verifyReady();