simple transactions 1

This commit is contained in:
2024-12-02 22:08:48 +01:00
parent 094a3e5e76
commit 7bb509024f
25 changed files with 890 additions and 662 deletions

View File

@@ -0,0 +1,24 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Optional;
@ApplicationScoped
public class CurrentTransaction implements Transaction {
@Inject
TransactionManager transactionManager;
@Override
public <T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return transactionManager.current().getObject(type, key, strategy);
}
@Override
public <T extends JData> void putObject(JData obj) {
transactionManager.current().putObject(obj);
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.objects.transaction.TransactionFactory;
import com.usatiuk.dhfs.objects.transaction.TransactionObjectSource;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
@@ -14,9 +15,7 @@ import org.apache.commons.lang3.tuple.Pair;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -114,7 +113,9 @@ public class JObjectManager {
};
public TransactionPrivate createTransaction() {
return transactionFactory.createTransaction(_txCounter.getAndIncrement(), _objSource);
var counter = _txCounter.getAndIncrement();
Log.trace("Creating transaction " + counter);
return transactionFactory.createTransaction(counter, _objSource);
}
@@ -123,8 +124,11 @@ public class JObjectManager {
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
var toLock = new ArrayList<TxRecord.TxObjectRecordCopyNoLock<?>>();
Log.trace("Committing transaction " + tx.getId());
try {
for (var entry : tx.drain()) {
Log.trace("Processing entry " + entry.toString());
switch (entry) {
case TxRecord.TxObjectRecordRead<?> read -> {
toUnlock.add(read.original().getLock().readLock()::unlock);
@@ -148,7 +152,11 @@ public class JObjectManager {
}
}
toLock.sort(Comparator.comparingInt(a -> System.identityHashCode(a.original())));
for (var record : toLock) {
Log.trace("Locking " + record.toString());
var found = _objects.get(record.original().getKey());
if (found.get() != record.original()) {
@@ -160,15 +168,65 @@ public class JObjectManager {
}
for (var record : toFlush) {
Log.trace("Processing flush entry " + record.toString());
var current = _objects.get(record.copy().wrapped().getKey());
if (current == null && !(record instanceof TxRecord.TxObjectRecordNew<?>)) {
throw new IllegalStateException("Object not found during transaction");
} else if (current != null) {
var old = switch (record) {
case TxRecord.TxObjectRecordCopyLock<?> copy -> copy.original().get();
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> copy.original();
default -> throw new IllegalStateException("Unexpected value: " + record);
};
assert current == null && record instanceof TxRecord.TxObjectRecordNew<?> || current == record.copy().wrapped();
if (current.get() != )
if (current.get() != old) {
throw new IllegalStateException("Object changed during transaction");
}
if (current.lastWriteTx > tx.getId()) {
throw new IllegalStateException("Transaction race");
}
} else if (record instanceof TxRecord.TxObjectRecordNew<?> created) {
var wrapper = new JDataWrapper<>(created.created());
wrapper.lock.writeLock().lock();
var old = _objects.putIfAbsent(created.created().getKey(), wrapper);
if (old != null)
throw new IllegalStateException("Object already exists");
toUnlock.add(wrapper.lock.writeLock()::unlock);
} else {
throw new IllegalStateException("Object not found during transaction");
}
}
// Have all locks now
for (var record : toFlush) {
Log.trace("Flushing " + record.toString());
if (!record.copy().isModified())
continue;
var obj = record.copy().wrapped();
var key = obj.getKey();
var data = objectSerializer.serialize(obj);
objectStorage.writeObject(key, data);
_objects.get(key).lastWriteTx = tx.getId(); // FIXME:
}
Log.trace("Flushing transaction " + tx.getId());
objectStorage.commitTx(new TxManifest() {
@Override
public List<JObjectKey> getWritten() {
return toFlush.stream().map(r -> r.copy().wrapped().getKey()).toList();
}
@Override
public List<JObjectKey> getDeleted() {
return List.of();
}
});
} catch (Throwable t) {
Log.error("Error when committing transaction", t);
throw t;

View File

@@ -2,13 +2,14 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class TransactionManagerImpl implements TransactionManager {
@Inject
JObjectManager objectManager;
JObjectManager jObjectManager;
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
@@ -18,28 +19,36 @@ public class TransactionManagerImpl implements TransactionManager {
throw new IllegalStateException("Transaction already started");
}
var tx = objectManager.createTransaction();
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
_currentTransaction.set(tx);
}
@Override
public void commit() {
if(_currentTransaction.get() == null) {
if (_currentTransaction.get() == null) {
throw new IllegalStateException("No transaction started");
}
jobjectManager.commit(_currentTransaction.get());
Log.trace("Committing transaction");
try {
jObjectManager.commit(_currentTransaction.get());
} catch (Throwable e) {
Log.warn("Transaction commit failed", e);
throw e;
} finally {
_currentTransaction.remove();
}
}
@Override
public void rollback() {
_currentTransaction.remove();
}
@Override
public Transaction current() {
return _currentTransaction.get();
}
}

View File

@@ -1,415 +1,415 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
ObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
.build();
_writebackExecutor = Executors.newSingleThreadExecutor(factory);
_writebackExecutor.submit(this::writeback);
}
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("writeback-commit-%d")
.build();
_commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size="
+ currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
});
_ready = true;
}
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
}
_writebackExecutor.shutdownNow();
Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
}
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var c : bundle._committed.values()) {
_commitExecutor.execute(() -> {
try {
Log.trace("Writing new " + c.newMeta.getName());
objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
for (var c : bundle._meta.values()) {
_commitExecutor.execute(() -> {
try {
Log.trace("Writing (meta) " + c.newMeta.getName());
objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
if (Log.isDebugEnabled())
for (var d : bundle._deleted.keySet())
Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors in writeback!");
}
objectPersistentStore.commitTx(
new TxManifest(
Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()),
bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)),
bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new))
));
Log.trace("Bundle " + bundle.getId() + " committed");
List<List<VoidFn>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(VoidFn::apply));
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
} catch (Throwable o) {
Log.error("Uncaught THROWABLE in writeback", o);
}
}
Log.info("Writeback thread exiting");
}
@Override
public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
synchronized (_notFlushedBundles) {
var bundle = new TxBundle(_counter.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
@Override
public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
}
}
}
@Override
public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundle) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
}
}
}
@Override
public void fence(long bundleId) {
var latch = new CountDownLatch(1);
asyncFence(bundleId, latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void asyncFence(long bundleId, VoidFn fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
@Getter
private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest {
private final ArrayList<String> _written;
private final ArrayList<String> _deleted;
private TxManifest(ArrayList<String> written, ArrayList<String> deleted) {
_written = written;
_deleted = deleted;
}
}
private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle {
private final HashMap<JObject<?>, CommittedEntry> _committed = new HashMap<>();
private final HashMap<JObject<?>, CommittedMeta> _meta = new HashMap<>();
private final HashMap<JObject<?>, Integer> _deleted = new HashMap<>();
private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
private long _txId;
@Getter
private volatile boolean _ready = false;
private long _size = -1;
private boolean _wasCommitted = false;
private TxBundle(long txId) {_txId = txId;}
@Override
public long getId() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(VoidFn callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<VoidFn> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
@Override
public void commit(JObject<?> obj, ObjectMetadataP meta, JObjectDataP data) {
synchronized (_committed) {
_committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize()));
}
}
@Override
public void commitMetaChange(JObject<?> obj, ObjectMetadataP meta) {
synchronized (_meta) {
_meta.put(obj, new CommittedMeta(meta, obj.estimateSize()));
}
}
@Override
public void delete(JObject<?> obj) {
synchronized (_deleted) {
_deleted.put(obj, obj.estimateSize());
}
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
long out = 0;
for (var c : _committed.values())
out += c.size;
for (var c : _meta.values())
out += c.size;
for (var c : _deleted.entrySet())
out += c.getValue();
_size = out;
return _size;
}
public void compress(TxBundle other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
for (var d : other._deleted.entrySet()) {
_committed.remove(d.getKey());
_meta.remove(d.getKey());
_deleted.put(d.getKey(), d.getValue());
}
for (var c : other._committed.entrySet()) {
_committed.put(c.getKey(), c.getValue());
_meta.remove(c.getKey());
_deleted.remove(c.getKey());
}
for (var m : other._meta.entrySet()) {
var deleted = _deleted.remove(m.getKey());
if (deleted != null) {
_committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize()));
continue;
}
var committed = _committed.remove(m.getKey());
if (committed != null) {
_committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize()));
continue;
}
_meta.put(m.getKey(), m.getValue());
}
}
private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {}
private record CommittedMeta(ObjectMetadataP newMeta, int size) {}
private record Deleted(JObject<?> handle) {}
}
}
//package com.usatiuk.dhfs.objects;
//
//import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
//import com.usatiuk.dhfs.utils.VoidFn;
//import io.quarkus.logging.Log;
//import io.quarkus.runtime.ShutdownEvent;
//import io.quarkus.runtime.StartupEvent;
//import jakarta.annotation.Priority;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.event.Observes;
//import jakarta.inject.Inject;
//import lombok.Getter;
//import org.apache.commons.lang3.concurrent.BasicThreadFactory;
//import org.eclipse.microprofile.config.inject.ConfigProperty;
//
//import java.util.*;
//import java.util.concurrent.ConcurrentLinkedQueue;
//import java.util.concurrent.CountDownLatch;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//import java.util.concurrent.atomic.AtomicLong;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
//@ApplicationScoped
//public class TxWritebackImpl implements TxWriteback {
// private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
// private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
//
// private final Object _flushWaitSynchronizer = new Object();
// private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
// private final AtomicLong _counter = new AtomicLong();
// private final AtomicLong _waitedTotal = new AtomicLong(0);
// @Inject
// ObjectPersistentStore objectPersistentStore;
// @ConfigProperty(name = "dhfs.objects.writeback.limit")
// long sizeLimit;
// private long currentSize = 0;
// private ExecutorService _writebackExecutor;
// private ExecutorService _commitExecutor;
// private ExecutorService _statusExecutor;
// private volatile boolean _ready = false;
//
// void init(@Observes @Priority(110) StartupEvent event) {
// {
// BasicThreadFactory factory = new BasicThreadFactory.Builder()
// .namingPattern("tx-writeback-%d")
// .build();
//
// _writebackExecutor = Executors.newSingleThreadExecutor(factory);
// _writebackExecutor.submit(this::writeback);
// }
//
// {
// BasicThreadFactory factory = new BasicThreadFactory.Builder()
// .namingPattern("writeback-commit-%d")
// .build();
//
// _commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
// }
// _statusExecutor = Executors.newSingleThreadExecutor();
// _statusExecutor.submit(() -> {
// try {
// while (true) {
// Thread.sleep(1000);
// if (currentSize > 0)
// Log.info("Tx commit status: size="
// + currentSize / 1024 / 1024 + "MB");
// }
// } catch (InterruptedException ignored) {
// }
// });
// _ready = true;
// }
//
// void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
// Log.info("Waiting for all transactions to drain");
//
// synchronized (_flushWaitSynchronizer) {
// _ready = false;
// while (currentSize > 0) {
// _flushWaitSynchronizer.wait();
// }
// }
//
// _writebackExecutor.shutdownNow();
// Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
// }
//
// private void verifyReady() {
// if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
// }
//
// private void writeback() {
// while (!Thread.interrupted()) {
// try {
// TxBundle bundle = new TxBundle(0);
// synchronized (_pendingBundles) {
// while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
// _pendingBundles.wait();
//
// long diff = 0;
// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var toCompress = _pendingBundles.poll();
// diff -= toCompress.calculateTotalSize();
// bundle.compress(toCompress);
// }
// diff += bundle.calculateTotalSize();
// synchronized (_flushWaitSynchronizer) {
// currentSize += diff;
// }
// }
//
// var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size());
// ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
//
// for (var c : bundle._committed.values()) {
// _commitExecutor.execute(() -> {
// try {
// Log.trace("Writing new " + c.newMeta.getName());
// objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
// } catch (Throwable t) {
// Log.error("Error writing " + c.newMeta.getName(), t);
// errors.add(t);
// } finally {
// latch.countDown();
// }
// });
// }
// for (var c : bundle._meta.values()) {
// _commitExecutor.execute(() -> {
// try {
// Log.trace("Writing (meta) " + c.newMeta.getName());
// objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
// } catch (Throwable t) {
// Log.error("Error writing " + c.newMeta.getName(), t);
// errors.add(t);
// } finally {
// latch.countDown();
// }
// });
// }
// if (Log.isDebugEnabled())
// for (var d : bundle._deleted.keySet())
// Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests
//
// latch.await();
// if (!errors.isEmpty()) {
// throw new RuntimeException("Errors in writeback!");
// }
// objectPersistentStore.commitTx(
// new TxManifest(
// Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()),
// bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)),
// bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new))
// ));
// Log.trace("Bundle " + bundle.getId() + " committed");
//
//
// List<List<VoidFn>> callbacks = new ArrayList<>();
// synchronized (_notFlushedBundles) {
// _lastWrittenTx.set(bundle.getId());
// while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
// callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
// }
// }
// callbacks.forEach(l -> l.forEach(VoidFn::apply));
//
// synchronized (_flushWaitSynchronizer) {
// currentSize -= ((TxBundle) bundle).calculateTotalSize();
// // FIXME:
// if (currentSize <= sizeLimit || !_ready)
// _flushWaitSynchronizer.notifyAll();
// }
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// Log.error("Uncaught exception in writeback", e);
// } catch (Throwable o) {
// Log.error("Uncaught THROWABLE in writeback", o);
// }
// }
// Log.info("Writeback thread exiting");
// }
//
// @Override
// public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() {
// verifyReady();
// boolean wait = false;
// while (true) {
// if (wait) {
// synchronized (_flushWaitSynchronizer) {
// long started = System.currentTimeMillis();
// while (currentSize > sizeLimit) {
// try {
// _flushWaitSynchronizer.wait();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// long waited = System.currentTimeMillis() - started;
// _waitedTotal.addAndGet(waited);
// if (Log.isTraceEnabled())
// Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
// wait = false;
// }
// }
// synchronized (_pendingBundles) {
// synchronized (_flushWaitSynchronizer) {
// if (currentSize > sizeLimit) {
// if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var target = _pendingBundles.poll();
//
// long diff = -target.calculateTotalSize();
// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
// var toCompress = _pendingBundles.poll();
// diff -= toCompress.calculateTotalSize();
// target.compress(toCompress);
// }
// diff += target.calculateTotalSize();
// currentSize += diff;
// _pendingBundles.addFirst(target);
// }
// }
//
// if (currentSize > sizeLimit) {
// wait = true;
// continue;
// }
// }
// synchronized (_notFlushedBundles) {
// var bundle = new TxBundle(_counter.incrementAndGet());
// _pendingBundles.addLast(bundle);
// _notFlushedBundles.put(bundle.getId(), bundle);
// return bundle;
// }
// }
// }
// }
//
// @Override
// public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
// verifyReady();
// synchronized (_pendingBundles) {
// ((TxBundle) bundle).setReady();
// if (_pendingBundles.peek() == bundle)
// _pendingBundles.notify();
// synchronized (_flushWaitSynchronizer) {
// currentSize += ((TxBundle) bundle).calculateTotalSize();
// }
// }
// }
//
// @Override
// public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
// verifyReady();
// synchronized (_pendingBundles) {
// Log.warn("Dropped bundle: " + bundle);
// _pendingBundles.remove((TxBundle) bundle);
// synchronized (_flushWaitSynchronizer) {
// currentSize -= ((TxBundle) bundle).calculateTotalSize();
// }
// }
// }
//
// @Override
// public void fence(long bundleId) {
// var latch = new CountDownLatch(1);
// asyncFence(bundleId, latch::countDown);
// try {
// latch.await();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
//
// @Override
// public void asyncFence(long bundleId, VoidFn fn) {
// verifyReady();
// if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
// if (_lastWrittenTx.get() >= bundleId) {
// fn.apply();
// return;
// }
// synchronized (_notFlushedBundles) {
// if (_lastWrittenTx.get() >= bundleId) {
// fn.apply();
// return;
// }
// _notFlushedBundles.get(bundleId).addCallback(fn);
// }
// }
//
// @Getter
// private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest {
// private final ArrayList<String> _written;
// private final ArrayList<String> _deleted;
//
// private TxManifest(ArrayList<String> written, ArrayList<String> deleted) {
// _written = written;
// _deleted = deleted;
// }
// }
//
// private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle {
// private final HashMap<JObject<?>, CommittedEntry> _committed = new HashMap<>();
// private final HashMap<JObject<?>, CommittedMeta> _meta = new HashMap<>();
// private final HashMap<JObject<?>, Integer> _deleted = new HashMap<>();
// private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
// private long _txId;
// @Getter
// private volatile boolean _ready = false;
// private long _size = -1;
// private boolean _wasCommitted = false;
//
// private TxBundle(long txId) {_txId = txId;}
//
// @Override
// public long getId() {
// return _txId;
// }
//
// public void setReady() {
// _ready = true;
// }
//
// public void addCallback(VoidFn callback) {
// synchronized (_callbacks) {
// if (_wasCommitted) throw new IllegalStateException();
// _callbacks.add(callback);
// }
// }
//
// public List<VoidFn> setCommitted() {
// synchronized (_callbacks) {
// _wasCommitted = true;
// return Collections.unmodifiableList(_callbacks);
// }
// }
//
// @Override
// public void commit(JObject<?> obj, ObjectMetadataP meta, JObjectDataP data) {
// synchronized (_committed) {
// _committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize()));
// }
// }
//
// @Override
// public void commitMetaChange(JObject<?> obj, ObjectMetadataP meta) {
// synchronized (_meta) {
// _meta.put(obj, new CommittedMeta(meta, obj.estimateSize()));
// }
// }
//
// @Override
// public void delete(JObject<?> obj) {
// synchronized (_deleted) {
// _deleted.put(obj, obj.estimateSize());
// }
// }
//
//
// public long calculateTotalSize() {
// if (_size >= 0) return _size;
// long out = 0;
// for (var c : _committed.values())
// out += c.size;
// for (var c : _meta.values())
// out += c.size;
// for (var c : _deleted.entrySet())
// out += c.getValue();
// _size = out;
// return _size;
// }
//
// public void compress(TxBundle other) {
// if (_txId >= other._txId)
// throw new IllegalArgumentException("Compressing an older bundle into newer");
//
// _txId = other._txId;
// _size = -1;
//
// for (var d : other._deleted.entrySet()) {
// _committed.remove(d.getKey());
// _meta.remove(d.getKey());
// _deleted.put(d.getKey(), d.getValue());
// }
//
// for (var c : other._committed.entrySet()) {
// _committed.put(c.getKey(), c.getValue());
// _meta.remove(c.getKey());
// _deleted.remove(c.getKey());
// }
//
// for (var m : other._meta.entrySet()) {
// var deleted = _deleted.remove(m.getKey());
// if (deleted != null) {
// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize()));
// continue;
// }
// var committed = _committed.remove(m.getKey());
// if (committed != null) {
// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize()));
// continue;
// }
// _meta.put(m.getKey(), m.getValue());
// }
// }
//
// private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {}
//
// private record CommittedMeta(ObjectMetadataP newMeta, int size) {}
//
// private record Deleted(JObject<?> handle) {}
// }
//}

