3 Commits

SHA1 Message Date
52ccbb99bc Sync-base: rename ConnectedPeerManager to ReachablePeerManager (tests check for "connected" in logs) 2025-05-06 20:28:21 +02:00
d972cd1562 Objects: remove LockingStrategy 2025-05-06 20:21:29 +02:00
80151bcca5 Dhfs-fuse: less parallel e2e tests 2025-05-06 20:07:03 +02:00
21 changed files with 84 additions and 136 deletions

View File

@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.remoteobj.*;
 import com.usatiuk.dhfsfs.service.DhfsFileService;
 import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
@@ -42,14 +41,10 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
     @Inject
     DhfsFileService fileService;
-    private JKleppmannTreeManager.JKleppmannTree getTreeW() {
+    private JKleppmannTreeManager.JKleppmannTree getTree() {
         return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
     }
-    private JKleppmannTreeManager.JKleppmannTree getTreeR() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
-    }
     private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
                                  @Nullable FileDto receivedData) {
         var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
@@ -131,12 +126,12 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
         do {
             try {
-                getTreeW().move(parent.getRight(),
+                getTree().move(parent.getRight(),
                         new JKleppmannTreeNodeMetaFile(
                                 parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
                                 newFile.key()
                         ),
-                        getTreeW().getNewNodeId()
+                        getTree().getNewNodeId()
                 );
             } catch (AlreadyExistsException aex) {
                 i++;

View File

@@ -19,7 +19,6 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.objects.JData;
 import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.objects.iterators.IteratorStart;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
 import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -68,14 +67,10 @@ public class DhfsFileService {
     @Inject
     JMapHelper jMapHelper;
-    private JKleppmannTreeManager.JKleppmannTree getTreeW() {
+    private JKleppmannTreeManager.JKleppmannTree getTree() {
         return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
     }
-    private JKleppmannTreeManager.JKleppmannTree getTreeR() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
-    }
     private ChunkData createChunk(ByteString bytes) {
         var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
         remoteTx.putDataNew(newChunk);
@@ -84,25 +79,25 @@ public class DhfsFileService {
     void init(@Observes @Priority(500) StartupEvent event) {
         Log.info("Initializing file service");
-        getTreeW();
+        getTree();
     }
     private JKleppmannTreeNode getDirEntryW(String name) {
-        var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+        var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
         if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
         var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
         return ret;
     }
     private JKleppmannTreeNode getDirEntryR(String name) {
-        var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+        var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
         if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
         var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
         return ret;
     }
     private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
-        var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+        var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
         if (res == null) return Optional.empty();
         var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
         return ret;
@@ -167,7 +162,7 @@ public class DhfsFileService {
             remoteTx.putData(f);
             try {
-                getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
+                getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
             } catch (Exception e) {
                 // fobj.getMeta().removeRef(newNodeId);
                 throw e;
@@ -179,7 +174,7 @@ public class DhfsFileService {
     //FIXME: Slow..
     public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
         return jObjectTxManager.executeTx(() -> {
-            return getTreeW().findParent(w -> {
+            return getTree().findParent(w -> {
                 if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
                     return f.fileIno().equals(ino);
                 return false;
@@ -197,7 +192,7 @@ public class DhfsFileService {
             Log.debug("Creating directory " + name);
-            getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
+            getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
         });
     }
@@ -210,7 +205,7 @@ public class DhfsFileService {
                 if (!allowRecursiveDelete && !node.children().isEmpty())
                     throw new DirectoryNotEmptyException();
             }
-            getTreeW().trash(node.meta(), node.key());
+            getTree().trash(node.meta(), node.key());
         });
     }
@@ -223,7 +218,7 @@ public class DhfsFileService {
             var toDentry = getDirEntryW(toPath.getParent().toString());
             ensureDir(toDentry);
-            getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
+            getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
             return true;
         });
     }
@@ -344,7 +339,7 @@ public class DhfsFileService {
             if (offset < 0)
                 throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
-            var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
+            var file = remoteTx.getData(File.class, fileUuid).orElse(null);
             if (file == null) {
                 throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
             }
@@ -595,7 +590,7 @@ public class DhfsFileService {
             jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
             remoteTx.putData(f);
-            getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
+            getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
             return f.key();
         });
     }

View File

@@ -139,7 +139,7 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-failsafe-plugin</artifactId>
                 <configuration>
-                    <forkCount>1C</forkCount>
+                    <forkCount>0.5C</forkCount>
                     <reuseForks>false</reuseForks>
                     <parallel>classes</parallel>
                     <systemPropertyVariables>

View File

@@ -25,8 +25,8 @@ public class CurrentTransaction implements Transaction {
     }
     @Override
-    public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
-        return transactionManager.current().get(type, key, strategy);
+    public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
+        return transactionManager.current().get(type, key);
     }
     @Override

View File

@@ -1,6 +0,0 @@
-package com.usatiuk.objects.transaction;
-public enum LockingStrategy {
-    OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
-    WRITE, // Write lock, blocks all other writers
-}
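With the enum gone, Transaction.get keeps only the key-based overload, so call sites simply drop the strategy argument. A minimal sketch using the Parent test object and a Transaction from this changeset (the helper class and method below are illustrative, not part of the diff):

    import com.usatiuk.objects.JObjectKey;
    import com.usatiuk.objects.data.Parent;
    import com.usatiuk.objects.transaction.Transaction;

    // Illustrative only: shows a call site after the LockingStrategy removal.
    class GetWithoutStrategyExample {
        // Before: curTx.get(Parent.class, key, LockingStrategy.OPTIMISTIC) or LockingStrategy.WRITE
        // After: a single key-based overload, no per-read locking mode to choose.
        void renameParent(Transaction curTx, JObjectKey key) {
            var parent = curTx.get(Parent.class, key).orElse(null);
            if (parent == null) return;
            curTx.put(parent.withName("John2"));
        }
    }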

View File

@@ -11,17 +11,13 @@ import java.util.Optional;
 public interface Transaction extends TransactionHandle {
     void onCommit(Runnable runnable);
-    <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
+    <T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
     <T extends JData> void put(JData obj);
     <T extends JData> void putNew(JData obj);
     void delete(JObjectKey key);
-    default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
-        return get(type, key, LockingStrategy.OPTIMISTIC);
-    }
     CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
     default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {

View File

@@ -108,7 +108,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
         }
         @Override
-        public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
+        public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
             return switch (_writes.get(key)) {
                 case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
                 case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();

View File

@@ -2,14 +2,11 @@ package com.usatiuk.objects;
 import com.usatiuk.objects.data.Parent;
 import com.usatiuk.objects.iterators.IteratorStart;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
 import io.quarkus.logging.Log;
 import jakarta.inject.Inject;
 import org.junit.jupiter.api.*;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
 import java.util.List;
 import java.util.Map;
@@ -151,12 +148,12 @@ public abstract class ObjectsTestImpl {
         });
         txm.run(() -> {
-            var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.OPTIMISTIC).orElse(null);
+            var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
             Assertions.assertEquals("John", parent.name());
             curTx.put(parent.withName("John2"));
         });
         txm.run(() -> {
-            var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.WRITE).orElse(null);
+            var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
             Assertions.assertEquals("John2", parent.name());
             curTx.put(parent.withName("John3"));
         });
@@ -236,10 +233,9 @@ public abstract class ObjectsTestImpl {
         }
     }
-    @ParameterizedTest
-    @EnumSource(LockingStrategy.class)
-    void editConflict(LockingStrategy strategy, TestInfo testInfo) {
-        String key = testInfo.getDisplayName() + "Parent4" + strategy.name();
+    @Test
+    void editConflict(TestInfo testInfo) {
+        String key = testInfo.getDisplayName() + "Parent4";
         txm.run(() -> {
             var newParent = new Parent(JObjectKey.of(key), "John3");
             curTx.put(newParent);
@@ -260,7 +256,7 @@ public abstract class ObjectsTestImpl {
                 } catch (Throwable e) {
                     throw new RuntimeException(e);
                 }
-                var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
+                var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
                 curTx.put(parent.withName("John"));
                 Log.warn("Thread 1 commit");
             }, 0);
@@ -276,7 +272,7 @@ public abstract class ObjectsTestImpl {
                 Log.warn("Thread 2");
                 barrier.await(); // Ensure thread 2 tx id is larger than thread 1
                 txm.runTries(() -> {
-                    var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
+                    var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
                     curTx.put(parent.withName("John2"));
                     Log.warn("Thread 2 commit");
                 }, 0);
@@ -317,10 +313,9 @@ public abstract class ObjectsTestImpl {
         }
     }
-    @ParameterizedTest
-    @EnumSource(LockingStrategy.class)
-    void editConflict2(LockingStrategy strategy, TestInfo testInfo) {
-        String key = testInfo.getDisplayName() + "EditConflict2" + strategy.name();
+    @Test
+    void editConflict2(TestInfo testInfo) {
+        String key = testInfo.getDisplayName() + "EditConflict2";
         txm.run(() -> {
             var newParent = new Parent(JObjectKey.of(key), "John3");
             curTx.put(newParent);
@@ -341,7 +336,7 @@ public abstract class ObjectsTestImpl {
                 } catch (Throwable e) {
                     throw new RuntimeException(e);
                 }
-                var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
+                var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
                 curTx.put(parent.withName("John"));
                 Log.warn("Thread 1 commit");
             }, 0);
@@ -362,7 +357,7 @@ public abstract class ObjectsTestImpl {
                 } catch (Throwable e) {
                     throw new RuntimeException(e);
                 }
-                var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
+                var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
                 curTx.put(parent.withName("John2"));
                 Log.warn("Thread 2 commit");
             }, 0);
@@ -922,10 +917,8 @@ public abstract class ObjectsTestImpl {
                 () -> createGetObject(testInfo),
                 () -> createDeleteObject(testInfo),
                 () -> createCreateObject(testInfo),
-                () -> editConflict(LockingStrategy.WRITE, testInfo),
-                () -> editConflict(LockingStrategy.OPTIMISTIC, testInfo),
-                () -> editConflict2(LockingStrategy.WRITE, testInfo),
-                () -> editConflict2(LockingStrategy.OPTIMISTIC, testInfo),
+                () -> editConflict(testInfo),
+                () -> editConflict2(testInfo),
                 () -> snapshotTest1(testInfo),
                 () -> snapshotTest2(testInfo),
                 () -> snapshotTest3(testInfo),

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
 import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
 import com.usatiuk.dhfs.peersync.PeerId;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.utils.SerializationHelper;
 import io.quarkus.logging.Log;
 import io.quarkus.runtime.ShutdownEvent;
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
 public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
     private static final String dataFileName = "invqueue";
     @Inject
-    ConnectedPeerManager remoteHostManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     InvalidationQueueService invalidationQueueService;
     @ConfigProperty(name = "dhfs.objects.persistence.files.root")
@@ -63,7 +63,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
     @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
     @Blocking
     void periodicReturn() {
-        for (var reachable : remoteHostManager.getAvailableHosts())
+        for (var reachable : reachablePeerManager.getAvailableHosts())
             returnForHost(reachable);
     }

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
 import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.dhfs.peersync.PeerInfoService;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
 import com.usatiuk.objects.JData;
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
     private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
     private final DataLocker _locker = new DataLocker();
     @Inject
-    ConnectedPeerManager remoteHostManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     DeferredInvalidationQueueService deferredInvalidationQueueService;
     @Inject
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
                 }
                 if (toAllQueue != null) {
-                    var hostInfo = remoteHostManager.getHostStateSnapshot();
+                    var hostInfo = reachablePeerManager.getHostStateSnapshot();
                     for (var o : toAllQueue) {
                         for (var h : hostInfo.available())
                             _queue.add(new InvalidationQueueEntry(h, o));
@@ -129,7 +129,7 @@ public class InvalidationQueueService {
                         continue;
                     }
-                    if (!remoteHostManager.isReachable(e.peer())) {
+                    if (!reachablePeerManager.isReachable(e.peer())) {
                         deferredInvalidationQueueService.defer(e);
                         continue;
                     }
@@ -210,14 +210,14 @@ public class InvalidationQueueService {
     }
     void pushInvalidationToOne(InvalidationQueueEntry entry) {
-        if (remoteHostManager.isReachable(entry.peer()))
+        if (reachablePeerManager.isReachable(entry.peer()))
             _queue.add(entry);
         else
             deferredInvalidationQueueService.defer(entry);
     }
     void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
-        if (remoteHostManager.isReachable(entry.peer()))
+        if (reachablePeerManager.isReachable(entry.peer()))
             _queue.addNoDelay(entry);
         else
             deferredInvalidationQueueService.defer(entry);

View File

@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.peersync.PeerInfoService;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import com.usatiuk.kleppmanntree.*;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
 import io.quarkus.logging.Log;
@@ -39,9 +38,9 @@ public class JKleppmannTreeManager {
     @Inject
     PersistentPeerDataService persistentPeerDataService;
-    public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
+    public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
         return txManager.executeTx(() -> {
-            var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
+            var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
             if (data == null) {
                 data = new JKleppmannTreePersistentData(
                         name,
@@ -66,18 +65,11 @@ public class JKleppmannTreeManager {
     }
     public Optional<JKleppmannTree> getTree(JObjectKey name) {
-        return getTree(name, LockingStrategy.WRITE);
-    }
-    public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
         return txManager.executeTx(() -> {
-            return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
+            return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
         });
     }
-    public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
-        return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
-    }
     public class JKleppmannTree {
         private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;

View File

@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
 import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
 import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
 import io.quarkus.logging.Log;
@@ -29,14 +28,10 @@ public class PeerInfoService {
     @Inject
     RemoteTransaction remoteTx;
-    private JKleppmannTreeManager.JKleppmannTree getTreeW() {
+    private JKleppmannTreeManager.JKleppmannTree getTree() {
         return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
     }
-    private JKleppmannTreeManager.JKleppmannTree getTreeR() {
-        return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
-    }
     public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
         return jObjectTxManager.run(() -> {
             return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
@@ -49,7 +44,7 @@ public class PeerInfoService {
     public boolean existsPeer(PeerId peer) {
         return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
+            var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
             if (gotKey == null) {
                 return false;
             }
@@ -59,7 +54,7 @@ public class PeerInfoService {
     public Optional<PeerInfo> getPeerInfo(PeerId peer) {
         return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
+            var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
             if (gotKey == null) {
                 return Optional.empty();
             }
@@ -72,7 +67,7 @@ public class PeerInfoService {
     public List<PeerInfo> getPeers() {
         return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of());
+            var gotKey = getTree().traverse(List.of());
             return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
                     node -> node.children().keySet().stream()
                             .map(JObjectKey::of).map(this::getPeerInfoImpl)
@@ -113,16 +108,16 @@ public class PeerInfoService {
     public void putPeer(PeerId id, byte[] cert) {
         jObjectTxManager.run(() -> {
-            var parent = getTreeW().traverse(List.of());
+            var parent = getTree().traverse(List.of());
             var newPeerInfo = new PeerInfo(id, cert);
             remoteTx.putData(newPeerInfo);
-            getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
+            getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
         });
     }
     public void removePeer(PeerId id) {
         jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
+            var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
             if (gotKey == null) {
                 return;
             }
@@ -131,7 +126,7 @@ public class PeerInfoService {
                 Log.warn("Peer " + id + " not found in the tree");
                 return;
             }
-            getTreeW().trash(node.meta(), node.key());
+            getTree().trash(node.meta(), node.key());
             curTx.onCommit(persistentPeerDataService::updateCerts);
         });
     }

View File

@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
 @ApplicationScoped
 public class PeerLastSeenUpdater {
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     PeerInfoService peerInfoService;
     @Inject
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
     @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
     @Blocking
     void update() {
-        var snapshot = connectedPeerManager.getHostStateSnapshot();
+        var snapshot = reachablePeerManager.getHostStateSnapshot();
         for (var a : snapshot.available()) {
             txm.run(() -> {
                 var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);

View File

@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
     @Inject
     TransactionManager txm;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
     Optional<String> presetUuid;
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
             }
             curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
             Log.infov("Did reset sync state for {0}", peerId);
-            curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
+            curTx.onCommit(() -> reachablePeerManager.handleConnectionError(peerId));
             return true;
         });
     }

View File

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 @ApplicationScoped
-public class ConnectedPeerManager {
+public class ReachablePeerManager {
     private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
     private final Collection<PeerConnectedEventListener> _connectedListeners;
     private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@@ -58,7 +58,7 @@ public class ConnectedPeerManager {
     SyncHandler syncHandler;
     private ExecutorService _heartbeatExecutor;
-    public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
+    public ReachablePeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
         _connectedListeners = List.copyOf(connectedListeners.stream().toList());
         _disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
     }
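Because this commit is purely a bean rename, dependents only swap the injected type and field name, as the surrounding files do. A minimal hypothetical consumer for illustration (the PeerProbe class is not part of the changeset; the isReachable call and annotations match the usages in the diff):

    import com.usatiuk.dhfs.peersync.PeerId;
    import com.usatiuk.dhfs.peersync.ReachablePeerManager;
    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.inject.Inject;

    @ApplicationScoped
    public class PeerProbe {
        @Inject
        ReachablePeerManager reachablePeerManager; // previously: ConnectedPeerManager remoteHostManager

        // Mirrors how InvalidationQueueService defers work for peers that are not reachable.
        boolean shouldDefer(PeerId peer) {
            return !reachablePeerManager.isReachable(peer);
        }
    }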

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.remoteobj;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import io.quarkus.logging.Log;
 import jakarta.inject.Inject;
@@ -55,11 +54,11 @@ public class RemoteTransaction {
     }
     @SuppressWarnings("unchecked")
-    private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
-        return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
+    private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, boolean tryRequest) {
+        return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key))
                 .flatMap(obj -> {
                     if (obj.hasLocalData()) {
-                        var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
+                        var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key)).orElse(null);
                         if (realData == null)
                             throw new IllegalStateException("Local data not found for " + key); // TODO: Race
                         if (!type.isInstance(realData.data()))
@@ -72,8 +71,8 @@ public class RemoteTransaction {
                 });
     }
-    public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
-        return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
+    public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
+        return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
     }
     public <T extends JDataRemote> void putDataRaw(T obj) {
@@ -127,23 +126,12 @@ public class RemoteTransaction {
         curTx.put(newData);
     }
-    public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
-        return getMeta(key, LockingStrategy.OPTIMISTIC);
-    }
     public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
-        return getData(type, key, LockingStrategy.OPTIMISTIC, true);
+        return getData(type, key, true);
     }
     public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
-        return getData(type, key, LockingStrategy.OPTIMISTIC, false);
+        return getData(type, key, false);
    }
-    public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
-        return getData(type, key, strategy, true);
-    }
-    public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
-        return getData(type, key, strategy, false);
-    }
 }
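After this hunk, RemoteTransaction exposes only the strategy-free getData, getDataLocal, and getMeta overloads. A small usage sketch under that assumption (the RemoteMetaInspector class is hypothetical; the method names come from the diff above, and the call would normally run inside a transaction):

    import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
    import com.usatiuk.objects.JObjectKey;
    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.inject.Inject;

    @ApplicationScoped
    public class RemoteMetaInspector {
        @Inject
        RemoteTransaction remoteTx;

        // getMeta(key, LockingStrategy) and getData(type, key, LockingStrategy) no longer exist;
        // the key-based calls are the whole read-side surface.
        boolean hasMeta(JObjectKey key) {
            return remoteTx.getMeta(key).isPresent();
        }
    }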

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.ProtoSerializer;
 import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
 import com.usatiuk.dhfs.invalidation.Op;
 import com.usatiuk.dhfs.peersync.PeerId;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import com.usatiuk.dhfs.persistence.JObjectKeyP;
 import com.usatiuk.dhfs.remoteobj.ReceivedObject;
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {
     @Inject
     ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
         return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -70,7 +70,7 @@ public class RemoteObjectServiceClient {
         var targetVersion = objMeta.versionSum();
         var targets = objMeta.knownRemoteVersions().isEmpty()
-                ? connectedPeerManager.getAvailableHosts()
+                ? reachablePeerManager.getAvailableHosts()
                 : objMeta.knownRemoteVersions().entrySet().stream()
                         .filter(entry -> entry.getValue().equals(targetVersion))
                         .map(Map.Entry::getKey).toList();

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
 import com.usatiuk.dhfs.invalidation.Op;
 import com.usatiuk.dhfs.invalidation.OpHandlerService;
 import com.usatiuk.dhfs.peersync.PeerId;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.persistence.JObjectKeyP;
 import com.usatiuk.dhfs.remoteobj.*;
 import com.usatiuk.dhfs.repository.*;
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
     @Inject
     TransactionManager txm;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     Transaction curTx;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
 import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
 import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
 import com.usatiuk.dhfs.peersync.PeerId;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
     long syncTimeout;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     RpcChannelFactory rpcChannelFactory;
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
     }
     public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
-        var hostinfo = connectedPeerManager.getAddress(target);
+        var hostinfo = reachablePeerManager.getAddress(target);
         if (hostinfo == null)
             throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
 import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.dhfs.peersync.PeerInfoService;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.*;
@@ -14,7 +14,7 @@ public class PeerManagementApi {
     @Inject
     PeerInfoService peerInfoService;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     PersistentPeerDataService persistentPeerDataService;
@@ -23,27 +23,27 @@ public class PeerManagementApi {
     public List<PeerInfo> knownPeers() {
         return peerInfoService.getPeers().stream().map(
                 peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
-                        Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
+                        Optional.ofNullable(reachablePeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
     }
     @Path("known-peers/{peerId}")
     @PUT
     public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
-        connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
+        reachablePeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
     }
     @Path("known-peers/{peerId}")
     @DELETE
     public void deletePeer(@PathParam("peerId") String peerId) {
-        connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
+        reachablePeerManager.removeRemoteHost(PeerId.of(peerId));
     }
     @Path("available-peers")
     @GET
     public Collection<PeerInfo> availablePeers() {
-        return connectedPeerManager.getSeenButNotAddedHosts().stream()
+        return reachablePeerManager.getSeenButNotAddedHosts().stream()
                 .map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
-                        connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
+                        reachablePeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
                 .toList();
     }
 }

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
 import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
 import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.dhfs.peersync.PeerInfoService;
-import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
+import com.usatiuk.dhfs.peersync.ReachablePeerManager;
 import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.*;
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
     @Inject
     PeerInfoService peerInfoService;
     @Inject
-    ConnectedPeerManager connectedPeerManager;
+    ReachablePeerManager reachablePeerManager;
     @Inject
     PersistentPeerDataService persistentPeerDataService;