even more functional iterators

This commit is contained in:
2025-02-23 16:34:06 +01:00
parent 716fb21516
commit 05901f1acc
21 changed files with 807 additions and 523 deletions

View File

@@ -73,6 +73,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,114 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;
import java.util.function.Function;
// Also checks that the next provided item is always consistent after a refresh
public class InconsistentKvIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private CloseableKvIterator<K, V> _backing;
private final Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> _iteratorSupplier;
private K _lastReturnedKey = null;
private K _peekedKey = null;
private boolean _peekedNext = false;
private final Pair<IteratorStart, K> _initialStart;
public InconsistentKvIteratorWrapper(Function<Pair<IteratorStart, K>, CloseableKvIterator<K, V>> iteratorSupplier, IteratorStart start, K key) {
_iteratorSupplier = iteratorSupplier;
_initialStart = Pair.of(start, key);
while (true) {
try {
_backing = _iteratorSupplier.apply(Pair.of(start, key));
break;
} catch (StaleIteratorException ignored) {
continue;
}
}
}
private void refresh() {
_backing.close();
if (_peekedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
if (!_backing.hasNext() || !_backing.peekNextKey().equals(_peekedKey)) {
assert false;
}
} else if (_lastReturnedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GT, _lastReturnedKey));
} else {
_backing = _iteratorSupplier.apply(_initialStart);
}
if (_peekedNext && !_backing.hasNext()) {
assert false;
}
}
@Override
public K peekNextKey() {
while (true) {
if (_peekedKey != null) {
return _peekedKey;
}
try {
_peekedKey = _backing.peekNextKey();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
} catch (NoSuchElementException ignored) {
assert !_peekedNext;
throw ignored;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
_peekedNext = true;
Log.tracev("Peeked key: {0}", _peekedKey);
return _peekedKey;
}
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
while (true) {
if (_peekedNext) {
return true;
}
try {
_peekedNext = _backing.hasNext();
Log.tracev("Peeked next: {0}", _peekedNext);
return _peekedNext;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
}
}
@Override
public Pair<K, V> next() {
while (true) {
try {
var got = _backing.next();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
_peekedNext = false;
_peekedKey = null;
_lastReturnedKey = got.getKey();
return got;
} catch (NoSuchElementException ignored) {
assert !_peekedNext;
throw ignored;
} catch (StaleIteratorException ignored) {
refresh();
continue;
}
}
}
}

View File

@@ -45,7 +45,6 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
return;
}
long newVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _curVersion, newVersion);
oldBacking = _backing;
if (_peekedKey != null) {
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _peekedKey));
@@ -62,6 +61,9 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
throw new StaleIteratorException();
}
Log.tracev("Refreshed iterator last refreshed {0}, current version {1}",
_curVersion, newVersion);
_curVersion = newVersion;
} finally {
_lock.unlock();
@@ -80,7 +82,9 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
try {
maybeRefresh();
_peekedKey = _backing.peekNextKey();
assert _lastReturnedKey == null || _peekedKey.compareTo(_lastReturnedKey) > 0;
_peekedNext = true;
Log.tracev("Peeked key: {0}", _peekedKey);
return _peekedKey;
} finally {
_lock.unlock();
@@ -101,6 +105,7 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
try {
maybeRefresh();
_peekedNext = _backing.hasNext();
Log.tracev("Peeked next: {0}", _peekedNext);
return _peekedNext;
} finally {
_lock.unlock();
@@ -113,6 +118,7 @@ public class InconsistentSelfRefreshingKvIterator<K extends Comparable<K>, V> im
try {
maybeRefresh();
var got = _backing.next();
assert _lastReturnedKey == null || got.getKey().compareTo(_lastReturnedKey) > 0;
_peekedNext = false;
_peekedKey = null;
_lastReturnedKey = got.getKey();

View File

@@ -34,4 +34,10 @@ public class MappingKvIterator<K extends Comparable<K>, V, V_T> implements Close
return Pair.of(got.getKey(), _transformer.apply(got.getValue()));
}
@Override
public String toString() {
return "MappingKvIterator{" +
"_backing=" + _backing +
'}';
}
}

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
@@ -7,8 +8,10 @@ import java.util.*;
public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final SortedMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
public MergingKvIterator(List<CloseableKvIterator<K, V>> iterators) {
public MergingKvIterator(String name, List<CloseableKvIterator<K, V>> iterators) {
_name = name;
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (CloseableKvIterator<K, V> iterator : iterators) {
@@ -19,11 +22,13 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
for (CloseableKvIterator<K, V> iterator : iterators) {
advanceIterator(iterator);
}
Log.tracev("{0} Created: {1}", _name, _sortedIterators);
}
@SafeVarargs
public MergingKvIterator(CloseableKvIterator<K, V>... iterators) {
this(List.of(iterators));
public MergingKvIterator(String name, CloseableKvIterator<K, V>... iterators) {
this(name, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
@@ -43,6 +48,9 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
if (oursPrio < theirsPrio) {
_sortedIterators.put(key, iterator);
advanceIterator(them);
} else {
iterator.next();
advanceIterator(iterator);
}
}
@@ -73,6 +81,16 @@ public class MergingKvIterator<K extends Comparable<K>, V> implements CloseableK
}
var curVal = cur.getValue().next();
advanceIterator(cur.getValue());
Log.tracev("{0} Read: {1}, next: {2}", _name, curVal, _sortedIterators);
return curVal;
}
@Override
public String toString() {
return "MergingKvIterator{" +
"_name='" + _name + '\'' +
", _sortedIterators=" + _sortedIterators +
", _iterators=" + _iterators +
'}';
}
}

View File

@@ -60,4 +60,11 @@ public class NavigableMapKvIterator<K extends Comparable<K>, V> implements Close
return Pair.of(ret);
}
@Override
public String toString() {
return "NavigableMapKvIterator{" +
"_iterator=" + _iterator +
", _next=" + _next +
'}';
}
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects;
public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects;
public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
}

View File

@@ -0,0 +1,5 @@
package com.usatiuk.dhfs.objects;
public interface PendingWriteEntry {
long bundleId();
}

View File

@@ -53,4 +53,11 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> implements Clo
return ret;
}
@Override
public String toString() {
return "PredicateKvIterator{" +
"_backing=" + _backing +
", _next=" + _next +
'}';
}
}

