mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
3 Commits
469a6b9011
...
de211bb2d2
| Author | SHA1 | Date | |
|---|---|---|---|
| de211bb2d2 | |||
| 56ab3bad4c | |||
| 9403556220 |
@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
|
||||
SerializingObjectPersistentStore delegate;
|
||||
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
||||
boolean printStats;
|
||||
private ExecutorService _commitExecutor;
|
||||
private ExecutorService _statusExecutor;
|
||||
private AtomicLong _cached = new AtomicLong();
|
||||
private AtomicLong _cacheTries = new AtomicLong();
|
||||
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
|
||||
_cache.set(_cache.get().withVersion(s.id()));
|
||||
}
|
||||
|
||||
_commitExecutor = Executors.newSingleThreadExecutor();
|
||||
if (printStats) {
|
||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||
_statusExecutor.submit(() -> {
|
||||
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
|
||||
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||
|
||||
var cache = _cache.get();
|
||||
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
|
||||
for (var write : objs.written()) {
|
||||
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
|
||||
}
|
||||
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
|
||||
cache = cache.withPut(del, Optional.empty());
|
||||
}
|
||||
cache = cache.withVersion(txId);
|
||||
try {
|
||||
commitFuture.get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
delegate.commitTx(objs, txId);
|
||||
_cache.set(cache);
|
||||
|
||||
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||
|
||||
@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
||||
public void commitTx(TxManifestRaw names, long txId) {
|
||||
verifyReady();
|
||||
var txn = _env.txnWrite();
|
||||
try {
|
||||
try (var txn = _env.txnWrite()) {
|
||||
for (var written : names.written()) {
|
||||
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
|
||||
written.getValue().copyTo(putBb);
|
||||
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
||||
bbData.putLong(txId);
|
||||
bbData.flip();
|
||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||
} catch (Throwable t) {
|
||||
txn.close();
|
||||
throw t;
|
||||
}
|
||||
return () -> {
|
||||
try {
|
||||
txn.commit();
|
||||
} finally {
|
||||
txn.close();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -53,8 +53,8 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
||||
return () -> {
|
||||
@Override
|
||||
public void commitTx(TxManifestRaw names, long txId) {
|
||||
synchronized (this) {
|
||||
for (var written : names.written()) {
|
||||
_objects = _objects.plus(written.getKey(), written.getValue());
|
||||
@@ -65,7 +65,6 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
||||
assert txId > _lastCommitId;
|
||||
_lastCommitId = txId;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -13,7 +13,7 @@ import java.util.Optional;
|
||||
public interface ObjectPersistentStore {
|
||||
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
||||
|
||||
Runnable prepareTx(TxManifestRaw names, long txId);
|
||||
void commitTx(TxManifestRaw names, long txId);
|
||||
|
||||
long getTotalSpace();
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
|
||||
, objs.deleted());
|
||||
}
|
||||
|
||||
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
||||
return delegateStore.prepareTx(prepareManifest(objects), txId);
|
||||
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
||||
delegateStore.commitTx(prepareManifest(objects), txId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
|
||||
Log.info("Writeback thread exiting");
|
||||
}
|
||||
|
||||
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||
verifyReady();
|
||||
_pendingBundleLock.lock();
|
||||
try {
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
public interface TransactionFactory {
|
||||
TransactionPrivate createTransaction();
|
||||
}
|
||||
@@ -1,262 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
||||
import com.usatiuk.utils.ListUtils;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@Singleton
|
||||
public class TransactionFactoryImpl implements TransactionFactory {
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
|
||||
@Override
|
||||
public TransactionPrivate createTransaction() {
|
||||
return new TransactionImpl();
|
||||
}
|
||||
|
||||
private interface ReadTrackingInternalCrap {
|
||||
boolean fromSource();
|
||||
|
||||
JData obj();
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
||||
@Override
|
||||
public boolean fromSource() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JData obj() {
|
||||
return wrapped.data();
|
||||
}
|
||||
}
|
||||
|
||||
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
||||
@Override
|
||||
public boolean fromSource() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class TransactionImpl implements TransactionPrivate {
|
||||
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||
private final List<Runnable> _onCommit = new LinkedList<>();
|
||||
private final List<Runnable> _onFlush = new LinkedList<>();
|
||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||
private boolean _closed = false;
|
||||
|
||||
private boolean _writeTrack = false;
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
|
||||
private TransactionImpl() {
|
||||
_snapshot = writebackObjectPersistentStore.getSnapshot();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommit(Runnable runnable) {
|
||||
_onCommit.add(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFlush(Runnable runnable) {
|
||||
_onFlush.add(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Runnable> getOnCommit() {
|
||||
return Collections.unmodifiableCollection(_onCommit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
||||
return _snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Runnable> getOnFlush() {
|
||||
return Collections.unmodifiableCollection(_onFlush);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||
if (_knownNew.contains(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
||||
.map(JDataVersionedWrapper::data)
|
||||
.map(type::cast);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||
return switch (_writes.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||
case null -> getFromSource(type, key);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(JObjectKey key) {
|
||||
var record = new TxRecord.TxObjectRecordDeleted(key);
|
||||
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
||||
return;
|
||||
}
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
||||
ListUtils.prependAndMap(
|
||||
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
||||
t -> switch (t) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted ->
|
||||
new TombstoneImpl<ReadTrackingInternalCrap>();
|
||||
case null, default -> null;
|
||||
}),
|
||||
_snapshot.getIterator(start, key),
|
||||
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
||||
d -> switch (d) {
|
||||
case Data<JDataVersionedWrapper> w ->
|
||||
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
||||
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
||||
case null, default -> null;
|
||||
}))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(JData obj) {
|
||||
var key = obj.key();
|
||||
var read = _readSet.get(key);
|
||||
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||
_writes.put(key, record);
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNew(JData obj) {
|
||||
var key = obj.key();
|
||||
_knownNew.add(key);
|
||||
|
||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||
_writes.put(key, record);
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||
if (!_writeTrack) {
|
||||
_writeTrack = true;
|
||||
return Collections.unmodifiableCollection(_writes.values());
|
||||
}
|
||||
var ret = _newWrites;
|
||||
_newWrites = new HashMap<>();
|
||||
return ret.values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
||||
return _readSet;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<JObjectKey> knownNew() {
|
||||
return _knownNew;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (_closed) return;
|
||||
_closed = true;
|
||||
_snapshot.close();
|
||||
}
|
||||
|
||||
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
||||
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
||||
|
||||
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
||||
_backing = backing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey peekNextKey() {
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey peekPrevKey() {
|
||||
return _backing.peekPrevKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JData> prev() {
|
||||
var got = _backing.prev();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPrev() {
|
||||
return _backing.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skipPrev() {
|
||||
_backing.skipPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return _backing.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JData> next() {
|
||||
var got = _backing.next();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
public interface TransactionHandlePrivate extends TransactionHandle {
|
||||
Collection<Runnable> getOnFlush();
|
||||
}
|
||||
@@ -0,0 +1,241 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.*;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.utils.ListUtils;
|
||||
import io.quarkus.logging.Log;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
class TransactionImpl implements Transaction, AutoCloseableNoThrow {
|
||||
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||
private final List<Runnable> _onCommit = new LinkedList<>();
|
||||
private final List<Runnable> _onFlush = new LinkedList<>();
|
||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||
private boolean _closed = false;
|
||||
|
||||
private boolean _writeTrack = false;
|
||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||
|
||||
private interface ReadTrackingInternalCrap {
|
||||
boolean fromSource();
|
||||
|
||||
JData obj();
|
||||
}
|
||||
|
||||
// FIXME:
|
||||
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
||||
@Override
|
||||
public boolean fromSource() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JData obj() {
|
||||
return wrapped.data();
|
||||
}
|
||||
}
|
||||
|
||||
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
||||
@Override
|
||||
public boolean fromSource() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
|
||||
_snapshot = snapshot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCommit(Runnable runnable) {
|
||||
_onCommit.add(runnable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFlush(Runnable runnable) {
|
||||
_onFlush.add(runnable);
|
||||
}
|
||||
|
||||
Collection<Runnable> getOnCommit() {
|
||||
return Collections.unmodifiableCollection(_onCommit);
|
||||
}
|
||||
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
||||
return _snapshot;
|
||||
}
|
||||
|
||||
Collection<Runnable> getOnFlush() {
|
||||
return Collections.unmodifiableCollection(_onFlush);
|
||||
}
|
||||
|
||||
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||
if (_knownNew.contains(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
||||
.map(JDataVersionedWrapper::data)
|
||||
.map(type::cast);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||
return switch (_writes.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||
case null -> getFromSource(type, key);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(JObjectKey key) {
|
||||
var record = new TxRecord.TxObjectRecordDeleted(key);
|
||||
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
||||
return;
|
||||
}
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
||||
ListUtils.prependAndMap(
|
||||
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
||||
t -> switch (t) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted ->
|
||||
new TombstoneImpl<ReadTrackingInternalCrap>();
|
||||
case null, default -> null;
|
||||
}),
|
||||
_snapshot.getIterator(start, key),
|
||||
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
||||
d -> switch (d) {
|
||||
case Data<JDataVersionedWrapper> w ->
|
||||
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
||||
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
||||
case null, default -> null;
|
||||
}))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(JData obj) {
|
||||
var key = obj.key();
|
||||
var read = _readSet.get(key);
|
||||
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||
return;
|
||||
}
|
||||
|
||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||
_writes.put(key, record);
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putNew(JData obj) {
|
||||
var key = obj.key();
|
||||
_knownNew.add(key);
|
||||
|
||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||
_writes.put(key, record);
|
||||
if (_writeTrack)
|
||||
_newWrites.put(key, record);
|
||||
}
|
||||
|
||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||
if (!_writeTrack) {
|
||||
_writeTrack = true;
|
||||
return Collections.unmodifiableCollection(_writes.values());
|
||||
}
|
||||
var ret = _newWrites;
|
||||
_newWrites = new HashMap<>();
|
||||
return ret.values();
|
||||
}
|
||||
|
||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
||||
return _readSet;
|
||||
}
|
||||
|
||||
Set<JObjectKey> knownNew() {
|
||||
return _knownNew;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (_closed) return;
|
||||
_closed = true;
|
||||
_snapshot.close();
|
||||
}
|
||||
|
||||
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
||||
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
||||
|
||||
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
||||
_backing = backing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey peekNextKey() {
|
||||
return _backing.peekNextKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skip() {
|
||||
_backing.skip();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey peekPrevKey() {
|
||||
return _backing.peekPrevKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JData> prev() {
|
||||
var got = _backing.prev();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPrev() {
|
||||
return _backing.hasPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void skipPrev() {
|
||||
_backing.skipPrev();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
_backing.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return _backing.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<JObjectKey, JData> next() {
|
||||
var got = _backing.next();
|
||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||
}
|
||||
return Pair.of(got.getKey(), got.getValue().obj());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -11,14 +11,14 @@ import java.util.Stack;
|
||||
|
||||
@Singleton
|
||||
public class TransactionManagerImpl implements TransactionManager {
|
||||
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
||||
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
TransactionService transactionService;
|
||||
|
||||
@Override
|
||||
public void begin() {
|
||||
Log.trace("Starting transaction");
|
||||
var tx = jObjectManager.createTransaction();
|
||||
var tx = transactionService.createTransaction();
|
||||
_currentTransaction.get().push(tx);
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
||||
|
||||
Pair<Collection<Runnable>, TransactionHandle> ret;
|
||||
try {
|
||||
ret = jObjectManager.commit(peeked);
|
||||
ret = transactionService.commit(peeked);
|
||||
} catch (Throwable e) {
|
||||
Log.trace("Transaction commit failed", e);
|
||||
throw e;
|
||||
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
||||
var peeked = stack.peek();
|
||||
|
||||
try {
|
||||
jObjectManager.rollback(peeked);
|
||||
transactionService.rollback(peeked);
|
||||
} catch (Throwable e) {
|
||||
Log.error("Transaction rollback failed", e);
|
||||
throw e;
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.snapshot.Snapshot;
|
||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
// The transaction interface actually used by user code to retrieve objects
|
||||
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
|
||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
|
||||
|
||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
|
||||
|
||||
Set<JObjectKey> knownNew();
|
||||
|
||||
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
|
||||
|
||||
Collection<Runnable> getOnCommit();
|
||||
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
|
||||
}
|
||||
@@ -22,12 +22,10 @@ import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManager {
|
||||
public class TransactionService {
|
||||
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
TransactionFactory transactionFactory;
|
||||
|
||||
private boolean _ready = false;
|
||||
private final DataLocker _objLocker = new DataLocker();
|
||||
@@ -36,7 +34,7 @@ public class JObjectManager {
|
||||
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||
}
|
||||
|
||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
||||
}
|
||||
|
||||
@@ -48,14 +46,14 @@ public class JObjectManager {
|
||||
_ready = true;
|
||||
}
|
||||
|
||||
public TransactionPrivate createTransaction() {
|
||||
public TransactionImpl createTransaction() {
|
||||
verifyReady();
|
||||
var tx = transactionFactory.createTransaction();
|
||||
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
|
||||
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
|
||||
return tx;
|
||||
}
|
||||
|
||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
|
||||
verifyReady();
|
||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||
@@ -259,7 +257,7 @@ public class JObjectManager {
|
||||
}
|
||||
}
|
||||
|
||||
public void rollback(TransactionPrivate tx) {
|
||||
public void rollback(TransactionImpl tx) {
|
||||
verifyReady();
|
||||
tx.close();
|
||||
}
|
||||
Reference in New Issue
Block a user