diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java new file mode 100644 index 00000000..fe309206 --- /dev/null +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java @@ -0,0 +1,24 @@ +package com.usatiuk.dhfs.objects; + +import com.usatiuk.dhfs.objects.transaction.LockingStrategy; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import java.util.Optional; + +@ApplicationScoped +public class CurrentTransaction implements Transaction { + @Inject + TransactionManager transactionManager; + + @Override + public Optional getObject(Class type, JObjectKey key, LockingStrategy strategy) { + return transactionManager.current().getObject(type, key, strategy); + } + + @Override + public void putObject(JData obj) { + transactionManager.current().putObject(obj); + } +} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java index d5c1aca6..c2485f1a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java @@ -1,6 +1,7 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; +import com.usatiuk.dhfs.objects.persistence.TxManifest; import com.usatiuk.dhfs.objects.transaction.TransactionFactory; import com.usatiuk.dhfs.objects.transaction.TransactionObjectSource; import com.usatiuk.dhfs.objects.transaction.TransactionPrivate; @@ -14,9 +15,7 @@ import org.apache.commons.lang3.tuple.Pair; import java.lang.ref.Cleaner; import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -114,7 +113,9 @@ public class JObjectManager { }; public TransactionPrivate createTransaction() { - return transactionFactory.createTransaction(_txCounter.getAndIncrement(), _objSource); + var counter = _txCounter.getAndIncrement(); + Log.trace("Creating transaction " + counter); + return transactionFactory.createTransaction(counter, _objSource); } @@ -123,8 +124,11 @@ public class JObjectManager { var toFlush = new LinkedList>(); var toLock = new ArrayList>(); + Log.trace("Committing transaction " + tx.getId()); + try { for (var entry : tx.drain()) { + Log.trace("Processing entry " + entry.toString()); switch (entry) { case TxRecord.TxObjectRecordRead read -> { toUnlock.add(read.original().getLock().readLock()::unlock); @@ -148,7 +152,11 @@ public class JObjectManager { } } + toLock.sort(Comparator.comparingInt(a -> System.identityHashCode(a.original()))); + for (var record : toLock) { + Log.trace("Locking " + record.toString()); + var found = _objects.get(record.original().getKey()); if (found.get() != record.original()) { @@ -160,15 +168,65 @@ public class JObjectManager { } for (var record : toFlush) { + Log.trace("Processing flush entry " + record.toString()); + var current = _objects.get(record.copy().wrapped().getKey()); + if (current == null && !(record instanceof TxRecord.TxObjectRecordNew)) { + throw new IllegalStateException("Object not found during transaction"); + } else if (current != null) { + var old = switch (record) { + case TxRecord.TxObjectRecordCopyLock copy -> copy.original().get(); + case TxRecord.TxObjectRecordCopyNoLock copy -> copy.original(); + default -> throw new IllegalStateException("Unexpected value: " + record); + }; - assert current == null && record instanceof TxRecord.TxObjectRecordNew || current == record.copy().wrapped(); - - if (current.get() != ) + if (current.get() != old) { + throw new IllegalStateException("Object changed during transaction"); + } + if (current.lastWriteTx > tx.getId()) { + throw new IllegalStateException("Transaction race"); + } + } else if (record instanceof TxRecord.TxObjectRecordNew created) { + var wrapper = new JDataWrapper<>(created.created()); + wrapper.lock.writeLock().lock(); + var old = _objects.putIfAbsent(created.created().getKey(), wrapper); + if (old != null) + throw new IllegalStateException("Object already exists"); + toUnlock.add(wrapper.lock.writeLock()::unlock); + } else { + throw new IllegalStateException("Object not found during transaction"); + } } + // Have all locks now + for (var record : toFlush) { + Log.trace("Flushing " + record.toString()); + + if (!record.copy().isModified()) + continue; + + var obj = record.copy().wrapped(); + var key = obj.getKey(); + var data = objectSerializer.serialize(obj); + objectStorage.writeObject(key, data); + _objects.get(key).lastWriteTx = tx.getId(); // FIXME: + } + + Log.trace("Flushing transaction " + tx.getId()); + + objectStorage.commitTx(new TxManifest() { + @Override + public List getWritten() { + return toFlush.stream().map(r -> r.copy().wrapped().getKey()).toList(); + } + + @Override + public List getDeleted() { + return List.of(); + } + }); } catch (Throwable t) { Log.error("Error when committing transaction", t); throw t; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java index 3c29beba..f4944aa0 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java @@ -2,13 +2,14 @@ package com.usatiuk.dhfs.objects; import com.usatiuk.dhfs.objects.transaction.Transaction; import com.usatiuk.dhfs.objects.transaction.TransactionPrivate; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @ApplicationScoped public class TransactionManagerImpl implements TransactionManager { @Inject - JObjectManager objectManager; + JObjectManager jObjectManager; private static final ThreadLocal _currentTransaction = new ThreadLocal<>(); @@ -18,28 +19,36 @@ public class TransactionManagerImpl implements TransactionManager { throw new IllegalStateException("Transaction already started"); } - var tx = objectManager.createTransaction(); + Log.trace("Starting transaction"); + var tx = jObjectManager.createTransaction(); _currentTransaction.set(tx); } @Override public void commit() { - if(_currentTransaction.get() == null) { + if (_currentTransaction.get() == null) { throw new IllegalStateException("No transaction started"); } - jobjectManager.commit(_currentTransaction.get()); + Log.trace("Committing transaction"); + try { + jObjectManager.commit(_currentTransaction.get()); + } catch (Throwable e) { + Log.warn("Transaction commit failed", e); + throw e; + } finally { + _currentTransaction.remove(); + } } @Override public void rollback() { - + _currentTransaction.remove(); } @Override public Transaction current() { return _currentTransaction.get(); } - } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java index 28ecf30e..92705e43 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxWritebackImpl.java @@ -1,415 +1,415 @@ -package com.usatiuk.dhfs.objects; - -import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; -import com.usatiuk.dhfs.utils.VoidFn; -import io.quarkus.logging.Log; -import io.quarkus.runtime.ShutdownEvent; -import io.quarkus.runtime.StartupEvent; -import jakarta.annotation.Priority; -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; -import lombok.Getter; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.eclipse.microprofile.config.inject.ConfigProperty; - -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -@ApplicationScoped -public class TxWritebackImpl implements TxWriteback { - private final LinkedList _pendingBundles = new LinkedList<>(); - private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); - - private final Object _flushWaitSynchronizer = new Object(); - private final AtomicLong _lastWrittenTx = new AtomicLong(-1); - private final AtomicLong _counter = new AtomicLong(); - private final AtomicLong _waitedTotal = new AtomicLong(0); - @Inject - ObjectPersistentStore objectPersistentStore; - @ConfigProperty(name = "dhfs.objects.writeback.limit") - long sizeLimit; - private long currentSize = 0; - private ExecutorService _writebackExecutor; - private ExecutorService _commitExecutor; - private ExecutorService _statusExecutor; - private volatile boolean _ready = false; - - void init(@Observes @Priority(110) StartupEvent event) { - { - BasicThreadFactory factory = new BasicThreadFactory.Builder() - .namingPattern("tx-writeback-%d") - .build(); - - _writebackExecutor = Executors.newSingleThreadExecutor(factory); - _writebackExecutor.submit(this::writeback); - } - - { - BasicThreadFactory factory = new BasicThreadFactory.Builder() - .namingPattern("writeback-commit-%d") - .build(); - - _commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory); - } - _statusExecutor = Executors.newSingleThreadExecutor(); - _statusExecutor.submit(() -> { - try { - while (true) { - Thread.sleep(1000); - if (currentSize > 0) - Log.info("Tx commit status: size=" - + currentSize / 1024 / 1024 + "MB"); - } - } catch (InterruptedException ignored) { - } - }); - _ready = true; - } - - void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException { - Log.info("Waiting for all transactions to drain"); - - synchronized (_flushWaitSynchronizer) { - _ready = false; - while (currentSize > 0) { - _flushWaitSynchronizer.wait(); - } - } - - _writebackExecutor.shutdownNow(); - Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms"); - } - - private void verifyReady() { - if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!"); - } - - private void writeback() { - while (!Thread.interrupted()) { - try { - TxBundle bundle = new TxBundle(0); - synchronized (_pendingBundles) { - while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready) - _pendingBundles.wait(); - - long diff = 0; - while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var toCompress = _pendingBundles.poll(); - diff -= toCompress.calculateTotalSize(); - bundle.compress(toCompress); - } - diff += bundle.calculateTotalSize(); - synchronized (_flushWaitSynchronizer) { - currentSize += diff; - } - } - - var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size()); - ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>(); - - for (var c : bundle._committed.values()) { - _commitExecutor.execute(() -> { - try { - Log.trace("Writing new " + c.newMeta.getName()); - objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData); - } catch (Throwable t) { - Log.error("Error writing " + c.newMeta.getName(), t); - errors.add(t); - } finally { - latch.countDown(); - } - }); - } - for (var c : bundle._meta.values()) { - _commitExecutor.execute(() -> { - try { - Log.trace("Writing (meta) " + c.newMeta.getName()); - objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta); - } catch (Throwable t) { - Log.error("Error writing " + c.newMeta.getName(), t); - errors.add(t); - } finally { - latch.countDown(); - } - }); - } - if (Log.isDebugEnabled()) - for (var d : bundle._deleted.keySet()) - Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests - - latch.await(); - if (!errors.isEmpty()) { - throw new RuntimeException("Errors in writeback!"); - } - objectPersistentStore.commitTx( - new TxManifest( - Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()), - bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)), - bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new)) - )); - Log.trace("Bundle " + bundle.getId() + " committed"); - - - List> callbacks = new ArrayList<>(); - synchronized (_notFlushedBundles) { - _lastWrittenTx.set(bundle.getId()); - while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) { - callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted()); - } - } - callbacks.forEach(l -> l.forEach(VoidFn::apply)); - - synchronized (_flushWaitSynchronizer) { - currentSize -= ((TxBundle) bundle).calculateTotalSize(); - // FIXME: - if (currentSize <= sizeLimit || !_ready) - _flushWaitSynchronizer.notifyAll(); - } - } catch (InterruptedException ignored) { - } catch (Exception e) { - Log.error("Uncaught exception in writeback", e); - } catch (Throwable o) { - Log.error("Uncaught THROWABLE in writeback", o); - } - } - Log.info("Writeback thread exiting"); - } - - @Override - public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() { - verifyReady(); - boolean wait = false; - while (true) { - if (wait) { - synchronized (_flushWaitSynchronizer) { - long started = System.currentTimeMillis(); - while (currentSize > sizeLimit) { - try { - _flushWaitSynchronizer.wait(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - long waited = System.currentTimeMillis() - started; - _waitedTotal.addAndGet(waited); - if (Log.isTraceEnabled()) - Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms"); - wait = false; - } - } - synchronized (_pendingBundles) { - synchronized (_flushWaitSynchronizer) { - if (currentSize > sizeLimit) { - if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var target = _pendingBundles.poll(); - - long diff = -target.calculateTotalSize(); - while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { - var toCompress = _pendingBundles.poll(); - diff -= toCompress.calculateTotalSize(); - target.compress(toCompress); - } - diff += target.calculateTotalSize(); - currentSize += diff; - _pendingBundles.addFirst(target); - } - } - - if (currentSize > sizeLimit) { - wait = true; - continue; - } - } - synchronized (_notFlushedBundles) { - var bundle = new TxBundle(_counter.incrementAndGet()); - _pendingBundles.addLast(bundle); - _notFlushedBundles.put(bundle.getId(), bundle); - return bundle; - } - } - } - } - - @Override - public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) { - verifyReady(); - synchronized (_pendingBundles) { - ((TxBundle) bundle).setReady(); - if (_pendingBundles.peek() == bundle) - _pendingBundles.notify(); - synchronized (_flushWaitSynchronizer) { - currentSize += ((TxBundle) bundle).calculateTotalSize(); - } - } - } - - @Override - public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) { - verifyReady(); - synchronized (_pendingBundles) { - Log.warn("Dropped bundle: " + bundle); - _pendingBundles.remove((TxBundle) bundle); - synchronized (_flushWaitSynchronizer) { - currentSize -= ((TxBundle) bundle).calculateTotalSize(); - } - } - } - - @Override - public void fence(long bundleId) { - var latch = new CountDownLatch(1); - asyncFence(bundleId, latch::countDown); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void asyncFence(long bundleId, VoidFn fn) { - verifyReady(); - if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); - if (_lastWrittenTx.get() >= bundleId) { - fn.apply(); - return; - } - synchronized (_notFlushedBundles) { - if (_lastWrittenTx.get() >= bundleId) { - fn.apply(); - return; - } - _notFlushedBundles.get(bundleId).addCallback(fn); - } - } - - @Getter - private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest { - private final ArrayList _written; - private final ArrayList _deleted; - - private TxManifest(ArrayList written, ArrayList deleted) { - _written = written; - _deleted = deleted; - } - } - - private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle { - private final HashMap, CommittedEntry> _committed = new HashMap<>(); - private final HashMap, CommittedMeta> _meta = new HashMap<>(); - private final HashMap, Integer> _deleted = new HashMap<>(); - private final ArrayList _callbacks = new ArrayList<>(); - private long _txId; - @Getter - private volatile boolean _ready = false; - private long _size = -1; - private boolean _wasCommitted = false; - - private TxBundle(long txId) {_txId = txId;} - - @Override - public long getId() { - return _txId; - } - - public void setReady() { - _ready = true; - } - - public void addCallback(VoidFn callback) { - synchronized (_callbacks) { - if (_wasCommitted) throw new IllegalStateException(); - _callbacks.add(callback); - } - } - - public List setCommitted() { - synchronized (_callbacks) { - _wasCommitted = true; - return Collections.unmodifiableList(_callbacks); - } - } - - @Override - public void commit(JObject obj, ObjectMetadataP meta, JObjectDataP data) { - synchronized (_committed) { - _committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize())); - } - } - - @Override - public void commitMetaChange(JObject obj, ObjectMetadataP meta) { - synchronized (_meta) { - _meta.put(obj, new CommittedMeta(meta, obj.estimateSize())); - } - } - - @Override - public void delete(JObject obj) { - synchronized (_deleted) { - _deleted.put(obj, obj.estimateSize()); - } - } - - - public long calculateTotalSize() { - if (_size >= 0) return _size; - long out = 0; - for (var c : _committed.values()) - out += c.size; - for (var c : _meta.values()) - out += c.size; - for (var c : _deleted.entrySet()) - out += c.getValue(); - _size = out; - return _size; - } - - public void compress(TxBundle other) { - if (_txId >= other._txId) - throw new IllegalArgumentException("Compressing an older bundle into newer"); - - _txId = other._txId; - _size = -1; - - for (var d : other._deleted.entrySet()) { - _committed.remove(d.getKey()); - _meta.remove(d.getKey()); - _deleted.put(d.getKey(), d.getValue()); - } - - for (var c : other._committed.entrySet()) { - _committed.put(c.getKey(), c.getValue()); - _meta.remove(c.getKey()); - _deleted.remove(c.getKey()); - } - - for (var m : other._meta.entrySet()) { - var deleted = _deleted.remove(m.getKey()); - if (deleted != null) { - _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize())); - continue; - } - var committed = _committed.remove(m.getKey()); - if (committed != null) { - _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize())); - continue; - } - _meta.put(m.getKey(), m.getValue()); - } - } - - private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {} - - private record CommittedMeta(ObjectMetadataP newMeta, int size) {} - - private record Deleted(JObject handle) {} - } -} +//package com.usatiuk.dhfs.objects; +// +//import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore; +//import com.usatiuk.dhfs.utils.VoidFn; +//import io.quarkus.logging.Log; +//import io.quarkus.runtime.ShutdownEvent; +//import io.quarkus.runtime.StartupEvent; +//import jakarta.annotation.Priority; +//import jakarta.enterprise.context.ApplicationScoped; +//import jakarta.enterprise.event.Observes; +//import jakarta.inject.Inject; +//import lombok.Getter; +//import org.apache.commons.lang3.concurrent.BasicThreadFactory; +//import org.eclipse.microprofile.config.inject.ConfigProperty; +// +//import java.util.*; +//import java.util.concurrent.ConcurrentLinkedQueue; +//import java.util.concurrent.CountDownLatch; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import java.util.concurrent.atomic.AtomicLong; +//import java.util.stream.Collectors; +//import java.util.stream.Stream; +// +//@ApplicationScoped +//public class TxWritebackImpl implements TxWriteback { +// private final LinkedList _pendingBundles = new LinkedList<>(); +// private final LinkedHashMap _notFlushedBundles = new LinkedHashMap<>(); +// +// private final Object _flushWaitSynchronizer = new Object(); +// private final AtomicLong _lastWrittenTx = new AtomicLong(-1); +// private final AtomicLong _counter = new AtomicLong(); +// private final AtomicLong _waitedTotal = new AtomicLong(0); +// @Inject +// ObjectPersistentStore objectPersistentStore; +// @ConfigProperty(name = "dhfs.objects.writeback.limit") +// long sizeLimit; +// private long currentSize = 0; +// private ExecutorService _writebackExecutor; +// private ExecutorService _commitExecutor; +// private ExecutorService _statusExecutor; +// private volatile boolean _ready = false; +// +// void init(@Observes @Priority(110) StartupEvent event) { +// { +// BasicThreadFactory factory = new BasicThreadFactory.Builder() +// .namingPattern("tx-writeback-%d") +// .build(); +// +// _writebackExecutor = Executors.newSingleThreadExecutor(factory); +// _writebackExecutor.submit(this::writeback); +// } +// +// { +// BasicThreadFactory factory = new BasicThreadFactory.Builder() +// .namingPattern("writeback-commit-%d") +// .build(); +// +// _commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory); +// } +// _statusExecutor = Executors.newSingleThreadExecutor(); +// _statusExecutor.submit(() -> { +// try { +// while (true) { +// Thread.sleep(1000); +// if (currentSize > 0) +// Log.info("Tx commit status: size=" +// + currentSize / 1024 / 1024 + "MB"); +// } +// } catch (InterruptedException ignored) { +// } +// }); +// _ready = true; +// } +// +// void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException { +// Log.info("Waiting for all transactions to drain"); +// +// synchronized (_flushWaitSynchronizer) { +// _ready = false; +// while (currentSize > 0) { +// _flushWaitSynchronizer.wait(); +// } +// } +// +// _writebackExecutor.shutdownNow(); +// Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms"); +// } +// +// private void verifyReady() { +// if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!"); +// } +// +// private void writeback() { +// while (!Thread.interrupted()) { +// try { +// TxBundle bundle = new TxBundle(0); +// synchronized (_pendingBundles) { +// while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready) +// _pendingBundles.wait(); +// +// long diff = 0; +// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { +// var toCompress = _pendingBundles.poll(); +// diff -= toCompress.calculateTotalSize(); +// bundle.compress(toCompress); +// } +// diff += bundle.calculateTotalSize(); +// synchronized (_flushWaitSynchronizer) { +// currentSize += diff; +// } +// } +// +// var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size()); +// ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>(); +// +// for (var c : bundle._committed.values()) { +// _commitExecutor.execute(() -> { +// try { +// Log.trace("Writing new " + c.newMeta.getName()); +// objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData); +// } catch (Throwable t) { +// Log.error("Error writing " + c.newMeta.getName(), t); +// errors.add(t); +// } finally { +// latch.countDown(); +// } +// }); +// } +// for (var c : bundle._meta.values()) { +// _commitExecutor.execute(() -> { +// try { +// Log.trace("Writing (meta) " + c.newMeta.getName()); +// objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta); +// } catch (Throwable t) { +// Log.error("Error writing " + c.newMeta.getName(), t); +// errors.add(t); +// } finally { +// latch.countDown(); +// } +// }); +// } +// if (Log.isDebugEnabled()) +// for (var d : bundle._deleted.keySet()) +// Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests +// +// latch.await(); +// if (!errors.isEmpty()) { +// throw new RuntimeException("Errors in writeback!"); +// } +// objectPersistentStore.commitTx( +// new TxManifest( +// Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()), +// bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)), +// bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new)) +// )); +// Log.trace("Bundle " + bundle.getId() + " committed"); +// +// +// List> callbacks = new ArrayList<>(); +// synchronized (_notFlushedBundles) { +// _lastWrittenTx.set(bundle.getId()); +// while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) { +// callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted()); +// } +// } +// callbacks.forEach(l -> l.forEach(VoidFn::apply)); +// +// synchronized (_flushWaitSynchronizer) { +// currentSize -= ((TxBundle) bundle).calculateTotalSize(); +// // FIXME: +// if (currentSize <= sizeLimit || !_ready) +// _flushWaitSynchronizer.notifyAll(); +// } +// } catch (InterruptedException ignored) { +// } catch (Exception e) { +// Log.error("Uncaught exception in writeback", e); +// } catch (Throwable o) { +// Log.error("Uncaught THROWABLE in writeback", o); +// } +// } +// Log.info("Writeback thread exiting"); +// } +// +// @Override +// public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() { +// verifyReady(); +// boolean wait = false; +// while (true) { +// if (wait) { +// synchronized (_flushWaitSynchronizer) { +// long started = System.currentTimeMillis(); +// while (currentSize > sizeLimit) { +// try { +// _flushWaitSynchronizer.wait(); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// long waited = System.currentTimeMillis() - started; +// _waitedTotal.addAndGet(waited); +// if (Log.isTraceEnabled()) +// Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms"); +// wait = false; +// } +// } +// synchronized (_pendingBundles) { +// synchronized (_flushWaitSynchronizer) { +// if (currentSize > sizeLimit) { +// if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { +// var target = _pendingBundles.poll(); +// +// long diff = -target.calculateTotalSize(); +// while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) { +// var toCompress = _pendingBundles.poll(); +// diff -= toCompress.calculateTotalSize(); +// target.compress(toCompress); +// } +// diff += target.calculateTotalSize(); +// currentSize += diff; +// _pendingBundles.addFirst(target); +// } +// } +// +// if (currentSize > sizeLimit) { +// wait = true; +// continue; +// } +// } +// synchronized (_notFlushedBundles) { +// var bundle = new TxBundle(_counter.incrementAndGet()); +// _pendingBundles.addLast(bundle); +// _notFlushedBundles.put(bundle.getId(), bundle); +// return bundle; +// } +// } +// } +// } +// +// @Override +// public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) { +// verifyReady(); +// synchronized (_pendingBundles) { +// ((TxBundle) bundle).setReady(); +// if (_pendingBundles.peek() == bundle) +// _pendingBundles.notify(); +// synchronized (_flushWaitSynchronizer) { +// currentSize += ((TxBundle) bundle).calculateTotalSize(); +// } +// } +// } +// +// @Override +// public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) { +// verifyReady(); +// synchronized (_pendingBundles) { +// Log.warn("Dropped bundle: " + bundle); +// _pendingBundles.remove((TxBundle) bundle); +// synchronized (_flushWaitSynchronizer) { +// currentSize -= ((TxBundle) bundle).calculateTotalSize(); +// } +// } +// } +// +// @Override +// public void fence(long bundleId) { +// var latch = new CountDownLatch(1); +// asyncFence(bundleId, latch::countDown); +// try { +// latch.await(); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// } +// +// @Override +// public void asyncFence(long bundleId, VoidFn fn) { +// verifyReady(); +// if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!"); +// if (_lastWrittenTx.get() >= bundleId) { +// fn.apply(); +// return; +// } +// synchronized (_notFlushedBundles) { +// if (_lastWrittenTx.get() >= bundleId) { +// fn.apply(); +// return; +// } +// _notFlushedBundles.get(bundleId).addCallback(fn); +// } +// } +// +// @Getter +// private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest { +// private final ArrayList _written; +// private final ArrayList _deleted; +// +// private TxManifest(ArrayList written, ArrayList deleted) { +// _written = written; +// _deleted = deleted; +// } +// } +// +// private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle { +// private final HashMap, CommittedEntry> _committed = new HashMap<>(); +// private final HashMap, CommittedMeta> _meta = new HashMap<>(); +// private final HashMap, Integer> _deleted = new HashMap<>(); +// private final ArrayList _callbacks = new ArrayList<>(); +// private long _txId; +// @Getter +// private volatile boolean _ready = false; +// private long _size = -1; +// private boolean _wasCommitted = false; +// +// private TxBundle(long txId) {_txId = txId;} +// +// @Override +// public long getId() { +// return _txId; +// } +// +// public void setReady() { +// _ready = true; +// } +// +// public void addCallback(VoidFn callback) { +// synchronized (_callbacks) { +// if (_wasCommitted) throw new IllegalStateException(); +// _callbacks.add(callback); +// } +// } +// +// public List setCommitted() { +// synchronized (_callbacks) { +// _wasCommitted = true; +// return Collections.unmodifiableList(_callbacks); +// } +// } +// +// @Override +// public void commit(JObject obj, ObjectMetadataP meta, JObjectDataP data) { +// synchronized (_committed) { +// _committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize())); +// } +// } +// +// @Override +// public void commitMetaChange(JObject obj, ObjectMetadataP meta) { +// synchronized (_meta) { +// _meta.put(obj, new CommittedMeta(meta, obj.estimateSize())); +// } +// } +// +// @Override +// public void delete(JObject obj) { +// synchronized (_deleted) { +// _deleted.put(obj, obj.estimateSize()); +// } +// } +// +// +// public long calculateTotalSize() { +// if (_size >= 0) return _size; +// long out = 0; +// for (var c : _committed.values()) +// out += c.size; +// for (var c : _meta.values()) +// out += c.size; +// for (var c : _deleted.entrySet()) +// out += c.getValue(); +// _size = out; +// return _size; +// } +// +// public void compress(TxBundle other) { +// if (_txId >= other._txId) +// throw new IllegalArgumentException("Compressing an older bundle into newer"); +// +// _txId = other._txId; +// _size = -1; +// +// for (var d : other._deleted.entrySet()) { +// _committed.remove(d.getKey()); +// _meta.remove(d.getKey()); +// _deleted.put(d.getKey(), d.getValue()); +// } +// +// for (var c : other._committed.entrySet()) { +// _committed.put(c.getKey(), c.getValue()); +// _meta.remove(c.getKey()); +// _deleted.remove(c.getKey()); +// } +// +// for (var m : other._meta.entrySet()) { +// var deleted = _deleted.remove(m.getKey()); +// if (deleted != null) { +// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize())); +// continue; +// } +// var committed = _committed.remove(m.getKey()); +// if (committed != null) { +// _committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize())); +// continue; +// } +// _meta.put(m.getKey(), m.getValue()); +// } +// } +// +// private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {} +// +// private record CommittedMeta(ObjectMetadataP newMeta, int size) {} +// +// private record Deleted(JObject handle) {} +// } +//} diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java index b88b3cac..ed3cbef7 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/FileObjectPersistentStore.java @@ -8,6 +8,8 @@ import com.usatiuk.dhfs.utils.ByteUtils; import com.usatiuk.dhfs.utils.SerializationHelper; import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace; import io.grpc.Status; +import io.quarkus.arc.lookup.LookupIfProperty; +import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; @@ -43,6 +45,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; // rest of metadata @ApplicationScoped +@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "files") public class FileObjectPersistentStore implements ObjectPersistentStore { private final Path _root; private final Path _txManifest; diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/FakeObjectStorage.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java similarity index 63% rename from dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/FakeObjectStorage.java rename to dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java index 90c5c7c1..41af2572 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/persistence/FakeObjectStorage.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/persistence/MemoryObjectPersistentStore.java @@ -1,8 +1,10 @@ package com.usatiuk.dhfs.objects.persistence; -import com.usatiuk.dhfs.objects.JData; +import com.google.protobuf.ByteString; import com.usatiuk.dhfs.objects.JObjectKey; -import com.usatiuk.dhfs.objects.test.objs.TestData; +import io.quarkus.arc.lookup.LookupIfProperty; +import io.quarkus.arc.properties.IfBuildProperty; +import jakarta.enterprise.context.ApplicationScoped; import javax.annotation.Nonnull; import java.util.Collection; @@ -10,9 +12,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -public class FakeObjectStorage implements ObjectPersistentStore { - private final Map _objects = new HashMap<>(); - private final Map _pending = new HashMap<>(); +@ApplicationScoped +@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory") +public class MemoryObjectPersistentStore implements ObjectPersistentStore { + private final Map _objects = new HashMap<>(); + private final Map _pending = new HashMap<>(); @Nonnull @Override @@ -24,16 +28,16 @@ public class FakeObjectStorage implements ObjectPersistentStore { @Nonnull @Override - public Optional readObject(JObjectKey name) { + public Optional readObject(JObjectKey name) { synchronized (this) { return Optional.ofNullable(_objects.get(name)); } } @Override - public void writeObject(JObjectKey name, JData object) { + public void writeObject(JObjectKey name, ByteString object) { synchronized (this) { - _pending.put(name, (TestData) object); + _pending.put(name, object); } } @@ -49,13 +53,6 @@ public class FakeObjectStorage implements ObjectPersistentStore { } } - @Override - public void deleteObjectDirect(JObjectKey name) { - synchronized (this) { - _objects.remove(name); - } - } - @Override public long getTotalSpace() { return 0; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java index 44367977..c0f8c55c 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/transaction/TransactionFactoryImpl.java @@ -3,12 +3,14 @@ package com.usatiuk.dhfs.objects.transaction; import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JObjectKey; import com.usatiuk.dhfs.objects.ObjectAllocator; +import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import lombok.AccessLevel; import lombok.Getter; import java.util.*; +@ApplicationScoped public class TransactionFactoryImpl implements TransactionFactory { @Inject ObjectAllocator objectAllocator; diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java index 6a65b186..b3fd7649 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/ObjectsTest.java @@ -1,97 +1,150 @@ package com.usatiuk.dhfs.objects; -import com.usatiuk.dhfs.objects.persistence.FakeObjectStorage; -import com.usatiuk.dhfs.objects.test.objs.Kid; -import com.usatiuk.dhfs.objects.test.objs.Parent; +import com.usatiuk.dhfs.objects.data.Parent; +import com.usatiuk.dhfs.objects.transaction.LockingStrategy; +import io.quarkus.logging.Log; +import io.quarkus.test.junit.QuarkusTest; +import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +@QuarkusTest public class ObjectsTest { - private final FakeObjectStorage _storage = new FakeObjectStorage(); - private final JObjectManager _tx = new JObjectManager(_storage); + @Inject + TransactionManager txm; + + @Inject + CurrentTransaction curTx; + + @Inject + ObjectAllocator alloc; @Test void createObject() { { - var tx = _tx.beginTransaction(); - var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); - parent.setName("John"); - tx.commit(); + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent")); + newParent.setLastName("John"); + curTx.putObject(newParent); + txm.commit(); } { - var tx2 = _tx.beginTransaction(); - var parent = tx2.getObject(new JObjectKey("Parent")); - Assertions.assertInstanceOf(Parent.class, parent); - Assertions.assertEquals("John", ((Parent) parent).getName()); + txm.begin(); + var parent = curTx.getObject(Parent.class, new JObjectKey("Parent"), LockingStrategy.READ_ONLY).orElse(null); + Assertions.assertEquals("John", parent.getLastName()); + txm.commit(); } } @Test - void createObjectConflict() { - { - var tx = _tx.beginTransaction(); - var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); - parent.setName("John"); + void createObjectConflict() throws InterruptedException { + AtomicBoolean thread1Failed = new AtomicBoolean(true); + AtomicBoolean thread2Failed = new AtomicBoolean(true); - var tx2 = _tx.beginTransaction(); - var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class); - parent2.setName("John"); + var signal = new Semaphore(0); - tx.commit(); - Assertions.assertThrows(Exception.class, tx2::commit); + new Thread(() -> { + Log.warn("Thread 1"); + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent2")); + newParent.setLastName("John"); + curTx.putObject(newParent); + try { + signal.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Log.warn("Thread 1 commit"); + txm.commit(); + thread1Failed.set(false); + }).start(); + + new Thread(() -> { + Log.warn("Thread 2"); + txm.begin(); + var newParent = alloc.create(Parent.class, new JObjectKey("Parent2")); + newParent.setLastName("John2"); + curTx.putObject(newParent); + try { + signal.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Log.warn("Thread 2 commit"); + txm.commit(); + thread2Failed.set(false); + }).start(); + + signal.release(2); + + Thread.sleep(500); + + txm.begin(); + var got = curTx.getObject(Parent.class, new JObjectKey("Parent2"), LockingStrategy.READ_ONLY).orElse(null); + + if (!thread1Failed.get()) { + Assertions.assertTrue(thread2Failed.get()); + Assertions.assertEquals("John", got.getLastName()); + } else if (!thread2Failed.get()) { + Assertions.assertEquals("John2", got.getLastName()); + } else { + Assertions.fail("No thread succeeded"); } } - @Test - void editConflict() { - { - var tx = _tx.beginTransaction(); - var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); - parent.setName("John"); - tx.commit(); - } - - { - var tx = _tx.beginTransaction(); - var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); - parent.setName("John2"); - - var tx2 = _tx.beginTransaction(); - var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class); - parent2.setName("John3"); - - tx.commit(); - Assertions.assertThrows(Exception.class, tx2::commit); - } - - { - var tx2 = _tx.beginTransaction(); - var parent = tx2.getObject(new JObjectKey("Parent")); - Assertions.assertInstanceOf(Parent.class, parent); - Assertions.assertEquals("John2", ((Parent) parent).getName()); - } - } - - @Test - void nestedCreate() { - { - var tx = _tx.beginTransaction(); - var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); - var kid = tx.getObject(new JObjectKey("Kid"), Kid.class); - parent.setName("John"); - kid.setName("KidName"); - parent.setKidKey(kid.getKey()); - tx.commit(); - } - - { - var tx2 = _tx.beginTransaction(); - var parent = tx2.getObject(new JObjectKey("Parent")); - Assertions.assertInstanceOf(Parent.class, parent); - Assertions.assertEquals("John", ((Parent) parent).getName()); - Assertions.assertEquals("KidName", ((Parent) parent).getKid().getName()); - } - } +// @Test +// void editConflict() { +// { +// var tx = _tx.beginTransaction(); +// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); +// parent.setName("John"); +// tx.commit(); +// } +// +// { +// var tx = _tx.beginTransaction(); +// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); +// parent.setName("John2"); +// +// var tx2 = _tx.beginTransaction(); +// var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class); +// parent2.setName("John3"); +// +// tx.commit(); +// Assertions.assertThrows(Exception.class, tx2::commit); +// } +// +// { +// var tx2 = _tx.beginTransaction(); +// var parent = tx2.getObject(new JObjectKey("Parent")); +// Assertions.assertInstanceOf(Parent.class, parent); +// Assertions.assertEquals("John2", ((Parent) parent).getName()); +// } +// } +// +// @Test +// void nestedCreate() { +// { +// var tx = _tx.beginTransaction(); +// var parent = tx.getObject(new JObjectKey("Parent"), Parent.class); +// var kid = tx.getObject(new JObjectKey("Kid"), Kid.class); +// parent.setName("John"); +// kid.setName("KidName"); +// parent.setKidKey(kid.getKey()); +// tx.commit(); +// } +// +// { +// var tx2 = _tx.beginTransaction(); +// var parent = tx2.getObject(new JObjectKey("Parent")); +// Assertions.assertInstanceOf(Parent.class, parent); +// Assertions.assertEquals("John", ((Parent) parent).getName()); +// Assertions.assertEquals("KidName", ((Parent) parent).getKid().getName()); +// } +// } } diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ChangeTrackerBase.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ChangeTrackerBase.java new file mode 100644 index 00000000..597d944c --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ChangeTrackerBase.java @@ -0,0 +1,14 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.ObjectAllocator; +import lombok.Getter; + +public abstract class ChangeTrackerBase implements ObjectAllocator.ChangeTrackingJData { + @Getter + private boolean _modified = false; + + protected void onChange() { + _modified = true; + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataCT.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataCT.java new file mode 100644 index 00000000..3daee93b --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataCT.java @@ -0,0 +1,33 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.data.Kid; +import lombok.Getter; + +public class KidDataCT extends ChangeTrackerBase implements Kid { + private final JObjectKey _key; + + @Getter + private String _name; + + @Override + public void setName(String name) { + _name = name; + onChange(); + } + + public KidDataCT(KidDataNormal normal) { + _key = normal.getKey(); + _name = normal.getName(); + } + + @Override + public JObjectKey getKey() { + return _key; + } + + @Override + public Kid wrapped() { + return this; + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataNormal.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataNormal.java new file mode 100644 index 00000000..600ec787 --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/KidDataNormal.java @@ -0,0 +1,23 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.data.Kid; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; + +public class KidDataNormal implements Kid, Serializable { + private final JObjectKey _key; + + @Getter + @Setter + private String _name; + + public KidDataNormal(JObjectKey key) {_key = key;} + + @Override + public JObjectKey getKey() { + return _key; + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataCT.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataCT.java new file mode 100644 index 00000000..240624fe --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataCT.java @@ -0,0 +1,40 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.data.Parent; +import lombok.Getter; + +public class ParentDataCT extends ChangeTrackerBase implements Parent { + @Getter + private JObjectKey _name; + @Getter + private JObjectKey _kidKey; + @Getter + private String _lastName; + + public void setKidKey(JObjectKey key) { + _kidKey = key; + onChange(); + } + + public void setLastName(String lastName) { + _lastName = lastName; + onChange(); + } + + public ParentDataCT(ParentDataNormal normal) { + _name = normal.getKey(); + _kidKey = normal.getKidKey(); + _lastName = normal.getLastName(); + } + + @Override + public JObjectKey getKey() { + return _name; + } + + @Override + public Parent wrapped() { + return this; + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataNormal.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataNormal.java new file mode 100644 index 00000000..77943cf6 --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/ParentDataNormal.java @@ -0,0 +1,29 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.data.Parent; +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; + +public class ParentDataNormal implements Parent, Serializable { + @Getter + private JObjectKey _name; + @Getter + @Setter + private JObjectKey _kidKey; + @Getter + @Setter + private String _lastName; + + public ParentDataNormal(JObjectKey name) { + _name = name; + } + + @Override + public JObjectKey getKey() { + return _name; + } + +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/TestData.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestData.java similarity index 93% rename from dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/TestData.java rename to dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestData.java index 0bf25df1..e34db09d 100644 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/TestData.java +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestData.java @@ -1,4 +1,4 @@ -package com.usatiuk.dhfs.objects.test.objs; +package com.usatiuk.dhfs.objects.allocator; import com.usatiuk.dhfs.objects.JData; import com.usatiuk.dhfs.objects.JObjectKey; diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestObjectAllocator.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestObjectAllocator.java new file mode 100644 index 00000000..a117e80d --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/allocator/TestObjectAllocator.java @@ -0,0 +1,40 @@ +package com.usatiuk.dhfs.objects.allocator; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.ObjectAllocator; +import com.usatiuk.dhfs.objects.data.Kid; +import com.usatiuk.dhfs.objects.data.Parent; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class TestObjectAllocator implements ObjectAllocator { + @Override + public T create(Class type, JObjectKey key) { + if (type == Kid.class) { + return type.cast(new KidDataNormal(key)); + } else if (type == Parent.class) { + return type.cast(new ParentDataNormal(key)); + } else { + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + @Override + public ChangeTrackingJData copy(T obj) { + if (obj instanceof ChangeTrackerBase) { + throw new IllegalArgumentException("Cannot copy a ChangeTrackerBase object"); + } + + return switch (obj) { + case KidDataNormal kid -> (ChangeTrackingJData) new KidDataCT(kid); + case ParentDataNormal parent -> (ChangeTrackingJData) new ParentDataCT(parent); + default -> throw new IllegalStateException("Unexpected value: " + obj); + }; + } + + @Override + public T unmodifiable(T obj) { + return obj; // TODO: + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Kid.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Kid.java new file mode 100644 index 00000000..9c3df9db --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Kid.java @@ -0,0 +1,9 @@ +package com.usatiuk.dhfs.objects.data; + +import com.usatiuk.dhfs.objects.JData; + +public interface Kid extends JData { + String getName(); + + void setName(String name); +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Parent.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Parent.java new file mode 100644 index 00000000..1067ea5d --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/data/Parent.java @@ -0,0 +1,15 @@ +package com.usatiuk.dhfs.objects.data; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; + +public interface Parent extends JData { + JObjectKey getName(); + + String getLastName(); + void setLastName(String lastName); + + JObjectKey getKidKey(); + + void setKidKey(JObjectKey kid); +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/serializer/TestJDataSerializer.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/serializer/TestJDataSerializer.java new file mode 100644 index 00000000..388ffccd --- /dev/null +++ b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/serializer/TestJDataSerializer.java @@ -0,0 +1,23 @@ +package com.usatiuk.dhfs.objects.serializer; + + +import com.google.protobuf.ByteString; +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.ObjectSerializer; +import com.usatiuk.dhfs.utils.SerializationHelper; +import jakarta.enterprise.context.ApplicationScoped; + +import java.io.Serializable; + +@ApplicationScoped +public class TestJDataSerializer implements ObjectSerializer { + @Override + public ByteString serialize(JData obj) { + return SerializationHelper.serialize((Serializable) obj); + } + + @Override + public JData deserialize(ByteString data) { + return SerializationHelper.deserialize(data.toByteArray()); + } +} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Kid.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Kid.java deleted file mode 100644 index b5696f25..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Kid.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObject; -import com.usatiuk.dhfs.objects.transaction.Transaction; - -public class Kid extends JObject { - - public Kid(Transaction Transaction, KidData data) { - super(Transaction, data); - } - - @Override - public JData getData() { - return _data; - } - -} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidData.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidData.java deleted file mode 100644 index bf49e1e7..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidData.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObject; -import com.usatiuk.dhfs.objects.transaction.Transaction; - -import java.util.function.Function; - -public interface KidData extends JData { - String getName(); - - void setName(String name); - - KidData bindCopy(); - - default Function binder(boolean isLocked) { - return jo -> new Kid(jo, bindCopy()); - } -} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidDataImpl.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidDataImpl.java deleted file mode 100644 index 48b8baf6..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/KidDataImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JObjectKey; - -public class KidDataImpl extends TestData implements KidData { - private String _name; - - public KidDataImpl(long version, JObjectKey key, String name) { - super(version, key); - _name = name; - } - - @Override - public String getName() { - return _name; - } - - @Override - public void setName(String name) { - _name = name; - onChanged(); - } - - @Override - public KidDataImpl copy() { - return new KidDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name); - } -} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Parent.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Parent.java deleted file mode 100644 index 7b90597e..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/Parent.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObject; -import com.usatiuk.dhfs.objects.transaction.Transaction; -import lombok.experimental.Delegate; - -public class Parent extends JObject { - @Delegate - private final ParentData _data; - - public Parent(Transaction Transaction, ParentData data) { - super(Transaction); - _data = data; - } - - @Override - public JData getData() { - return _data; - } - - public Kid getKid() { - return _jObjectInterface.getObject(_data.getKidKey(), Kid.class); - } -} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentData.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentData.java deleted file mode 100644 index b3f0e76f..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentData.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JData; -import com.usatiuk.dhfs.objects.JObjectKey; - -public interface ParentData extends JData { - String getName(); - - void setName(String name); - - JObjectKey getKidKey(); - - void setKidKey(JObjectKey kid); -} diff --git a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentDataImpl.java b/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentDataImpl.java deleted file mode 100644 index c77a0020..00000000 --- a/dhfs-parent/objects/src/test/java/com/usatiuk/dhfs/objects/test/objs/ParentDataImpl.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.usatiuk.dhfs.objects.test.objs; - -import com.usatiuk.dhfs.objects.JObjectKey; - -public class ParentDataImpl extends TestData implements ParentData { - private String _name; - private JObjectKey _kidKey; - - public ParentDataImpl(long version, JObjectKey key, String name, JObjectKey kidKey) { - super(version, key); - _name = name; - _kidKey = kidKey; - } - - @Override - public String getName() { - return _name; - } - - @Override - public void setName(String name) { - _name = name; - onChanged(); - } - - @Override - public JObjectKey getKidKey() { - return _kidKey; - } - - @Override - public void setKidKey(JObjectKey kid) { - _kidKey = kid; - onChanged(); - } - - @Override - public ParentDataImpl copy() { - return new ParentDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name, _kidKey); - } -} diff --git a/dhfs-parent/objects/src/test/resources/application.properties b/dhfs-parent/objects/src/test/resources/application.properties new file mode 100644 index 00000000..1b0d9d26 --- /dev/null +++ b/dhfs-parent/objects/src/test/resources/application.properties @@ -0,0 +1 @@ +dhfs.objects.persistence=memory \ No newline at end of file