View File

@@ -43,7 +43,6 @@ public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements Clo
return;
}
long newVersion = _versionSupplier.get();
Log.tracev("Refreshing iterator last refreshed {0}, current version {1}", _curVersion, newVersion);
oldBacking = _backing;
_backing = _iteratorSupplier.apply(Pair.of(IteratorStart.GE, _next.getKey()));
var next = _backing.hasNext() ? _backing.next() : null;
@@ -56,6 +55,8 @@ public class SelfRefreshingKvIterator<K extends Comparable<K>, V> implements Clo
" current version {1}, current value {2}, read value {3}", _curVersion, newVersion, _next, next);
assert false;
}
Log.tracev("Refreshed iterator last refreshed {0}, current version {1}, old value {2}, new value {3}",
_curVersion, newVersion, _next, next);
_next = next;
_curVersion = newVersion;

View File

@@ -20,7 +20,7 @@ import java.util.function.Consumer;
@ApplicationScoped
public class SnapshotManager {
@Inject
WritebackObjectPersistentStore delegateStore;
WritebackObjectPersistentStore writebackStore;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@@ -64,7 +64,7 @@ public class SnapshotManager {
if (!_snapshotIds.isEmpty()) {
verify();
for (var action : writes) {
var current = delegateStore.readObjectVerbose(action.key());
var current = writebackStore.readObjectVerbose(action.key());
// Add to snapshot the previous visible version of the replaced object
// I.e. should be visible to all transactions with id <= id
// and at least as its corresponding version
@@ -75,13 +75,13 @@ public class SnapshotManager {
Pair.of(new SnapshotKey(action.key(), Math.max(_snapshotIds.peek(), data.map(JDataVersionedWrapper::version).orElse(0L))),
data.<SnapshotEntry>map(o -> new SnapshotEntryObject(o, id)).orElse(new SnapshotEntryDeleted(id)));
case WritebackObjectPersistentStore.VerboseReadResultPending(
TxWriteback.PendingWriteEntry pending
PendingWriteEntry pending
) -> {
assert pending.bundleId() < id;
yield switch (pending) {
case TxWriteback.PendingWrite write ->
case PendingWrite write ->
Pair.of(new SnapshotKey(action.key(), write.bundleId()), new SnapshotEntryObject(write.data(), id));
case TxWriteback.PendingDelete delete ->
case PendingDelete delete ->
Pair.of(new SnapshotKey(action.key(), delete.bundleId()), new SnapshotEntryDeleted(id));
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
@@ -114,7 +114,7 @@ public class SnapshotManager {
// Commit under lock, iterators will see new version after the lock is released and writeback
// cache is updated
// TODO: Maybe writeback iterator being invalidated wouldn't be a problem?
return delegateStore.commitTx(writes, id);
return writebackStore.commitTx(writes, id);
} finally {
_lock.writeLock().unlock();
}
@@ -345,13 +345,10 @@ public class SnapshotManager {
// be served instead. Note that refreshing the iterator will also refresh the writeback iterator,
// so it also should be consistent.
return new CheckingSnapshotKvIterator(new SelfRefreshingKvIterator<>((params) ->
new TombstoneMergingKvIterator<>(new SnapshotKvIterator(params.getLeft(), params.getRight()),
new MappingKvIterator<>(delegateStore.getIterator(params.getLeft(), params.getRight()), d -> switch (d) {
case TombstoneMergingKvIterator.Tombstone<JDataVersionedWrapper>() -> d;
case TombstoneMergingKvIterator.Data<JDataVersionedWrapper> data ->
data.value().version() <= _id ? data : new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + d);
})), _snapshotVersion::get, _lock.readLock(), start, key));
new TombstoneMergingKvIterator<>("snapshot", new SnapshotKvIterator(params.getLeft(), params.getRight()),
new MappingKvIterator<>(writebackStore.getIterator(params.getLeft(), params.getRight()), d ->
d.version() <= _id ? new TombstoneMergingKvIterator.Data<>(d) : new TombstoneMergingKvIterator.Tombstone<>()
)), _snapshotVersion::get, _lock.readLock(), start, key));
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
@@ -387,6 +384,6 @@ public class SnapshotManager {
@Nonnull
Optional<JDataVersionedWrapper> readObjectDirect(JObjectKey name) {
return delegateStore.readObject(name);
return writebackStore.readObject(name);
}
}

View File

@@ -1,16 +1,20 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final CloseableKvIterator<K, V> _backing;
private final String _name;
public TombstoneMergingKvIterator(List<CloseableKvIterator<K, DataType<V>>> iterators) {
public TombstoneMergingKvIterator(String name, List<CloseableKvIterator<K, DataType<V>>> iterators) {
_name = name;
_backing = new PredicateKvIterator<>(
new MergingKvIterator<>(iterators),
new MergingKvIterator<>(name + "-merging", iterators),
pair -> {
Log.tracev("{0} - Processing pair {1}", _name, pair);
if (pair instanceof Tombstone) {
return null;
}
@@ -19,8 +23,8 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
}
@SafeVarargs
public TombstoneMergingKvIterator(CloseableKvIterator<K, DataType<V>>... iterators) {
this(List.of(iterators));
public TombstoneMergingKvIterator(String name, CloseableKvIterator<K, DataType<V>>... iterators) {
this(name, List.of(iterators));
}
public interface DataType<T> {
@@ -51,4 +55,12 @@ public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements C
public Pair<K, V> next() {
return _backing.next();
}
@Override
public String toString() {
return "TombstoneMergingKvIterator{" +
"_backing=" + _backing +
", _name='" + _name + '\'' +
'}';
}
}

View File

@@ -1,40 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import java.util.Collection;
import java.util.Optional;
public interface TxWriteback {
TxBundle createBundle();
void commitBundle(TxBundle bundle);
void dropBundle(TxBundle bundle);
void fence(long bundleId);
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
Collection<PendingWriteEntry> getPendingWrites();
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, Runnable callback);
interface PendingWriteEntry {
long bundleId();
}
record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
}
record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}
CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
}

View File

@@ -1,408 +0,0 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
private final ReentrantReadWriteLock _pendingBundlesVersionLock = new ReentrantReadWriteLock();
private final ConcurrentSkipListMap<JObjectKey, PendingWriteEntry> _pendingWrites = new ConcurrentSkipListMap<>();
private final AtomicLong _pendingWritesVersion = new AtomicLong();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
.build();
_writebackExecutor = Executors.newSingleThreadExecutor(factory);
_writebackExecutor.submit(this::writeback);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
});
_ready = true;
}
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
}
_writebackExecutor.shutdownNow();
Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
}
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundleImpl bundle = new TxBundleImpl(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
var toDelete = new ArrayList<JObjectKey>();
for (var e : bundle._entries.values()) {
switch (e) {
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> {
Log.trace("Writing new " + key);
toWrite.add(Pair.of(key, data));
}
case TxBundleImpl.DeletedEntry(JObjectKey key) -> {
Log.trace("Deleting from persistent storage " + key);
toDelete.add(key);
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
objectPersistentStore.commitTx(
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
));
Log.trace("Bundle " + bundle.getId() + " committed");
// Remove from pending writes, after real commit
synchronized (_pendingBundles) {
bundle._entries.values().forEach(e -> {
var cur = _pendingWrites.get(e.key());
if (cur.bundleId() <= bundle.getId())
_pendingWrites.remove(e.key(), cur);
});
// No need to increment version
}
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
} catch (Throwable o) {
Log.error("Uncaught THROWABLE in writeback", o);
}
}
Log.info("Writeback thread exiting");
}
@Override
public TxBundle createBundle() {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
synchronized (_notFlushedBundles) {
var bundle = new TxBundleImpl(_counter.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
@Override
public void commitBundle(TxBundle bundle) {
verifyReady();
_pendingBundlesVersionLock.writeLock().lock();
try {
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
((TxBundleImpl) bundle)._entries.values().forEach(e -> {
switch (e) {
case TxBundleImpl.CommittedEntry c ->
_pendingWrites.put(c.key(), new PendingWrite(c.data, bundle.getId()));
case TxBundleImpl.DeletedEntry d ->
_pendingWrites.put(d.key(), new PendingDelete(d.key, bundle.getId()));
default -> throw new IllegalStateException("Unexpected value: " + e);
}
});
_pendingWritesVersion.incrementAndGet();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize();
}
}
} finally {
_pendingBundlesVersionLock.writeLock().unlock();
}
}
@Override
public void dropBundle(TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundleImpl) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize();
}
}
}
@Override
public void fence(long bundleId) {
var latch = new CountDownLatch(1);
asyncFence(bundleId, latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
synchronized (_pendingBundles) {
return Optional.ofNullable(_pendingWrites.get(key));
}
}
@Override
public Collection<PendingWriteEntry> getPendingWrites() {
synchronized (_pendingBundles) {
return Collections.unmodifiableCollection(_pendingWrites.values());
}
}
@Override
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.run();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
private class TxBundleImpl implements TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = -1;
private boolean _wasCommitted = false;
private TxBundleImpl(long txId) {
_txId = txId;
}
@Override
public long getId() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(Runnable callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
@Override
public void commit(JDataVersionedWrapper obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
}
@Override
public void delete(JObjectKey obj) {
synchronized (_entries) {
_entries.put(obj, new DeletedEntry(obj));
}
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
_size = _entries.values().stream().mapToInt(BundleEntry::size).sum();
return _size;
}
public void compress(TxBundleImpl other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
_entries.putAll(other._entries);
}
private interface BundleEntry {
JObjectKey key();
int size();
}
private record CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size)
implements BundleEntry {
}
private record DeletedEntry(JObjectKey key)
implements BundleEntry {
@Override
public int size() {
return 64;
}
}
}
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Invalidated by commitBundle, but might return data after it has been really committed
@Override
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
_pendingBundlesVersionLock.readLock().lock();
try {
return new InvalidatableKvIterator<>(new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, start, key),
e -> switch (e) {
case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}), _pendingWritesVersion::get, _pendingBundlesVersionLock.readLock());
} finally {
_pendingBundlesVersionLock.readLock().unlock();
}
}
}

