3 Commits

Author SHA1 Message Date
52ccbb99bc Sync-base: rename ConnectedPeerManager to ReachablePeerManager
tests check for "connected" in logs
2025-05-06 20:28:21 +02:00
d972cd1562 Objects: remove LockingStrategy 2025-05-06 20:21:29 +02:00
80151bcca5 Dhfs-fuse: less parallel e2e tests 2025-05-06 20:07:03 +02:00
21 changed files with 84 additions and 136 deletions

View File

@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -42,14 +41,10 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
DhfsFileService fileService;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
}
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) {
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
@@ -131,12 +126,12 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
do {
try {
getTreeW().move(parent.getRight(),
getTree().move(parent.getRight(),
new JKleppmannTreeNodeMetaFile(
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
newFile.key()
),
getTreeW().getNewNodeId()
getTree().getNewNodeId()
);
} catch (AlreadyExistsException aex) {
i++;

View File

@@ -19,7 +19,6 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -68,14 +67,10 @@ public class DhfsFileService {
@Inject
JMapHelper jMapHelper;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putDataNew(newChunk);
@@ -84,25 +79,25 @@ public class DhfsFileService {
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
getTreeW();
getTree();
}
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
return ret;
@@ -167,7 +162,7 @@ public class DhfsFileService {
remoteTx.putData(f);
try {
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
} catch (Exception e) {
// fobj.getMeta().removeRef(newNodeId);
throw e;
@@ -179,7 +174,7 @@ public class DhfsFileService {
//FIXME: Slow..
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
return jObjectTxManager.executeTx(() -> {
return getTreeW().findParent(w -> {
return getTree().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.fileIno().equals(ino);
return false;
@@ -197,7 +192,7 @@ public class DhfsFileService {
Log.debug("Creating directory " + name);
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
});
}
@@ -210,7 +205,7 @@ public class DhfsFileService {
if (!allowRecursiveDelete && !node.children().isEmpty())
throw new DirectoryNotEmptyException();
}
getTreeW().trash(node.meta(), node.key());
getTree().trash(node.meta(), node.key());
});
}
@@ -223,7 +218,7 @@ public class DhfsFileService {
var toDentry = getDirEntryW(toPath.getParent().toString());
ensureDir(toDentry);
getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
return true;
});
}
@@ -344,7 +339,7 @@ public class DhfsFileService {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
if (file == null) {
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
}
@@ -595,7 +590,7 @@ public class DhfsFileService {
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
remoteTx.putData(f);
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
return f.key();
});
}

View File

@@ -139,7 +139,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<forkCount>0.5C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>

View File

@@ -25,8 +25,8 @@ public class CurrentTransaction implements Transaction {
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return transactionManager.current().get(type, key, strategy);
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return transactionManager.current().get(type, key);
}
@Override

View File

@@ -1,6 +0,0 @@
package com.usatiuk.objects.transaction;
public enum LockingStrategy {
OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
WRITE, // Write lock, blocks all other writers
}

View File

@@ -11,17 +11,13 @@ import java.util.Optional;
public interface Transaction extends TransactionHandle {
void onCommit(Runnable runnable);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
<T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key);
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
}
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {

View File

@@ -108,7 +108,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();

View File

@@ -2,14 +2,11 @@ package com.usatiuk.objects;
import com.usatiuk.objects.data.Parent;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.Map;
@@ -151,12 +148,12 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.OPTIMISTIC).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
Assertions.assertEquals("John", parent.name());
curTx.put(parent.withName("John2"));
});
txm.run(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.WRITE).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
Assertions.assertEquals("John2", parent.name());
curTx.put(parent.withName("John3"));
});
@@ -236,10 +233,9 @@ public abstract class ObjectsTestImpl {
}
}
@ParameterizedTest
@EnumSource(LockingStrategy.class)
void editConflict(LockingStrategy strategy, TestInfo testInfo) {
String key = testInfo.getDisplayName() + "Parent4" + strategy.name();
@Test
void editConflict(TestInfo testInfo) {
String key = testInfo.getDisplayName() + "Parent4";
txm.run(() -> {
var newParent = new Parent(JObjectKey.of(key), "John3");
curTx.put(newParent);
@@ -260,7 +256,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John"));
Log.warn("Thread 1 commit");
}, 0);
@@ -276,7 +272,7 @@ public abstract class ObjectsTestImpl {
Log.warn("Thread 2");
barrier.await(); // Ensure thread 2 tx id is larger than thread 1
txm.runTries(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John2"));
Log.warn("Thread 2 commit");
}, 0);
@@ -317,10 +313,9 @@ public abstract class ObjectsTestImpl {
}
}
@ParameterizedTest
@EnumSource(LockingStrategy.class)
void editConflict2(LockingStrategy strategy, TestInfo testInfo) {
String key = testInfo.getDisplayName() + "EditConflict2" + strategy.name();
@Test
void editConflict2(TestInfo testInfo) {
String key = testInfo.getDisplayName() + "EditConflict2";
txm.run(() -> {
var newParent = new Parent(JObjectKey.of(key), "John3");
curTx.put(newParent);
@@ -341,7 +336,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John"));
Log.warn("Thread 1 commit");
}, 0);
@@ -362,7 +357,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John2"));
Log.warn("Thread 2 commit");
}, 0);
@@ -922,10 +917,8 @@ public abstract class ObjectsTestImpl {
() -> createGetObject(testInfo),
() -> createDeleteObject(testInfo),
() -> createCreateObject(testInfo),
() -> editConflict(LockingStrategy.WRITE, testInfo),
() -> editConflict(LockingStrategy.OPTIMISTIC, testInfo),
() -> editConflict2(LockingStrategy.WRITE, testInfo),
() -> editConflict2(LockingStrategy.OPTIMISTIC, testInfo),
() -> editConflict(testInfo),
() -> editConflict2(testInfo),
() -> snapshotTest1(testInfo),
() -> snapshotTest2(testInfo),
() -> snapshotTest3(testInfo),

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@@ -63,7 +63,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
for (var reachable : remoteHostManager.getAvailableHosts())
for (var reachable : reachablePeerManager.getAvailableHosts())
returnForHost(reachable);
}

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker();
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
}
if (toAllQueue != null) {
var hostInfo = remoteHostManager.getHostStateSnapshot();
var hostInfo = reachablePeerManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(new InvalidationQueueEntry(h, o));
@@ -129,7 +129,7 @@ public class InvalidationQueueService {
continue;
}
if (!remoteHostManager.isReachable(e.peer())) {
if (!reachablePeerManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
@@ -210,14 +210,14 @@ public class InvalidationQueueService {
}
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.add(entry);
else
deferredInvalidationQueueService.defer(entry);
}
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.addNoDelay(entry);
else
deferredInvalidationQueueService.defer(entry);

View File

@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -39,9 +38,9 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
if (data == null) {
data = new JKleppmannTreePersistentData(
name,
@@ -66,18 +65,11 @@ public class JKleppmannTreeManager {
}
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return getTree(name, LockingStrategy.WRITE);
}
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
});
}
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
}
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;