View File

@@ -8,6 +8,8 @@ import com.usatiuk.dhfs.utils.ByteUtils;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.arc.lookup.LookupIfProperty;
import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -43,6 +45,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
// rest of metadata
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "files")
public class FileObjectPersistentStore implements ObjectPersistentStore {
private final Path _root;
private final Path _txManifest;

View File

@@ -1,8 +1,10 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JData;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.test.objs.TestData;
import io.quarkus.arc.lookup.LookupIfProperty;
import io.quarkus.arc.properties.IfBuildProperty;
import jakarta.enterprise.context.ApplicationScoped;
import javax.annotation.Nonnull;
import java.util.Collection;
@@ -10,9 +12,11 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class FakeObjectStorage implements ObjectPersistentStore {
private final Map<JObjectKey, TestData> _objects = new HashMap<>();
private final Map<JObjectKey, TestData> _pending = new HashMap<>();
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final Map<JObjectKey, ByteString> _objects = new HashMap<>();
private final Map<JObjectKey, ByteString> _pending = new HashMap<>();
@Nonnull
@Override
@@ -24,16 +28,16 @@ public class FakeObjectStorage implements ObjectPersistentStore {
@Nonnull
@Override
public Optional<JData> readObject(JObjectKey name) {
public Optional<ByteString> readObject(JObjectKey name) {
synchronized (this) {
return Optional.ofNullable(_objects.get(name));
}
}
@Override
public void writeObject(JObjectKey name, JData object) {
public void writeObject(JObjectKey name, ByteString object) {
synchronized (this) {
_pending.put(name, (TestData) object);
_pending.put(name, object);
}
}
@@ -49,13 +53,6 @@ public class FakeObjectStorage implements ObjectPersistentStore {
}
}
@Override
public void deleteObjectDirect(JObjectKey name) {
synchronized (this) {
_objects.remove(name);
}
}
@Override
public long getTotalSpace() {
return 0;

View File

@@ -3,12 +3,14 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectAllocator;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.AccessLevel;
import lombok.Getter;
import java.util.*;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
ObjectAllocator objectAllocator;

View File

@@ -1,97 +1,150 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.FakeObjectStorage;
import com.usatiuk.dhfs.objects.test.objs.Kid;
import com.usatiuk.dhfs.objects.test.objs.Parent;
import com.usatiuk.dhfs.objects.data.Parent;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import io.quarkus.logging.Log;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
@QuarkusTest
public class ObjectsTest {
private final FakeObjectStorage _storage = new FakeObjectStorage();
private final JObjectManager _tx = new JObjectManager(_storage);
@Inject
TransactionManager txm;
@Inject
CurrentTransaction curTx;
@Inject
ObjectAllocator alloc;
@Test
void createObject() {
{
var tx = _tx.beginTransaction();
var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
parent.setName("John");
tx.commit();
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent"));
newParent.setLastName("John");
curTx.putObject(newParent);
txm.commit();
}
{
var tx2 = _tx.beginTransaction();
var parent = tx2.getObject(new JObjectKey("Parent"));
Assertions.assertInstanceOf(Parent.class, parent);
Assertions.assertEquals("John", ((Parent) parent).getName());
txm.begin();
var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ_ONLY).orElse(null);
Assertions.assertEquals("John", parent.getLastName());
txm.commit();
}
}
@Test
void createObjectConflict() {
{
var tx = _tx.beginTransaction();
var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
parent.setName("John");
void createObjectConflict() throws InterruptedException {
AtomicBoolean thread1Failed = new AtomicBoolean(true);
AtomicBoolean thread2Failed = new AtomicBoolean(true);
var tx2 = _tx.beginTransaction();
var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
parent2.setName("John");
var signal = new Semaphore(0);
tx.commit();
Assertions.assertThrows(Exception.class, tx2::commit);
new Thread(() -> {
Log.warn("Thread 1");
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
newParent.setLastName("John");
curTx.putObject(newParent);
try {
signal.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Log.warn("Thread 1 commit");
txm.commit();
thread1Failed.set(false);
}).start();
new Thread(() -> {
Log.warn("Thread 2");
txm.begin();
var newParent = alloc.create(Parent.class, new JObjectKey("Parent2"));
newParent.setLastName("John2");
curTx.putObject(newParent);
try {
signal.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Log.warn("Thread 2 commit");
txm.commit();
thread2Failed.set(false);
}).start();
signal.release(2);
Thread.sleep(500);
txm.begin();
var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ_ONLY).orElse(null);
if (!thread1Failed.get()) {
Assertions.assertTrue(thread2Failed.get());
Assertions.assertEquals("John", got.getLastName());
} else if (!thread2Failed.get()) {
Assertions.assertEquals("John2", got.getLastName());
} else {
Assertions.fail("No thread succeeded");
}
}
@Test
void editConflict() {
{
var tx = _tx.beginTransaction();
var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
parent.setName("John");
tx.commit();
}
{
var tx = _tx.beginTransaction();
var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
parent.setName("John2");
var tx2 = _tx.beginTransaction();
var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
parent2.setName("John3");
tx.commit();
Assertions.assertThrows(Exception.class, tx2::commit);
}
{
var tx2 = _tx.beginTransaction();
var parent = tx2.getObject(new JObjectKey("Parent"));
Assertions.assertInstanceOf(Parent.class, parent);
Assertions.assertEquals("John2", ((Parent) parent).getName());
}
}
@Test
void nestedCreate() {
{
var tx = _tx.beginTransaction();
var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
var kid = tx.getObject(new JObjectKey("Kid"), Kid.class);
parent.setName("John");
kid.setName("KidName");
parent.setKidKey(kid.getKey());
tx.commit();
}
{
var tx2 = _tx.beginTransaction();
var parent = tx2.getObject(new JObjectKey("Parent"));
Assertions.assertInstanceOf(Parent.class, parent);
Assertions.assertEquals("John", ((Parent) parent).getName());
Assertions.assertEquals("KidName", ((Parent) parent).getKid().getName());
}
}
// @Test
// void editConflict() {
// {
// var tx = _tx.beginTransaction();
// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
// parent.setName("John");
// tx.commit();
// }
//
// {
// var tx = _tx.beginTransaction();
// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
// parent.setName("John2");
//
// var tx2 = _tx.beginTransaction();
// var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
// parent2.setName("John3");
//
// tx.commit();
// Assertions.assertThrows(Exception.class, tx2::commit);
// }
//
// {
// var tx2 = _tx.beginTransaction();
// var parent = tx2.getObject(new JObjectKey("Parent"));
// Assertions.assertInstanceOf(Parent.class, parent);
// Assertions.assertEquals("John2", ((Parent) parent).getName());
// }
// }
//
// @Test
// void nestedCreate() {
// {
// var tx = _tx.beginTransaction();
// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
// var kid = tx.getObject(new JObjectKey("Kid"), Kid.class);
// parent.setName("John");
// kid.setName("KidName");
// parent.setKidKey(kid.getKey());
// tx.commit();
// }
//
// {
// var tx2 = _tx.beginTransaction();
// var parent = tx2.getObject(new JObjectKey("Parent"));
// Assertions.assertInstanceOf(Parent.class, parent);
// Assertions.assertEquals("John", ((Parent) parent).getName());
// Assertions.assertEquals("KidName", ((Parent) parent).getKid().getName());
// }
// }
}

View File

@@ -0,0 +1,14 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.ObjectAllocator;
import lombok.Getter;
public abstract class ChangeTrackerBase<T extends JData> implements ObjectAllocator.ChangeTrackingJData<T> {
@Getter
private boolean _modified = false;
protected void onChange() {
_modified = true;
}
}

View File

@@ -0,0 +1,33 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.data.Kid;
import lombok.Getter;
public class KidDataCT extends ChangeTrackerBase<Kid> implements Kid {
private final JObjectKey _key;
@Getter
private String _name;
@Override
public void setName(String name) {
_name = name;
onChange();
}
public KidDataCT(KidDataNormal normal) {
_key = normal.getKey();
_name = normal.getName();
}
@Override
public JObjectKey getKey() {
return _key;
}
@Override
public Kid wrapped() {
return this;
}
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.data.Kid;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
public class KidDataNormal implements Kid, Serializable {
private final JObjectKey _key;
@Getter
@Setter
private String _name;
public KidDataNormal(JObjectKey key) {_key = key;}
@Override
public JObjectKey getKey() {
return _key;
}
}

View File

@@ -0,0 +1,40 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.data.Parent;
import lombok.Getter;
public class ParentDataCT extends ChangeTrackerBase<Parent> implements Parent {
@Getter
private JObjectKey _name;
@Getter
private JObjectKey _kidKey;
@Getter
private String _lastName;
public void setKidKey(JObjectKey key) {
_kidKey = key;
onChange();
}
public void setLastName(String lastName) {
_lastName = lastName;
onChange();
}
public ParentDataCT(ParentDataNormal normal) {
_name = normal.getKey();
_kidKey = normal.getKidKey();
_lastName = normal.getLastName();
}
@Override
public JObjectKey getKey() {
return _name;
}
@Override
public Parent wrapped() {
return this;
}
}

View File

@@ -0,0 +1,29 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.data.Parent;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
public class ParentDataNormal implements Parent, Serializable {
@Getter
private JObjectKey _name;
@Getter
@Setter
private JObjectKey _kidKey;
@Getter
@Setter
private String _lastName;
public ParentDataNormal(JObjectKey name) {
_name = name;
}
@Override
public JObjectKey getKey() {
return _name;
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.objects.test.objs;
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;

View File

@@ -0,0 +1,40 @@
package com.usatiuk.dhfs.objects.allocator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectAllocator;
import com.usatiuk.dhfs.objects.data.Kid;
import com.usatiuk.dhfs.objects.data.Parent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TestObjectAllocator implements ObjectAllocator {
@Override
public <T extends JData> T create(Class<T> type, JObjectKey key) {
if (type == Kid.class) {
return type.cast(new KidDataNormal(key));
} else if (type == Parent.class) {
return type.cast(new ParentDataNormal(key));
} else {
throw new IllegalArgumentException("Unknown type: " + type);
}
}
@Override
public <T extends JData> ChangeTrackingJData<T> copy(T obj) {
if (obj instanceof ChangeTrackerBase<?>) {
throw new IllegalArgumentException("Cannot copy a ChangeTrackerBase object");
}
return switch (obj) {
case KidDataNormal kid -> (ChangeTrackingJData<T>) new KidDataCT(kid);
case ParentDataNormal parent -> (ChangeTrackingJData<T>) new ParentDataCT(parent);
default -> throw new IllegalStateException("Unexpected value: " + obj);
};
}
@Override
public <T extends JData> T unmodifiable(T obj) {
return obj; // TODO:
}
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects.data;
import com.usatiuk.dhfs.objects.JData;
public interface Kid extends JData {
String getName();
void setName(String name);
}

View File

@@ -0,0 +1,15 @@
package com.usatiuk.dhfs.objects.data;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
public interface Parent extends JData {
JObjectKey getName();
String getLastName();
void setLastName(String lastName);
JObjectKey getKidKey();
void setKidKey(JObjectKey kid);
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects.serializer;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.ObjectSerializer;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.Serializable;
@ApplicationScoped
public class TestJDataSerializer implements ObjectSerializer<JData> {
@Override
public ByteString serialize(JData obj) {
return SerializationHelper.serialize((Serializable) obj);
}
@Override
public JData deserialize(ByteString data) {
return SerializationHelper.deserialize(data.toByteArray());
}
}

View File

@@ -1,18 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.transaction.Transaction;
public class Kid extends JObject {
public Kid(Transaction Transaction, KidData data) {
super(Transaction, data);
}
@Override
public JData getData() {
return _data;
}
}

View File

@@ -1,19 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import java.util.function.Function;
public interface KidData extends JData {
String getName();
void setName(String name);
KidData bindCopy();
default Function<Transaction, JObject> binder(boolean isLocked) {
return jo -> new Kid(jo, bindCopy());
}
}

View File

@@ -1,28 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JObjectKey;
public class KidDataImpl extends TestData implements KidData {
private String _name;
public KidDataImpl(long version, JObjectKey key, String name) {
super(version, key);
_name = name;
}
@Override
public String getName() {
return _name;
}
@Override
public void setName(String name) {
_name = name;
onChanged();
}
@Override
public KidDataImpl copy() {
return new KidDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name);
}
}

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import lombok.experimental.Delegate;
public class Parent extends JObject {
@Delegate
private final ParentData _data;
public Parent(Transaction Transaction, ParentData data) {
super(Transaction);
_data = data;
}
@Override
public JData getData() {
return _data;
}
public Kid getKid() {
return _jObjectInterface.getObject(_data.getKidKey(), Kid.class);
}
}

View File

@@ -1,14 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
public interface ParentData extends JData {
String getName();
void setName(String name);
JObjectKey getKidKey();
void setKidKey(JObjectKey kid);
}

View File

@@ -1,41 +0,0 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JObjectKey;
public class ParentDataImpl extends TestData implements ParentData {
private String _name;
private JObjectKey _kidKey;
public ParentDataImpl(long version, JObjectKey key, String name, JObjectKey kidKey) {
super(version, key);
_name = name;
_kidKey = kidKey;
}
@Override
public String getName() {
return _name;
}
@Override
public void setName(String name) {
_name = name;
onChanged();
}
@Override
public JObjectKey getKidKey() {
return _kidKey;
}
@Override
public void setKidKey(JObjectKey kid) {
_kidKey = kid;
onChanged();
}
@Override
public ParentDataImpl copy() {
return new ParentDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name, _kidKey);
}
}

View File

@@ -0,0 +1 @@
dhfs.objects.persistence=memory