mirror of
				https://github.com/usatiuk/dhfs.git
				synced 2025-10-28 20:47:49 +01:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			javadocs
			...
			better-wri
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 4bd7266c89 | 
| @@ -42,13 +42,16 @@ public class CachingObjectPersistentStore { | ||||
|             if (old != null) | ||||
|                 newSize -= old.size(); | ||||
|  | ||||
|             TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry); | ||||
|             TreePMap<JObjectKey, CacheEntry> newCache = map(); | ||||
|  | ||||
|             while (newSize > sizeLimit) { | ||||
|                 var del = newCache.firstEntry(); | ||||
|                 newCache = newCache.minusFirstEntry(); | ||||
|                 newSize -= del.getValue().size(); | ||||
|             } | ||||
|  | ||||
|             newCache = newCache.plus(key, entry); | ||||
|  | ||||
|             return new Cache( | ||||
|                     newCache, | ||||
|                     newSize, | ||||
|   | ||||
| @@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction { | ||||
|     public <T extends JData> void put(JData obj) { | ||||
|         transactionManager.current().put(obj); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public <T extends JData> void putNew(JData obj) { | ||||
|         transactionManager.current().putNew(obj); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,11 +1,11 @@ | ||||
| package com.usatiuk.objects.transaction; | ||||
|  | ||||
| import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; | ||||
| import com.usatiuk.objects.JData; | ||||
| import com.usatiuk.objects.JDataVersionedWrapper; | ||||
| import com.usatiuk.objects.JObjectKey; | ||||
| import com.usatiuk.objects.snapshot.Snapshot; | ||||
| import com.usatiuk.objects.snapshot.SnapshotManager; | ||||
| import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; | ||||
| import io.quarkus.logging.Log; | ||||
| import io.quarkus.runtime.StartupEvent; | ||||
| import jakarta.annotation.Priority; | ||||
| @@ -16,7 +16,6 @@ import jakarta.inject.Inject; | ||||
| import org.apache.commons.lang3.tuple.Pair; | ||||
|  | ||||
| import java.util.*; | ||||
| import java.util.function.Consumer; | ||||
| import java.util.function.Function; | ||||
| import java.util.stream.Stream; | ||||
|  | ||||
| @@ -106,10 +105,17 @@ public class JObjectManager { | ||||
|  | ||||
|                         for (var entry : curIteration.entrySet()) { | ||||
| //                            Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); | ||||
|  | ||||
|                             if (entry.getValue() instanceof TxRecord.TxObjectRecordNewWrite<?> newWrite) { | ||||
|                                 lastCurHookSeen.put(entry.getKey(), entry.getValue()); | ||||
|                                 hook.onCreate(newWrite.key(), newWrite.data()); | ||||
|                                 continue; | ||||
|                             } | ||||
|  | ||||
|                             var oldObj = getPrev.apply(entry.getKey()); | ||||
|                             lastCurHookSeen.put(entry.getKey(), entry.getValue()); | ||||
|                             switch (entry.getValue()) { | ||||
|                                 case TxRecord.TxObjectRecordWrite<?> write -> { | ||||
|                                 case TxRecord.TxObjectRecordWriteChecked<?> write -> { | ||||
|                                     if (oldObj == null) { | ||||
|                                         hook.onCreate(write.key(), write.data()); | ||||
|                                     } else { | ||||
| @@ -221,8 +227,10 @@ public class JObjectManager { | ||||
|                     writes.values().stream() | ||||
|                             .filter(r -> { | ||||
|                                 if (!same) | ||||
|                                     if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) { | ||||
|                                     if (r instanceof TxRecord.TxObjectRecordWriteChecked<?>(JData data)) { | ||||
|                                         var dep = dependenciesLocked.get(data.key()); | ||||
|                                         if (dep == null) | ||||
|                                             return true; | ||||
|                                         if (dep.isPresent() && dep.get().version() > snapshotId) { | ||||
|                                             Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId); | ||||
|                                             return false; | ||||
|   | ||||
| @@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle { | ||||
|     <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy); | ||||
|  | ||||
|     <T extends JData> void put(JData obj); | ||||
|     <T extends JData> void putNew(JData obj); | ||||
|  | ||||
|     void delete(JObjectKey key); | ||||
|  | ||||
|   | ||||
| @@ -57,6 +57,7 @@ public class TransactionFactoryImpl implements TransactionFactory { | ||||
|     private class TransactionImpl implements TransactionPrivate { | ||||
|         private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>(); | ||||
|         private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>(); | ||||
|         private final HashSet<JObjectKey> _totallyNew = new HashSet<>(); | ||||
|         private final List<Runnable> _onCommit = new ArrayList<>(); | ||||
|         private final List<Runnable> _onFlush = new ArrayList<>(); | ||||
|         private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot; | ||||
| @@ -97,8 +98,13 @@ public class TransactionFactoryImpl implements TransactionFactory { | ||||
|             var got = _readSet.get(key); | ||||
|  | ||||
|             if (got == null) { | ||||
|                 if (_totallyNew.contains(key)) { | ||||
|                     return Optional.empty(); | ||||
|                 } | ||||
|  | ||||
|                 var read = _snapshot.readObject(key); | ||||
|                 _readSet.put(key, new TransactionObjectNoLock<>(read)); | ||||
| //                Log.infov("Read object {0} from source, type {1}", key, type); | ||||
|                 return read.map(JDataVersionedWrapper::data).map(type::cast); | ||||
|             } | ||||
|  | ||||
| @@ -175,8 +181,15 @@ public class TransactionFactoryImpl implements TransactionFactory { | ||||
|  | ||||
|         @Override | ||||
|         public void put(JData obj) { | ||||
|             _writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); | ||||
|             _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); | ||||
|             _writes.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj)); | ||||
|             _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj)); | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|         public <T extends JData> void putNew(JData obj) { | ||||
|             _writes.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj)); | ||||
|             _newWrites.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj)); | ||||
|             _totallyNew.add(obj.key()); | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|   | ||||
| @@ -8,7 +8,18 @@ public class TxRecord { | ||||
|         JObjectKey key(); | ||||
|     } | ||||
|  | ||||
|     public record TxObjectRecordWrite<T extends JData>(JData data) implements TxObjectRecord<T> { | ||||
|     public interface TxObjectRecordWrite<T> extends TxObjectRecord<T> { | ||||
|         JData data(); | ||||
|     } | ||||
|  | ||||
|     public record TxObjectRecordWriteChecked<T extends JData>(JData data) implements TxObjectRecordWrite<T> { | ||||
|         @Override | ||||
|         public JObjectKey key() { | ||||
|             return data.key(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public record TxObjectRecordNewWrite<T extends JData>(JData data) implements TxObjectRecordWrite<T> { | ||||
|         @Override | ||||
|         public JObjectKey key() { | ||||
|             return data.key(); | ||||
| @@ -17,4 +28,5 @@ public class TxRecord { | ||||
|  | ||||
|     public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> { | ||||
|     } | ||||
|  | ||||
| } | ||||
|   | ||||
| @@ -1,9 +1,9 @@ | ||||
| package com.usatiuk.dhfs; | ||||
|  | ||||
| import com.usatiuk.objects.JObjectKey; | ||||
| import com.usatiuk.dhfs.repository.PersistentPeerDataService; | ||||
| import com.usatiuk.dhfs.repository.RemoteObjectServiceClient; | ||||
| import com.usatiuk.dhfs.repository.SyncHandler; | ||||
| import com.usatiuk.objects.JObjectKey; | ||||
| import com.usatiuk.objects.transaction.LockingStrategy; | ||||
| import com.usatiuk.objects.transaction.Transaction; | ||||
| import io.quarkus.logging.Log; | ||||
| @@ -123,6 +123,11 @@ public class RemoteTransaction { | ||||
|         curTx.put(newData); | ||||
|     } | ||||
|  | ||||
|     public <T extends JDataRemote> void putDataNew(T obj) { | ||||
|         curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid())); | ||||
|         curTx.putNew(new RemoteObjectDataWrapper<>(obj)); | ||||
|     } | ||||
|  | ||||
|     public Optional<RemoteObjectMeta> getMeta(JObjectKey key) { | ||||
|         return getMeta(key, LockingStrategy.OPTIMISTIC); | ||||
|     } | ||||
|   | ||||
| @@ -81,7 +81,7 @@ public class DhfsFileServiceImpl implements DhfsFileService { | ||||
|  | ||||
|     private ChunkData createChunk(ByteString bytes) { | ||||
|         var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); | ||||
|         remoteTx.putData(newChunk); | ||||
|         remoteTx.putDataNew(newChunk); | ||||
|         return newChunk; | ||||
|     } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user