View File

@@ -2,49 +2,394 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.PSortedMap;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@Inject
CachingObjectPersistentStore delegate;
@Inject
TxWriteback txWriteback;
private final AtomicLong _commitCounter = new AtomicLong(0);
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final LinkedList<TxBundleImpl> _pendingBundles = new LinkedList<>();
@Nonnull
public Collection<JObjectKey> findAllObjects() {
var pending = txWriteback.getPendingWrites();
var found = new HashSet<>(delegate.findAllObjects());
for (var p : pending) {
switch (p) {
case TxWriteback.PendingWrite write -> found.add(write.data().data().key());
case TxWriteback.PendingDelete deleted -> found.remove(deleted.key());
default -> throw new IllegalStateException("Unexpected value: " + p);
private final AtomicReference<PSortedMap<JObjectKey, PendingWriteEntry>> _pendingWrites = new AtomicReference<>(TreePMap.empty());
private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock();
private final AtomicLong _pendingWritesVersion = new AtomicLong();
private final LinkedHashMap<Long, TxBundleImpl> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
.build();
_writebackExecutor = Executors.newSingleThreadExecutor(factory);
_writebackExecutor.submit(this::writeback);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
});
_ready = true;
}
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
}
return found;
_writebackExecutor.shutdownNow();
Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
}
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundleImpl bundle = new TxBundleImpl(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
var toDelete = new ArrayList<JObjectKey>();
for (var e : bundle._entries.values()) {
switch (e) {
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> {
Log.trace("Writing new " + key);
toWrite.add(Pair.of(key, data));
}
case TxBundleImpl.DeletedEntry(JObjectKey key) -> {
Log.trace("Deleting from persistent storage " + key);
toDelete.add(key);
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
cachedStore.commitTx(
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
));
Log.trace("Bundle " + bundle.getId() + " committed");
// Remove from pending writes, after real commit
synchronized (_pendingBundles) {
var curPw = _pendingWrites.get();
for (var e : bundle._entries.values()) {
var cur = curPw.get(e.key());
if (cur.bundleId() <= bundle.getId())
curPw = curPw.minus(e.key());
}
_pendingWrites.set(curPw);
// No need to increment version
}
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
} catch (Throwable o) {
Log.error("Uncaught THROWABLE in writeback", o);
}
}
Log.info("Writeback thread exiting");
}
public TxBundle createBundle() {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
synchronized (_notFlushedBundles) {
var bundle = new TxBundleImpl(_counter.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
public void commitBundle(TxBundle bundle) {
verifyReady();
_pendingWritesVersionLock.writeLock().lock();
try {
var curPw = _pendingWrites.get();
for (var e : ((TxBundleImpl) bundle)._entries.values()) {
switch (e) {
case TxBundleImpl.CommittedEntry c -> {
curPw = curPw.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
}
case TxBundleImpl.DeletedEntry d -> {
curPw = curPw.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
}
_pendingWrites.set(curPw);
synchronized (_pendingBundles) {
((TxBundleImpl) bundle).setReady();
_pendingWritesVersion.incrementAndGet();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundleImpl) bundle).calculateTotalSize();
}
}
} finally {
_pendingWritesVersionLock.writeLock().unlock();
}
}
public void dropBundle(TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundleImpl) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize();
}
}
}
public void fence(long bundleId) {
var latch = new CountDownLatch(1);
asyncFence(bundleId, latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.run();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
private static class TxBundleImpl implements TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = -1;
private boolean _wasCommitted = false;
private TxBundleImpl(long txId) {
_txId = txId;
}
public long getId() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(Runnable callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
public void commit(JDataVersionedWrapper obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
}
public void delete(JObjectKey obj) {
synchronized (_entries) {
_entries.put(obj, new DeletedEntry(obj));
}
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
_size = _entries.values().stream().mapToInt(BundleEntry::size).sum();
return _size;
}
public void compress(TxBundleImpl other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
_entries.putAll(other._entries);
}
private interface BundleEntry {
JObjectKey key();
int size();
}
private record CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size)
implements BundleEntry {
}
private record DeletedEntry(JObjectKey key)
implements BundleEntry {
public int size() {
return 64;
}
}
}
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
synchronized (_pendingBundles) {
return Optional.ofNullable(_pendingWrites.get().get(key));
}
}
@Nonnull
Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = txWriteback.getPendingWrite(name).orElse(null);
var pending = getPendingWrite(name).orElse(null);
return switch (pending) {
case TxWriteback.PendingWrite write -> Optional.of(write.data());
case TxWriteback.PendingDelete ignored -> Optional.empty();
case null -> delegate.readObject(name);
case PendingWrite write -> Optional.of(write.data());
case PendingDelete ignored -> Optional.empty();
case null -> cachedStore.readObject(name);
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
@@ -55,20 +400,20 @@ public class WritebackObjectPersistentStore {
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(TxWriteback.PendingWriteEntry pending) implements VerboseReadResult {
public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult {
}
@Nonnull
VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = txWriteback.getPendingWrite(key).orElse(null);
var pending = getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
}
return new VerboseReadResultPersisted(delegate.readObject(key));
return new VerboseReadResultPersisted(cachedStore.readObject(key));
}
Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
var bundle = txWriteback.createBundle();
var bundle = createBundle();
try {
for (var action : writes) {
switch (action) {
@@ -86,34 +431,45 @@ public class WritebackObjectPersistentStore {
}
}
} catch (Throwable t) {
txWriteback.dropBundle(bundle);
dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", id);
txWriteback.commitBundle(bundle);
_commitCounter.incrementAndGet();
commitBundle(bundle);
long bundleId = bundle.getId();
return r -> txWriteback.asyncFence(bundleId, r);
return r -> asyncFence(bundleId, r);
}
// Returns an iterator with a view of all commited objects
// Does not have to guarantee consistent view, snapshots are handled by upper layers
// Should be refreshed after each commit
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(IteratorStart start, JObjectKey key) {
_lock.readLock().lock();
// Invalidated by commitBundle, but might return data after it has been really committed
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
_pendingWritesVersionLock.readLock().lock();
try {
return new InvalidatableKvIterator<>(new MergingKvIterator<>(txWriteback.getIterator(start, key),
new MappingKvIterator<>(delegate.getIterator(start, key), TombstoneMergingKvIterator.Data::new)),
_commitCounter::get, _lock.readLock());
CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> oursIterator = new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites.get(), start, key),
e -> switch (e) {
case PendingWrite p -> new TombstoneMergingKvIterator.Data<>(p.data());
case PendingDelete d -> new TombstoneMergingKvIterator.Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
});
return new InvalidatableKvIterator<>(
new InconsistentKvIteratorWrapper<>(
(p) ->
new TombstoneMergingKvIterator<>("writeback-ps",
oursIterator,
new MappingKvIterator<>(cachedStore.getIterator(p.getLeft(), p.getRight()), TombstoneMergingKvIterator.Data::new)), start, key),
_pendingWritesVersion::get, _pendingWritesVersionLock.readLock());
} finally {
_lock.readLock().unlock();
_pendingWritesVersionLock.readLock().unlock();
}
}
public CloseableKvIterator<JObjectKey, TombstoneMergingKvIterator.DataType<JDataVersionedWrapper>> getIterator(JObjectKey key) {
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {
return getIterator(IteratorStart.GE, key);
}
}

