mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
Objects: remove LockingStrategy
This commit is contained in:
@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.remoteobj.*;
|
||||
import com.usatiuk.dhfsfs.service.DhfsFileService;
|
||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
@@ -42,14 +41,10 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
@Inject
|
||||
DhfsFileService fileService;
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
||||
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
|
||||
}
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
|
||||
}
|
||||
|
||||
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
||||
@Nullable FileDto receivedData) {
|
||||
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
||||
@@ -131,12 +126,12 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||
|
||||
do {
|
||||
try {
|
||||
getTreeW().move(parent.getRight(),
|
||||
getTree().move(parent.getRight(),
|
||||
new JKleppmannTreeNodeMetaFile(
|
||||
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
|
||||
newFile.key()
|
||||
),
|
||||
getTreeW().getNewNodeId()
|
||||
getTree().getNewNodeId()
|
||||
);
|
||||
} catch (AlreadyExistsException aex) {
|
||||
i++;
|
||||
|
||||
@@ -19,7 +19,6 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
|
||||
import com.usatiuk.objects.JData;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
||||
@@ -68,14 +67,10 @@ public class DhfsFileService {
|
||||
@Inject
|
||||
JMapHelper jMapHelper;
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
||||
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
|
||||
}
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
|
||||
}
|
||||
|
||||
private ChunkData createChunk(ByteString bytes) {
|
||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||
remoteTx.putDataNew(newChunk);
|
||||
@@ -84,25 +79,25 @@ public class DhfsFileService {
|
||||
|
||||
void init(@Observes @Priority(500) StartupEvent event) {
|
||||
Log.info("Initializing file service");
|
||||
getTreeW();
|
||||
getTree();
|
||||
}
|
||||
|
||||
private JKleppmannTreeNode getDirEntryW(String name) {
|
||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
private JKleppmannTreeNode getDirEntryR(String name) {
|
||||
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||
return ret;
|
||||
}
|
||||
|
||||
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
|
||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||
if (res == null) return Optional.empty();
|
||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
|
||||
return ret;
|
||||
@@ -167,7 +162,7 @@ public class DhfsFileService {
|
||||
remoteTx.putData(f);
|
||||
|
||||
try {
|
||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
|
||||
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
|
||||
} catch (Exception e) {
|
||||
// fobj.getMeta().removeRef(newNodeId);
|
||||
throw e;
|
||||
@@ -179,7 +174,7 @@ public class DhfsFileService {
|
||||
//FIXME: Slow..
|
||||
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
||||
return jObjectTxManager.executeTx(() -> {
|
||||
return getTreeW().findParent(w -> {
|
||||
return getTree().findParent(w -> {
|
||||
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
||||
return f.fileIno().equals(ino);
|
||||
return false;
|
||||
@@ -197,7 +192,7 @@ public class DhfsFileService {
|
||||
|
||||
Log.debug("Creating directory " + name);
|
||||
|
||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
|
||||
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -210,7 +205,7 @@ public class DhfsFileService {
|
||||
if (!allowRecursiveDelete && !node.children().isEmpty())
|
||||
throw new DirectoryNotEmptyException();
|
||||
}
|
||||
getTreeW().trash(node.meta(), node.key());
|
||||
getTree().trash(node.meta(), node.key());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -223,7 +218,7 @@ public class DhfsFileService {
|
||||
var toDentry = getDirEntryW(toPath.getParent().toString());
|
||||
ensureDir(toDentry);
|
||||
|
||||
getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
||||
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@@ -344,7 +339,7 @@ public class DhfsFileService {
|
||||
if (offset < 0)
|
||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
|
||||
|
||||
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
|
||||
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
|
||||
if (file == null) {
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
|
||||
}
|
||||
@@ -595,7 +590,7 @@ public class DhfsFileService {
|
||||
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
|
||||
|
||||
remoteTx.putData(f);
|
||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
|
||||
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
|
||||
return f.key();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -25,8 +25,8 @@ public class CurrentTransaction implements Transaction {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
||||
return transactionManager.current().get(type, key, strategy);
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||
return transactionManager.current().get(type, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
package com.usatiuk.objects.transaction;
|
||||
|
||||
public enum LockingStrategy {
|
||||
OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
|
||||
WRITE, // Write lock, blocks all other writers
|
||||
}
|
||||
@@ -11,17 +11,13 @@ import java.util.Optional;
|
||||
public interface Transaction extends TransactionHandle {
|
||||
void onCommit(Runnable runnable);
|
||||
|
||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> void put(JData obj);
|
||||
<T extends JData> void putNew(JData obj);
|
||||
|
||||
void delete(JObjectKey key);
|
||||
|
||||
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||
return get(type, key, LockingStrategy.OPTIMISTIC);
|
||||
}
|
||||
|
||||
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
|
||||
|
||||
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
|
||||
|
||||
@@ -108,7 +108,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||
return switch (_writes.get(key)) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||
|
||||
@@ -2,14 +2,11 @@ package com.usatiuk.objects;
|
||||
|
||||
import com.usatiuk.objects.data.Parent;
|
||||
import com.usatiuk.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -151,12 +148,12 @@ public abstract class ObjectsTestImpl {
|
||||
});
|
||||
|
||||
txm.run(() -> {
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.OPTIMISTIC).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
|
||||
Assertions.assertEquals("John", parent.name());
|
||||
curTx.put(parent.withName("John2"));
|
||||
});
|
||||
txm.run(() -> {
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.WRITE).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
|
||||
Assertions.assertEquals("John2", parent.name());
|
||||
curTx.put(parent.withName("John3"));
|
||||
});
|
||||
@@ -236,10 +233,9 @@ public abstract class ObjectsTestImpl {
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(LockingStrategy.class)
|
||||
void editConflict(LockingStrategy strategy, TestInfo testInfo) {
|
||||
String key = testInfo.getDisplayName() + "Parent4" + strategy.name();
|
||||
@Test
|
||||
void editConflict(TestInfo testInfo) {
|
||||
String key = testInfo.getDisplayName() + "Parent4";
|
||||
txm.run(() -> {
|
||||
var newParent = new Parent(JObjectKey.of(key), "John3");
|
||||
curTx.put(newParent);
|
||||
@@ -260,7 +256,7 @@ public abstract class ObjectsTestImpl {
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||
curTx.put(parent.withName("John"));
|
||||
Log.warn("Thread 1 commit");
|
||||
}, 0);
|
||||
@@ -276,7 +272,7 @@ public abstract class ObjectsTestImpl {
|
||||
Log.warn("Thread 2");
|
||||
barrier.await(); // Ensure thread 2 tx id is larger than thread 1
|
||||
txm.runTries(() -> {
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||
curTx.put(parent.withName("John2"));
|
||||
Log.warn("Thread 2 commit");
|
||||
}, 0);
|
||||
@@ -317,10 +313,9 @@ public abstract class ObjectsTestImpl {
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(LockingStrategy.class)
|
||||
void editConflict2(LockingStrategy strategy, TestInfo testInfo) {
|
||||
String key = testInfo.getDisplayName() + "EditConflict2" + strategy.name();
|
||||
@Test
|
||||
void editConflict2(TestInfo testInfo) {
|
||||
String key = testInfo.getDisplayName() + "EditConflict2";
|
||||
txm.run(() -> {
|
||||
var newParent = new Parent(JObjectKey.of(key), "John3");
|
||||
curTx.put(newParent);
|
||||
@@ -341,7 +336,7 @@ public abstract class ObjectsTestImpl {
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||
curTx.put(parent.withName("John"));
|
||||
Log.warn("Thread 1 commit");
|
||||
}, 0);
|
||||
@@ -362,7 +357,7 @@ public abstract class ObjectsTestImpl {
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
||||
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||
curTx.put(parent.withName("John2"));
|
||||
Log.warn("Thread 2 commit");
|
||||
}, 0);
|
||||
@@ -922,10 +917,8 @@ public abstract class ObjectsTestImpl {
|
||||
() -> createGetObject(testInfo),
|
||||
() -> createDeleteObject(testInfo),
|
||||
() -> createCreateObject(testInfo),
|
||||
() -> editConflict(LockingStrategy.WRITE, testInfo),
|
||||
() -> editConflict(LockingStrategy.OPTIMISTIC, testInfo),
|
||||
() -> editConflict2(LockingStrategy.WRITE, testInfo),
|
||||
() -> editConflict2(LockingStrategy.OPTIMISTIC, testInfo),
|
||||
() -> editConflict(testInfo),
|
||||
() -> editConflict2(testInfo),
|
||||
() -> snapshotTest1(testInfo),
|
||||
() -> snapshotTest2(testInfo),
|
||||
() -> snapshotTest3(testInfo),
|
||||
|
||||
@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.peersync.PeerInfoService;
|
||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||
import com.usatiuk.kleppmanntree.*;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -39,9 +38,9 @@ public class JKleppmannTreeManager {
|
||||
@Inject
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
||||
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
||||
return txManager.executeTx(() -> {
|
||||
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
|
||||
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
|
||||
if (data == null) {
|
||||
data = new JKleppmannTreePersistentData(
|
||||
name,
|
||||
@@ -66,18 +65,11 @@ public class JKleppmannTreeManager {
|
||||
}
|
||||
|
||||
public Optional<JKleppmannTree> getTree(JObjectKey name) {
|
||||
return getTree(name, LockingStrategy.WRITE);
|
||||
}
|
||||
|
||||
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
|
||||
return txManager.executeTx(() -> {
|
||||
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
|
||||
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
|
||||
});
|
||||
}
|
||||
|
||||
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
||||
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
|
||||
}
|
||||
|
||||
public class JKleppmannTree {
|
||||
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
|
||||
|
||||
@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
||||
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import com.usatiuk.objects.transaction.TransactionManager;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -29,14 +28,10 @@ public class PeerInfoService {
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
||||
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
|
||||
}
|
||||
|
||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
||||
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
|
||||
}
|
||||
|
||||
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
|
||||
@@ -49,7 +44,7 @@ public class PeerInfoService {
|
||||
|
||||
public boolean existsPeer(PeerId peer) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
if (gotKey == null) {
|
||||
return false;
|
||||
}
|
||||
@@ -59,7 +54,7 @@ public class PeerInfoService {
|
||||
|
||||
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||
if (gotKey == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
@@ -72,7 +67,7 @@ public class PeerInfoService {
|
||||
|
||||
public List<PeerInfo> getPeers() {
|
||||
return jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of());
|
||||
var gotKey = getTree().traverse(List.of());
|
||||
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
|
||||
node -> node.children().keySet().stream()
|
||||
.map(JObjectKey::of).map(this::getPeerInfoImpl)
|
||||
@@ -113,16 +108,16 @@ public class PeerInfoService {
|
||||
|
||||
public void putPeer(PeerId id, byte[] cert) {
|
||||
jObjectTxManager.run(() -> {
|
||||
var parent = getTreeW().traverse(List.of());
|
||||
var parent = getTree().traverse(List.of());
|
||||
var newPeerInfo = new PeerInfo(id, cert);
|
||||
remoteTx.putData(newPeerInfo);
|
||||
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
|
||||
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
|
||||
});
|
||||
}
|
||||
|
||||
public void removePeer(PeerId id) {
|
||||
jObjectTxManager.run(() -> {
|
||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
|
||||
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
|
||||
if (gotKey == null) {
|
||||
return;
|
||||
}
|
||||
@@ -131,7 +126,7 @@ public class PeerInfoService {
|
||||
Log.warn("Peer " + id + " not found in the tree");
|
||||
return;
|
||||
}
|
||||
getTreeW().trash(node.meta(), node.key());
|
||||
getTree().trash(node.meta(), node.key());
|
||||
curTx.onCommit(persistentPeerDataService::updateCerts);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.remoteobj;
|
||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
||||
import com.usatiuk.objects.transaction.Transaction;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -55,11 +54,11 @@ public class RemoteTransaction {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
|
||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
|
||||
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, boolean tryRequest) {
|
||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key))
|
||||
.flatMap(obj -> {
|
||||
if (obj.hasLocalData()) {
|
||||
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
|
||||
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key)).orElse(null);
|
||||
if (realData == null)
|
||||
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
|
||||
if (!type.isInstance(realData.data()))
|
||||
@@ -72,8 +71,8 @@ public class RemoteTransaction {
|
||||
});
|
||||
}
|
||||
|
||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
|
||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
|
||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> void putDataRaw(T obj) {
|
||||
@@ -127,23 +126,12 @@ public class RemoteTransaction {
|
||||
curTx.put(newData);
|
||||
}
|
||||
|
||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
||||
return getMeta(key, LockingStrategy.OPTIMISTIC);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
|
||||
return getData(type, key, LockingStrategy.OPTIMISTIC, true);
|
||||
return getData(type, key, true);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
|
||||
return getData(type, key, LockingStrategy.OPTIMISTIC, false);
|
||||
return getData(type, key, false);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
||||
return getData(type, key, strategy, true);
|
||||
}
|
||||
|
||||
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
||||
return getData(type, key, strategy, false);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user