mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
1 Commits
469a6b9011
...
better-wri
| Author | SHA1 | Date | |
|---|---|---|---|
| 4bd7266c89 |
@@ -42,13 +42,16 @@ public class CachingObjectPersistentStore {
|
|||||||
if (old != null)
|
if (old != null)
|
||||||
newSize -= old.size();
|
newSize -= old.size();
|
||||||
|
|
||||||
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
|
TreePMap<JObjectKey, CacheEntry> newCache = map();
|
||||||
|
|
||||||
while (newSize > sizeLimit) {
|
while (newSize > sizeLimit) {
|
||||||
var del = newCache.firstEntry();
|
var del = newCache.firstEntry();
|
||||||
newCache = newCache.minusFirstEntry();
|
newCache = newCache.minusFirstEntry();
|
||||||
newSize -= del.getValue().size();
|
newSize -= del.getValue().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newCache = newCache.plus(key, entry);
|
||||||
|
|
||||||
return new Cache(
|
return new Cache(
|
||||||
newCache,
|
newCache,
|
||||||
newSize,
|
newSize,
|
||||||
|
|||||||
@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
|
|||||||
public <T extends JData> void put(JData obj) {
|
public <T extends JData> void put(JData obj) {
|
||||||
transactionManager.current().put(obj);
|
transactionManager.current().put(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends JData> void putNew(JData obj) {
|
||||||
|
transactionManager.current().putNew(obj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
package com.usatiuk.objects.transaction;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||||
import com.usatiuk.objects.JData;
|
import com.usatiuk.objects.JData;
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
import com.usatiuk.objects.snapshot.SnapshotManager;
|
import com.usatiuk.objects.snapshot.SnapshotManager;
|
||||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.runtime.StartupEvent;
|
import io.quarkus.runtime.StartupEvent;
|
||||||
import jakarta.annotation.Priority;
|
import jakarta.annotation.Priority;
|
||||||
@@ -16,7 +16,6 @@ import jakarta.inject.Inject;
|
|||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.function.Consumer;
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@@ -106,10 +105,17 @@ public class JObjectManager {
|
|||||||
|
|
||||||
for (var entry : curIteration.entrySet()) {
|
for (var entry : curIteration.entrySet()) {
|
||||||
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
|
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
|
||||||
|
|
||||||
|
if (entry.getValue() instanceof TxRecord.TxObjectRecordNewWrite<?> newWrite) {
|
||||||
|
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
||||||
|
hook.onCreate(newWrite.key(), newWrite.data());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
var oldObj = getPrev.apply(entry.getKey());
|
var oldObj = getPrev.apply(entry.getKey());
|
||||||
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
lastCurHookSeen.put(entry.getKey(), entry.getValue());
|
||||||
switch (entry.getValue()) {
|
switch (entry.getValue()) {
|
||||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
case TxRecord.TxObjectRecordWriteChecked<?> write -> {
|
||||||
if (oldObj == null) {
|
if (oldObj == null) {
|
||||||
hook.onCreate(write.key(), write.data());
|
hook.onCreate(write.key(), write.data());
|
||||||
} else {
|
} else {
|
||||||
@@ -221,8 +227,10 @@ public class JObjectManager {
|
|||||||
writes.values().stream()
|
writes.values().stream()
|
||||||
.filter(r -> {
|
.filter(r -> {
|
||||||
if (!same)
|
if (!same)
|
||||||
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
|
if (r instanceof TxRecord.TxObjectRecordWriteChecked<?>(JData data)) {
|
||||||
var dep = dependenciesLocked.get(data.key());
|
var dep = dependenciesLocked.get(data.key());
|
||||||
|
if (dep == null)
|
||||||
|
return true;
|
||||||
if (dep.isPresent() && dep.get().version() > snapshotId) {
|
if (dep.isPresent() && dep.get().version() > snapshotId) {
|
||||||
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
|
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
|
|||||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
||||||
|
|
||||||
<T extends JData> void put(JData obj);
|
<T extends JData> void put(JData obj);
|
||||||
|
<T extends JData> void putNew(JData obj);
|
||||||
|
|
||||||
void delete(JObjectKey key);
|
void delete(JObjectKey key);
|
||||||
|
|
||||||
|
|||||||
@@ -57,6 +57,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
private class TransactionImpl implements TransactionPrivate {
|
private class TransactionImpl implements TransactionPrivate {
|
||||||
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
|
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
|
||||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||||
|
private final HashSet<JObjectKey> _totallyNew = new HashSet<>();
|
||||||
private final List<Runnable> _onCommit = new ArrayList<>();
|
private final List<Runnable> _onCommit = new ArrayList<>();
|
||||||
private final List<Runnable> _onFlush = new ArrayList<>();
|
private final List<Runnable> _onFlush = new ArrayList<>();
|
||||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||||
@@ -97,8 +98,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
var got = _readSet.get(key);
|
var got = _readSet.get(key);
|
||||||
|
|
||||||
if (got == null) {
|
if (got == null) {
|
||||||
|
if (_totallyNew.contains(key)) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
var read = _snapshot.readObject(key);
|
var read = _snapshot.readObject(key);
|
||||||
_readSet.put(key, new TransactionObjectNoLock<>(read));
|
_readSet.put(key, new TransactionObjectNoLock<>(read));
|
||||||
|
// Log.infov("Read object {0} from source, type {1}", key, type);
|
||||||
return read.map(JDataVersionedWrapper::data).map(type::cast);
|
return read.map(JDataVersionedWrapper::data).map(type::cast);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,8 +181,15 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(JData obj) {
|
public void put(JData obj) {
|
||||||
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
_writes.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
|
||||||
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
|
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends JData> void putNew(JData obj) {
|
||||||
|
_writes.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
|
||||||
|
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
|
||||||
|
_totallyNew.add(obj.key());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -8,7 +8,18 @@ public class TxRecord {
|
|||||||
JObjectKey key();
|
JObjectKey key();
|
||||||
}
|
}
|
||||||
|
|
||||||
public record TxObjectRecordWrite<T extends JData>(JData data) implements TxObjectRecord<T> {
|
public interface TxObjectRecordWrite<T> extends TxObjectRecord<T> {
|
||||||
|
JData data();
|
||||||
|
}
|
||||||
|
|
||||||
|
public record TxObjectRecordWriteChecked<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
|
||||||
|
@Override
|
||||||
|
public JObjectKey key() {
|
||||||
|
return data.key();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public record TxObjectRecordNewWrite<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
|
||||||
@Override
|
@Override
|
||||||
public JObjectKey key() {
|
public JObjectKey key() {
|
||||||
return data.key();
|
return data.key();
|
||||||
@@ -17,4 +28,5 @@ public class TxRecord {
|
|||||||
|
|
||||||
public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
|
public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
package com.usatiuk.dhfs;
|
package com.usatiuk.dhfs;
|
||||||
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
|
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
|
||||||
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
|
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
|
||||||
import com.usatiuk.dhfs.repository.SyncHandler;
|
import com.usatiuk.dhfs.repository.SyncHandler;
|
||||||
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -123,6 +123,11 @@ public class RemoteTransaction {
|
|||||||
curTx.put(newData);
|
curTx.put(newData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public <T extends JDataRemote> void putDataNew(T obj) {
|
||||||
|
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
|
||||||
|
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
|
||||||
|
}
|
||||||
|
|
||||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
||||||
return getMeta(key, LockingStrategy.OPTIMISTIC);
|
return getMeta(key, LockingStrategy.OPTIMISTIC);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
private ChunkData createChunk(ByteString bytes) {
|
private ChunkData createChunk(ByteString bytes) {
|
||||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||||
remoteTx.putData(newChunk);
|
remoteTx.putDataNew(newChunk);
|
||||||
return newChunk;
|
return newChunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user