mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
3 Commits
469a6b9011
...
de211bb2d2
| Author | SHA1 | Date | |
|---|---|---|---|
| de211bb2d2 | |||
| 56ab3bad4c | |||
| 9403556220 |
@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
|
|||||||
SerializingObjectPersistentStore delegate;
|
SerializingObjectPersistentStore delegate;
|
||||||
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
||||||
boolean printStats;
|
boolean printStats;
|
||||||
private ExecutorService _commitExecutor;
|
|
||||||
private ExecutorService _statusExecutor;
|
private ExecutorService _statusExecutor;
|
||||||
private AtomicLong _cached = new AtomicLong();
|
private AtomicLong _cached = new AtomicLong();
|
||||||
private AtomicLong _cacheTries = new AtomicLong();
|
private AtomicLong _cacheTries = new AtomicLong();
|
||||||
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
|
|||||||
_cache.set(_cache.get().withVersion(s.id()));
|
_cache.set(_cache.get().withVersion(s.id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
_commitExecutor = Executors.newSingleThreadExecutor();
|
|
||||||
if (printStats) {
|
if (printStats) {
|
||||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||||
_statusExecutor.submit(() -> {
|
_statusExecutor.submit(() -> {
|
||||||
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
|
|||||||
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||||
|
|
||||||
var cache = _cache.get();
|
var cache = _cache.get();
|
||||||
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
|
|
||||||
for (var write : objs.written()) {
|
for (var write : objs.written()) {
|
||||||
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
|
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
|
||||||
}
|
}
|
||||||
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
|
|||||||
cache = cache.withPut(del, Optional.empty());
|
cache = cache.withPut(del, Optional.empty());
|
||||||
}
|
}
|
||||||
cache = cache.withVersion(txId);
|
cache = cache.withVersion(txId);
|
||||||
try {
|
delegate.commitTx(objs, txId);
|
||||||
commitFuture.get();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
_cache.set(cache);
|
_cache.set(cache);
|
||||||
|
|
||||||
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||||
|
|||||||
@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
public void commitTx(TxManifestRaw names, long txId) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var txn = _env.txnWrite();
|
try (var txn = _env.txnWrite()) {
|
||||||
try {
|
|
||||||
for (var written : names.written()) {
|
for (var written : names.written()) {
|
||||||
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
|
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
|
||||||
written.getValue().copyTo(putBb);
|
written.getValue().copyTo(putBb);
|
||||||
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
bbData.putLong(txId);
|
bbData.putLong(txId);
|
||||||
bbData.flip();
|
bbData.flip();
|
||||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||||
} catch (Throwable t) {
|
txn.commit();
|
||||||
txn.close();
|
|
||||||
throw t;
|
|
||||||
}
|
}
|
||||||
return () -> {
|
|
||||||
try {
|
|
||||||
txn.commit();
|
|
||||||
} finally {
|
|
||||||
txn.close();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
@Override
|
||||||
return () -> {
|
public void commitTx(TxManifestRaw names, long txId) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
for (var written : names.written()) {
|
for (var written : names.written()) {
|
||||||
_objects = _objects.plus(written.getKey(), written.getValue());
|
_objects = _objects.plus(written.getKey(), written.getValue());
|
||||||
}
|
|
||||||
for (JObjectKey key : names.deleted()) {
|
|
||||||
_objects = _objects.minus(key);
|
|
||||||
}
|
|
||||||
assert txId > _lastCommitId;
|
|
||||||
_lastCommitId = txId;
|
|
||||||
}
|
}
|
||||||
};
|
for (JObjectKey key : names.deleted()) {
|
||||||
|
_objects = _objects.minus(key);
|
||||||
|
}
|
||||||
|
assert txId > _lastCommitId;
|
||||||
|
_lastCommitId = txId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import java.util.Optional;
|
|||||||
public interface ObjectPersistentStore {
|
public interface ObjectPersistentStore {
|
||||||
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
||||||
|
|
||||||
Runnable prepareTx(TxManifestRaw names, long txId);
|
void commitTx(TxManifestRaw names, long txId);
|
||||||
|
|
||||||
long getTotalSpace();
|
long getTotalSpace();
|
||||||
|
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
|
|||||||
, objs.deleted());
|
, objs.deleted());
|
||||||
}
|
}
|
||||||
|
|
||||||
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
||||||
return delegateStore.prepareTx(prepareManifest(objects), txId);
|
delegateStore.commitTx(prepareManifest(objects), txId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
|
|||||||
Log.info("Writeback thread exiting");
|
Log.info("Writeback thread exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
_pendingBundleLock.lock();
|
_pendingBundleLock.lock();
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
public interface TransactionFactory {
|
|
||||||
TransactionPrivate createTransaction();
|
|
||||||
}
|
|
||||||
@@ -1,262 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import com.usatiuk.objects.JData;
|
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.iterators.*;
|
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
|
||||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
|
||||||
import com.usatiuk.utils.ListUtils;
|
|
||||||
import io.quarkus.logging.Log;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
public class TransactionFactoryImpl implements TransactionFactory {
|
|
||||||
@Inject
|
|
||||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TransactionPrivate createTransaction() {
|
|
||||||
return new TransactionImpl();
|
|
||||||
}
|
|
||||||
|
|
||||||
private interface ReadTrackingInternalCrap {
|
|
||||||
boolean fromSource();
|
|
||||||
|
|
||||||
JData obj();
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME:
|
|
||||||
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
|
||||||
@Override
|
|
||||||
public boolean fromSource() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JData obj() {
|
|
||||||
return wrapped.data();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
|
||||||
@Override
|
|
||||||
public boolean fromSource() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class TransactionImpl implements TransactionPrivate {
|
|
||||||
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
|
||||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
|
||||||
private final List<Runnable> _onCommit = new LinkedList<>();
|
|
||||||
private final List<Runnable> _onFlush = new LinkedList<>();
|
|
||||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
|
||||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
|
||||||
private boolean _closed = false;
|
|
||||||
|
|
||||||
private boolean _writeTrack = false;
|
|
||||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
|
||||||
|
|
||||||
private TransactionImpl() {
|
|
||||||
_snapshot = writebackObjectPersistentStore.getSnapshot();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCommit(Runnable runnable) {
|
|
||||||
_onCommit.add(runnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFlush(Runnable runnable) {
|
|
||||||
_onFlush.add(runnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<Runnable> getOnCommit() {
|
|
||||||
return Collections.unmodifiableCollection(_onCommit);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
|
||||||
return _snapshot;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<Runnable> getOnFlush() {
|
|
||||||
return Collections.unmodifiableCollection(_onFlush);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
|
||||||
if (_knownNew.contains(key)) {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
|
||||||
.map(JDataVersionedWrapper::data)
|
|
||||||
.map(type::cast);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
|
||||||
return switch (_writes.get(key)) {
|
|
||||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
|
||||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
|
||||||
case null -> getFromSource(type, key);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void delete(JObjectKey key) {
|
|
||||||
var record = new TxRecord.TxObjectRecordDeleted(key);
|
|
||||||
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
|
||||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
|
||||||
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
|
||||||
ListUtils.prependAndMap(
|
|
||||||
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
|
||||||
t -> switch (t) {
|
|
||||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
|
||||||
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
|
||||||
case TxRecord.TxObjectRecordDeleted deleted ->
|
|
||||||
new TombstoneImpl<ReadTrackingInternalCrap>();
|
|
||||||
case null, default -> null;
|
|
||||||
}),
|
|
||||||
_snapshot.getIterator(start, key),
|
|
||||||
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
|
||||||
d -> switch (d) {
|
|
||||||
case Data<JDataVersionedWrapper> w ->
|
|
||||||
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
|
||||||
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
|
||||||
case null, default -> null;
|
|
||||||
}))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void put(JData obj) {
|
|
||||||
var key = obj.key();
|
|
||||||
var read = _readSet.get(key);
|
|
||||||
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
|
||||||
_writes.put(key, record);
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void putNew(JData obj) {
|
|
||||||
var key = obj.key();
|
|
||||||
_knownNew.add(key);
|
|
||||||
|
|
||||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
|
||||||
_writes.put(key, record);
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
|
||||||
if (!_writeTrack) {
|
|
||||||
_writeTrack = true;
|
|
||||||
return Collections.unmodifiableCollection(_writes.values());
|
|
||||||
}
|
|
||||||
var ret = _newWrites;
|
|
||||||
_newWrites = new HashMap<>();
|
|
||||||
return ret.values();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
|
||||||
return _readSet;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<JObjectKey> knownNew() {
|
|
||||||
return _knownNew;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (_closed) return;
|
|
||||||
_closed = true;
|
|
||||||
_snapshot.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
|
||||||
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
|
||||||
|
|
||||||
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
|
||||||
_backing = backing;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JObjectKey peekNextKey() {
|
|
||||||
return _backing.peekNextKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void skip() {
|
|
||||||
_backing.skip();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JObjectKey peekPrevKey() {
|
|
||||||
return _backing.peekPrevKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pair<JObjectKey, JData> prev() {
|
|
||||||
var got = _backing.prev();
|
|
||||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
|
||||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
|
||||||
}
|
|
||||||
return Pair.of(got.getKey(), got.getValue().obj());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasPrev() {
|
|
||||||
return _backing.hasPrev();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void skipPrev() {
|
|
||||||
_backing.skipPrev();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
_backing.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return _backing.hasNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pair<JObjectKey, JData> next() {
|
|
||||||
var got = _backing.next();
|
|
||||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
|
||||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
|
||||||
}
|
|
||||||
return Pair.of(got.getKey(), got.getValue().obj());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
public interface TransactionHandlePrivate extends TransactionHandle {
|
|
||||||
Collection<Runnable> getOnFlush();
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,241 @@
|
|||||||
|
package com.usatiuk.objects.transaction;
|
||||||
|
|
||||||
|
import com.usatiuk.objects.JData;
|
||||||
|
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||||
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
import com.usatiuk.objects.iterators.*;
|
||||||
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
|
import com.usatiuk.utils.AutoCloseableNoThrow;
|
||||||
|
import com.usatiuk.utils.ListUtils;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
class TransactionImpl implements Transaction, AutoCloseableNoThrow {
|
||||||
|
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
||||||
|
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||||
|
private final List<Runnable> _onCommit = new LinkedList<>();
|
||||||
|
private final List<Runnable> _onFlush = new LinkedList<>();
|
||||||
|
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||||
|
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||||
|
private boolean _closed = false;
|
||||||
|
|
||||||
|
private boolean _writeTrack = false;
|
||||||
|
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||||
|
|
||||||
|
private interface ReadTrackingInternalCrap {
|
||||||
|
boolean fromSource();
|
||||||
|
|
||||||
|
JData obj();
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME:
|
||||||
|
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
||||||
|
@Override
|
||||||
|
public boolean fromSource() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JData obj() {
|
||||||
|
return wrapped.data();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
||||||
|
@Override
|
||||||
|
public boolean fromSource() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
|
||||||
|
_snapshot = snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCommit(Runnable runnable) {
|
||||||
|
_onCommit.add(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFlush(Runnable runnable) {
|
||||||
|
_onFlush.add(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<Runnable> getOnCommit() {
|
||||||
|
return Collections.unmodifiableCollection(_onCommit);
|
||||||
|
}
|
||||||
|
|
||||||
|
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
||||||
|
return _snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<Runnable> getOnFlush() {
|
||||||
|
return Collections.unmodifiableCollection(_onFlush);
|
||||||
|
}
|
||||||
|
|
||||||
|
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||||
|
if (_knownNew.contains(key)) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
||||||
|
.map(JDataVersionedWrapper::data)
|
||||||
|
.map(type::cast);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||||
|
return switch (_writes.get(key)) {
|
||||||
|
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||||
|
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||||
|
case null -> getFromSource(type, key);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(JObjectKey key) {
|
||||||
|
var record = new TxRecord.TxObjectRecordDeleted(key);
|
||||||
|
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||||
|
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||||
|
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
||||||
|
ListUtils.prependAndMap(
|
||||||
|
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
||||||
|
t -> switch (t) {
|
||||||
|
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||||
|
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
||||||
|
case TxRecord.TxObjectRecordDeleted deleted ->
|
||||||
|
new TombstoneImpl<ReadTrackingInternalCrap>();
|
||||||
|
case null, default -> null;
|
||||||
|
}),
|
||||||
|
_snapshot.getIterator(start, key),
|
||||||
|
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
||||||
|
d -> switch (d) {
|
||||||
|
case Data<JDataVersionedWrapper> w ->
|
||||||
|
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
||||||
|
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
||||||
|
case null, default -> null;
|
||||||
|
}))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(JData obj) {
|
||||||
|
var key = obj.key();
|
||||||
|
var read = _readSet.get(key);
|
||||||
|
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||||
|
_writes.put(key, record);
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putNew(JData obj) {
|
||||||
|
var key = obj.key();
|
||||||
|
_knownNew.add(key);
|
||||||
|
|
||||||
|
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||||
|
_writes.put(key, record);
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||||
|
if (!_writeTrack) {
|
||||||
|
_writeTrack = true;
|
||||||
|
return Collections.unmodifiableCollection(_writes.values());
|
||||||
|
}
|
||||||
|
var ret = _newWrites;
|
||||||
|
_newWrites = new HashMap<>();
|
||||||
|
return ret.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
||||||
|
return _readSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<JObjectKey> knownNew() {
|
||||||
|
return _knownNew;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (_closed) return;
|
||||||
|
_closed = true;
|
||||||
|
_snapshot.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
||||||
|
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
||||||
|
|
||||||
|
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
||||||
|
_backing = backing;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JObjectKey peekNextKey() {
|
||||||
|
return _backing.peekNextKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skip() {
|
||||||
|
_backing.skip();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JObjectKey peekPrevKey() {
|
||||||
|
return _backing.peekPrevKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Pair<JObjectKey, JData> prev() {
|
||||||
|
var got = _backing.prev();
|
||||||
|
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||||
|
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||||
|
}
|
||||||
|
return Pair.of(got.getKey(), got.getValue().obj());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasPrev() {
|
||||||
|
return _backing.hasPrev();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skipPrev() {
|
||||||
|
_backing.skipPrev();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
_backing.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return _backing.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Pair<JObjectKey, JData> next() {
|
||||||
|
var got = _backing.next();
|
||||||
|
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||||
|
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||||
|
}
|
||||||
|
return Pair.of(got.getKey(), got.getValue().obj());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -11,14 +11,14 @@ import java.util.Stack;
|
|||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class TransactionManagerImpl implements TransactionManager {
|
public class TransactionManagerImpl implements TransactionManager {
|
||||||
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
||||||
@Inject
|
@Inject
|
||||||
JObjectManager jObjectManager;
|
TransactionService transactionService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void begin() {
|
public void begin() {
|
||||||
Log.trace("Starting transaction");
|
Log.trace("Starting transaction");
|
||||||
var tx = jObjectManager.createTransaction();
|
var tx = transactionService.createTransaction();
|
||||||
_currentTransaction.get().push(tx);
|
_currentTransaction.get().push(tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
|||||||
|
|
||||||
Pair<Collection<Runnable>, TransactionHandle> ret;
|
Pair<Collection<Runnable>, TransactionHandle> ret;
|
||||||
try {
|
try {
|
||||||
ret = jObjectManager.commit(peeked);
|
ret = transactionService.commit(peeked);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
Log.trace("Transaction commit failed", e);
|
Log.trace("Transaction commit failed", e);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
|||||||
var peeked = stack.peek();
|
var peeked = stack.peek();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jObjectManager.rollback(peeked);
|
transactionService.rollback(peeked);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
Log.error("Transaction rollback failed", e);
|
Log.error("Transaction rollback failed", e);
|
||||||
throw e;
|
throw e;
|
||||||
|
|||||||
@@ -1,27 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import com.usatiuk.objects.JData;
|
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
|
||||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
// The transaction interface actually used by user code to retrieve objects
|
|
||||||
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
|
|
||||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
|
|
||||||
|
|
||||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
|
|
||||||
|
|
||||||
Set<JObjectKey> knownNew();
|
|
||||||
|
|
||||||
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
|
|
||||||
|
|
||||||
Collection<Runnable> getOnCommit();
|
|
||||||
|
|
||||||
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
|
|
||||||
}
|
|
||||||
@@ -22,12 +22,10 @@ import java.util.function.Consumer;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class JObjectManager {
|
public class TransactionService {
|
||||||
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
||||||
@Inject
|
@Inject
|
||||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||||
@Inject
|
|
||||||
TransactionFactory transactionFactory;
|
|
||||||
|
|
||||||
private boolean _ready = false;
|
private boolean _ready = false;
|
||||||
private final DataLocker _objLocker = new DataLocker();
|
private final DataLocker _objLocker = new DataLocker();
|
||||||
@@ -36,7 +34,7 @@ public class JObjectManager {
|
|||||||
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||||
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -48,14 +46,14 @@ public class JObjectManager {
|
|||||||
_ready = true;
|
_ready = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionPrivate createTransaction() {
|
public TransactionImpl createTransaction() {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var tx = transactionFactory.createTransaction();
|
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
|
||||||
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
|
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
|
||||||
return tx;
|
return tx;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||||
@@ -259,7 +257,7 @@ public class JObjectManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rollback(TransactionPrivate tx) {
|
public void rollback(TransactionImpl tx) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
tx.close();
|
tx.close();
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user