diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java index 6de4686f..d78fb00b 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/stores/CachingObjectPersistentStore.java @@ -42,13 +42,16 @@ public class CachingObjectPersistentStore { if (old != null) newSize -= old.size(); - TreePMap newCache = map().plus(key, entry); + TreePMap newCache = map(); while (newSize > sizeLimit) { var del = newCache.firstEntry(); newCache = newCache.minusFirstEntry(); newSize -= del.getValue().size(); } + + newCache = newCache.plus(key, entry); + return new Cache( newCache, newSize, diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/CurrentTransaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/CurrentTransaction.java index 7b02d6b1..ea7db5cc 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/CurrentTransaction.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/CurrentTransaction.java @@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction { public void put(JData obj) { transactionManager.current().put(obj); } + + @Override + public void putNew(JData obj) { + transactionManager.current().putNew(obj); + } } diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java index 4e43ec72..a244c84a 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/JObjectManager.java @@ -1,11 +1,11 @@ package com.usatiuk.objects.transaction; +import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.objects.JData; import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.SnapshotManager; -import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import io.quarkus.logging.Log; import io.quarkus.runtime.StartupEvent; import jakarta.annotation.Priority; @@ -16,7 +16,6 @@ import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.Pair; import java.util.*; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -106,10 +105,17 @@ public class JObjectManager { for (var entry : curIteration.entrySet()) { // Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); + + if (entry.getValue() instanceof TxRecord.TxObjectRecordNewWrite newWrite) { + lastCurHookSeen.put(entry.getKey(), entry.getValue()); + hook.onCreate(newWrite.key(), newWrite.data()); + continue; + } + var oldObj = getPrev.apply(entry.getKey()); lastCurHookSeen.put(entry.getKey(), entry.getValue()); switch (entry.getValue()) { - case TxRecord.TxObjectRecordWrite write -> { + case TxRecord.TxObjectRecordWriteChecked write -> { if (oldObj == null) { hook.onCreate(write.key(), write.data()); } else { @@ -221,8 +227,10 @@ public class JObjectManager { writes.values().stream() .filter(r -> { if (!same) - if (r instanceof TxRecord.TxObjectRecordWrite(JData data)) { + if (r instanceof TxRecord.TxObjectRecordWriteChecked(JData data)) { var dep = dependenciesLocked.get(data.key()); + if (dep == null) + return true; if (dep.isPresent() && dep.get().version() > snapshotId) { Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId); return false; diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/Transaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/Transaction.java index 216aae0b..81d88bd2 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/Transaction.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/Transaction.java @@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle { Optional get(Class type, JObjectKey key, LockingStrategy strategy); void put(JData obj); + void putNew(JData obj); void delete(JObjectKey key); diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java index 0d7bec9e..8405fc41 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TransactionFactoryImpl.java @@ -57,6 +57,7 @@ public class TransactionFactoryImpl implements TransactionFactory { private class TransactionImpl implements TransactionPrivate { private final Map> _readSet = new HashMap<>(); private final NavigableMap> _writes = new TreeMap<>(); + private final HashSet _totallyNew = new HashSet<>(); private final List _onCommit = new ArrayList<>(); private final List _onFlush = new ArrayList<>(); private final Snapshot _snapshot; @@ -97,8 +98,13 @@ public class TransactionFactoryImpl implements TransactionFactory { var got = _readSet.get(key); if (got == null) { + if (_totallyNew.contains(key)) { + return Optional.empty(); + } + var read = _snapshot.readObject(key); _readSet.put(key, new TransactionObjectNoLock<>(read)); +// Log.infov("Read object {0} from source, type {1}", key, type); return read.map(JDataVersionedWrapper::data).map(type::cast); } @@ -175,8 +181,15 @@ public class TransactionFactoryImpl implements TransactionFactory { @Override public void put(JData obj) { - _writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); - _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); + _writes.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj)); + _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj)); + } + + @Override + public void putNew(JData obj) { + _writes.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj)); + _newWrites.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj)); + _totallyNew.add(obj.key()); } @Override diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TxRecord.java b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TxRecord.java index e8661bb6..b0714422 100644 --- a/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TxRecord.java +++ b/dhfs-parent/objects/src/main/java/com/usatiuk/objects/transaction/TxRecord.java @@ -8,7 +8,18 @@ public class TxRecord { JObjectKey key(); } - public record TxObjectRecordWrite(JData data) implements TxObjectRecord { + public interface TxObjectRecordWrite extends TxObjectRecord { + JData data(); + } + + public record TxObjectRecordWriteChecked(JData data) implements TxObjectRecordWrite { + @Override + public JObjectKey key() { + return data.key(); + } + } + + public record TxObjectRecordNewWrite(JData data) implements TxObjectRecordWrite { @Override public JObjectKey key() { return data.key(); @@ -17,4 +28,5 @@ public class TxRecord { public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord { } + } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/RemoteTransaction.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/RemoteTransaction.java index 37f1b5cb..a3f25e6f 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/RemoteTransaction.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/RemoteTransaction.java @@ -1,9 +1,9 @@ package com.usatiuk.dhfs; -import com.usatiuk.objects.JObjectKey; import com.usatiuk.dhfs.repository.PersistentPeerDataService; import com.usatiuk.dhfs.repository.RemoteObjectServiceClient; import com.usatiuk.dhfs.repository.SyncHandler; +import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.transaction.LockingStrategy; import com.usatiuk.objects.transaction.Transaction; import io.quarkus.logging.Log; @@ -123,6 +123,11 @@ public class RemoteTransaction { curTx.put(newData); } + public void putDataNew(T obj) { + curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid())); + curTx.putNew(new RemoteObjectDataWrapper<>(obj)); + } + public Optional getMeta(JObjectKey key) { return getMeta(key, LockingStrategy.OPTIMISTIC); } diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java index 96125217..a103aa7a 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/files/service/DhfsFileServiceImpl.java @@ -81,7 +81,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { private ChunkData createChunk(ByteString bytes) { var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); - remoteTx.putData(newChunk); + remoteTx.putDataNew(newChunk); return newChunk; }