View File

@@ -125,6 +125,7 @@ public class CachingObjectPersistentStore {
assert added;
}
}
Log.tracev("Committing: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
delegate.commitTx(serialized);
// Now, reading from the backing store should return the new data
synchronized (_cache) {
@@ -135,6 +136,7 @@ public class CachingObjectPersistentStore {
}
}
_cacheVersion.incrementAndGet();
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
} finally {
_cacheVersionLock.writeLock().unlock();
}
@@ -166,6 +168,7 @@ public class CachingObjectPersistentStore {
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
Log.tracev("Caching: {0}", next);
put(next.getKey(), Optional.of(next.getValue()));
return next;
}
@@ -176,13 +179,21 @@ public class CachingObjectPersistentStore {
// Warning: it has a nasty side effect of global caching, so in this case don't even call next on it,
// if some objects are still in writeback
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new InconsistentSelfRefreshingKvIterator<>(
(bp) -> new MergingKvIterator<>(
new PredicateKvIterator<>(
new NavigableMapKvIterator<>(_sortedCache, bp.getLeft(), bp.getRight()),
e -> e.object().orElse(null)
), new CachingKvIterator(delegate.getIterator(bp.getLeft(), bp.getRight()))), _cacheVersion::get,
_cacheVersionLock.readLock(), start, key);
_cacheVersionLock.readLock().lock();
try {
return new InconsistentSelfRefreshingKvIterator<>(
(bp) -> new MergingKvIterator<>("cache",
new PredicateKvIterator<>(
new NavigableMapKvIterator<>(_sortedCache, bp.getLeft(), bp.getRight()),
e -> {
Log.tracev("Taken from cache: {0}", e);
return e.object().orElse(null);
}
), new CachingKvIterator(delegate.getIterator(bp.getLeft(), bp.getRight()))), _cacheVersion::get,
_cacheVersionLock.readLock(), start, key);
} finally {
_cacheVersionLock.readLock().unlock();
}
}
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(JObjectKey key) {

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.objects;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
public abstract class Just {
public static void run(Callable<?> callable) {
@@ -12,4 +14,26 @@ public abstract class Just {
}
}).start();
}
public static void runAll(Callable<?>... callables) {
try {
try (var exs = Executors.newFixedThreadPool(callables.length)) {
exs.invokeAll(Arrays.stream(callables).map(c -> (Callable<?>) () -> {
try {
return c.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).toList()).forEach(f -> {
try {
f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -74,7 +74,7 @@ public class MergingKvIteratorTest {
public void testSimple() {
var source1 = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6)).iterator();
var source2 = List.of(Pair.of(2, 3), Pair.of(4, 5), Pair.of(6, 7)).iterator();
var mergingIterator = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source1), new SimpleIteratorWrapper<>(source2));
var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1), new SimpleIteratorWrapper<>(source2));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 3), Pair.of(3, 4), Pair.of(4, 5), Pair.of(5, 6), Pair.of(6, 7));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
@@ -86,7 +86,7 @@ public class MergingKvIteratorTest {
public void testPriority() {
var source1 = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
var mergingIterator = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
var expected = List.of(Pair.of(1, 2), Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
@@ -94,7 +94,7 @@ public class MergingKvIteratorTest {
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>(new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
var mergingIterator2 = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 7));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
@@ -102,4 +102,25 @@ public class MergingKvIteratorTest {
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
@Test
public void testPriority2() {
var source1 = List.of(Pair.of(2, 4), Pair.of(5, 6));
var source2 = List.of(Pair.of(1, 3), Pair.of(2, 5));
var mergingIterator = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source1.iterator()), new SimpleIteratorWrapper<>(source2.iterator()));
var expected = List.of(Pair.of(1, 3), Pair.of(2, 4), Pair.of(5, 6));
for (var pair : expected) {
Assertions.assertTrue(mergingIterator.hasNext());
Assertions.assertEquals(pair, mergingIterator.next());
}
Assertions.assertFalse(mergingIterator.hasNext());
var mergingIterator2 = new MergingKvIterator<>("test", new SimpleIteratorWrapper<>(source2.iterator()), new SimpleIteratorWrapper<>(source1.iterator()));
var expected2 = List.of(Pair.of(1, 3), Pair.of(2, 5), Pair.of(5, 6));
for (var pair : expected2) {
Assertions.assertTrue(mergingIterator2.hasNext());
Assertions.assertEquals(pair, mergingIterator2.next());
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.data.Parent;
import com.usatiuk.dhfs.objects.persistence.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
@@ -442,6 +443,139 @@ public class ObjectsTest {
deleteAndCheck(new JObjectKey(key));
}
@RepeatedTest(100)
void simpleIterator1() throws Exception {
var key = "SimpleIterator1";
var key1 = key + "_1";
var key2 = key + "_2";
var key3 = key + "_3";
var key4 = key + "_4";
txm.run(() -> {
curTx.put(new Parent(JObjectKey.of(key), "John"));
curTx.put(new Parent(JObjectKey.of(key1), "John1"));
curTx.put(new Parent(JObjectKey.of(key2), "John2"));
curTx.put(new Parent(JObjectKey.of(key3), "John3"));
curTx.put(new Parent(JObjectKey.of(key4), "John4"));
});
txm.run(() -> {
var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key));
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
});
}
@RepeatedTest(100)
void simpleIterator2() throws Exception {
var key = "SimpleIterator2";
var key1 = key + "_1";
var key2 = key + "_2";
var key3 = key + "_3";
var key4 = key + "_4";
txm.run(() -> {
curTx.put(new Parent(JObjectKey.of(key), "John"));
curTx.put(new Parent(JObjectKey.of(key1), "John1"));
curTx.put(new Parent(JObjectKey.of(key2), "John2"));
curTx.put(new Parent(JObjectKey.of(key3), "John3"));
curTx.put(new Parent(JObjectKey.of(key4), "John4"));
});
txm.run(() -> {
var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key));
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
});
txm.run(() -> {
curTx.delete(new JObjectKey(key));
curTx.delete(new JObjectKey(key1));
curTx.delete(new JObjectKey(key2));
curTx.delete(new JObjectKey(key3));
curTx.delete(new JObjectKey(key4));
});
txm.run(() -> {
var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key));
Assertions.assertTrue(!iter.hasNext() || !iter.next().getKey().name().startsWith(key));
});
}
@RepeatedTest(100)
void concurrentIterator1() {
var key = "ConcurrentIterator1";
var key1 = key + "_1";
var key2 = key + "_2";
var key3 = key + "_3";
var key4 = key + "_4";
txm.run(() -> {
curTx.put(new Parent(JObjectKey.of(key), "John"));
curTx.put(new Parent(JObjectKey.of(key1), "John1"));
curTx.put(new Parent(JObjectKey.of(key4), "John4"));
});
var barrier = new CyclicBarrier(2);
var barrier2 = new CyclicBarrier(2);
Just.runAll(() -> {
barrier.await();
txm.run(() -> {
Log.info("Thread 1 starting tx");
try {
barrier2.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
curTx.put(new Parent(JObjectKey.of(key2), "John2"));
curTx.put(new Parent(JObjectKey.of(key3), "John3"));
Log.info("Thread 1 committing");
});
Log.info("Thread 1 commited");
return null;
}, () -> {
txm.run(() -> {
Log.info("Thread 2 starting tx");
try {
barrier.await();
barrier2.await();
var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key));
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Log.info("Thread 2 finished");
return null;
});
Log.info("All threads finished");
txm.run(() -> {
var iter = curTx.getIterator(IteratorStart.GT, new JObjectKey(key));
var got = iter.next();
Assertions.assertEquals(key1, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key2, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key3, got.getKey().name());
got = iter.next();
Assertions.assertEquals(key4, got.getKey().name());
});
txm.run(() -> {
curTx.delete(new JObjectKey(key));
curTx.delete(new JObjectKey(key1));
curTx.delete(new JObjectKey(key2));
curTx.delete(new JObjectKey(key3));
curTx.delete(new JObjectKey(key4));
});
}
// }
//
// @Test

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects.repository.peersync;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
@@ -7,12 +8,12 @@ import com.usatiuk.dhfs.objects.repository.CertificateTools;
import java.security.cert.X509Certificate;
public record PeerInfo(JObjectKey key, PeerId id, byte[] cert) implements JDataRemote {
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, cert);
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
}
public X509Certificate parsedCert() {
return CertificateTools.certFromBytes(cert);
return CertificateTools.certFromBytes(cert.toByteArray());
}
}