View File

@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -29,14 +28,10 @@ public class PeerInfoService {
@Inject
RemoteTransaction remoteTx;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
@@ -49,7 +44,7 @@ public class PeerInfoService {
public boolean existsPeer(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return false;
}
@@ -59,7 +54,7 @@ public class PeerInfoService {
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return Optional.empty();
}
@@ -72,7 +67,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
var gotKey = getTree().traverse(List.of());
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
@@ -113,16 +108,16 @@ public class PeerInfoService {
public void putPeer(PeerId id, byte[] cert) {
jObjectTxManager.run(() -> {
var parent = getTreeW().traverse(List.of());
var parent = getTree().traverse(List.of());
var newPeerInfo = new PeerInfo(id, cert);
remoteTx.putData(newPeerInfo);
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
});
}
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
if (gotKey == null) {
return;
}
@@ -131,7 +126,7 @@ public class PeerInfoService {
Log.warn("Peer " + id + " not found in the tree");
return;
}
getTreeW().trash(node.meta(), node.key());
getTree().trash(node.meta(), node.key());
curTx.onCommit(persistentPeerDataService::updateCerts);
});
}

View File

@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PeerInfoService peerInfoService;
@Inject
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void update() {
var snapshot = connectedPeerManager.getHostStateSnapshot();
var snapshot = reachablePeerManager.getHostStateSnapshot();
for (var a : snapshot.available()) {
txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);

View File

@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
curTx.onCommit(() -> reachablePeerManager.handleConnectionError(peerId));
return true;
});
}

View File

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
@ApplicationScoped
public class ConnectedPeerManager {
public class ReachablePeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@@ -58,7 +58,7 @@ public class ConnectedPeerManager {
SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor;
public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
public ReachablePeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.remoteobj;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
@@ -55,11 +54,11 @@ public class RemoteTransaction {
}
@SuppressWarnings("unchecked")
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, boolean tryRequest) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key))
.flatMap(obj -> {
if (obj.hasLocalData()) {
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key)).orElse(null);
if (realData == null)
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
if (!type.isInstance(realData.data()))
@@ -72,8 +71,8 @@ public class RemoteTransaction {
});
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
}
public <T extends JDataRemote> void putDataRaw(T obj) {
@@ -127,23 +126,12 @@ public class RemoteTransaction {
curTx.put(newData);
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return getMeta(key, LockingStrategy.OPTIMISTIC);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC, true);
return getData(type, key, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC, false);
return getData(type, key, false);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, false);
}
}

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -70,7 +70,7 @@ public class RemoteObjectServiceClient {
var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().isEmpty()
? connectedPeerManager.getAvailableHosts()
? reachablePeerManager.getAvailableHosts()
: objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList();

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*;
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
Transaction curTx;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
long syncTimeout;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
RpcChannelFactory rpcChannelFactory;
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
}
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
var hostinfo = connectedPeerManager.getAddress(target);
var hostinfo = reachablePeerManager.getAddress(target);
if (hostinfo == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -14,7 +14,7 @@ public class PeerManagementApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@@ -23,27 +23,27 @@ public class PeerManagementApi {
public List<PeerInfo> knownPeers() {
return peerInfoService.getPeers().stream().map(
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
Optional.ofNullable(reachablePeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
}
@Path("known-peers/{peerId}")
@PUT
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
reachablePeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
}
@Path("known-peers/{peerId}")
@DELETE
public void deletePeer(@PathParam("peerId") String peerId) {
connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
reachablePeerManager.removeRemoteHost(PeerId.of(peerId));
}
@Path("available-peers")
@GET
public Collection<PeerInfo> availablePeers() {
return connectedPeerManager.getSeenButNotAddedHosts().stream()
return reachablePeerManager.getSeenButNotAddedHosts().stream()
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
reachablePeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
.toList();
}
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;