11 Commits

Author SHA1 Message Date
ca354ba09c Webui: don't show complete address 2025-04-23 17:24:48 +02:00
81af021292 Wait for lazyfs to crash before unmount 2025-04-23 17:19:44 +02:00
0c04079258 Improved peer UI 2025-04-23 16:37:45 +02:00
2e2eb3ac97 Dhfs-app: lazyfs torn op testing 2025-04-23 15:07:09 +02:00
e2e756e7c5 Objects: getFromSource just ever so slightly faster
one map access
2025-04-23 14:50:34 +02:00
04e932ed62 Dhfs-app: LazyFs directory test non-repeated
some docker ip pool problems
2025-04-23 14:17:42 +02:00
aeec66389d Dhfs-app: LazyFs directory test 2025-04-23 14:00:44 +02:00
adc7356d4a Sync-base: fix leaking non-flushed ops 2025-04-23 14:00:30 +02:00
16da05292f Objects: better onflush for no write transactions 2025-04-23 13:58:55 +02:00
b0149b7251 Objects: less logs in iterators
less crap, and there are tests now
2025-04-23 11:06:26 +02:00
24416c1e87 Dhfs-app: less badlazyfs crash test 2025-04-23 10:38:52 +02:00
24 changed files with 448 additions and 253 deletions

View File

@@ -5,54 +5,61 @@ import io.quarkus.logging.Log;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class LazyFs {
private static final String lazyFsPath;
private final String mountRoot;
private final String dataRoot;
private final String lazyFsDataPath;
private final String name;
private final File tmpConfigFile;
private final File fifoPath;
private final File configFile;
private final File fifoFile;
static {
lazyFsPath = System.getProperty("lazyFsPath");
System.out.println("LazyFs Path: " + lazyFsPath);
}
public LazyFs(String name, String dataRoot, String lazyFsDataPath) {
public LazyFs(String name, String mountRoot, String dataRoot) {
this.name = name;
this.mountRoot = mountRoot;
this.dataRoot = dataRoot;
this.lazyFsDataPath = lazyFsDataPath;
try {
tmpConfigFile = File.createTempFile("lazyfs", ".conf");
tmpConfigFile.deleteOnExit();
configFile = File.createTempFile("lazyfs", ".conf");
configFile.deleteOnExit();
fifoPath = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
fifoPath.deleteOnExit();
fifoFile = new File("/tmp/" + ThreadLocalRandom.current().nextLong() + ".faultsfifo");
fifoFile.deleteOnExit();
} catch (IOException e) {
throw new RuntimeException(e);
}
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
private Thread outputPiper;
private Thread errPiper;
private Thread outPiper;
private CountDownLatch startLatch;
private Process fs;
private String fifoPath() {
return fifoPath.getAbsolutePath();
return fifoFile.getAbsolutePath();
}
public void start() {
public void start(String extraOpts) {
var lfsPath = Path.of(lazyFsPath).resolve("build").resolve("lazyfs");
if (!lfsPath.toFile().isFile())
throw new IllegalStateException("LazyFs binary does not exist: " + lfsPath.toAbsolutePath());
if (!lfsPath.toFile().canExecute())
throw new IllegalStateException("LazyFs binary is not executable: " + lfsPath.toAbsolutePath());
try (var rwFile = new RandomAccessFile(tmpConfigFile, "rw");
try (var rwFile = new RandomAccessFile(configFile, "rw");
var channel = rwFile.getChannel()) {
channel.truncate(0);
var config = "[faults]\n" +
@@ -64,7 +71,7 @@ public class LazyFs {
"blocks_per_page=1\n" +
"[filesystem]\n" +
"log_all_operations=false\n" +
"logfile=\"\"";
"logfile=\"\"\n" + extraOpts;
rwFile.write(config.getBytes());
Log.info("LazyFs config: \n" + config);
} catch (Exception e) {
@@ -74,38 +81,42 @@ public class LazyFs {
var argList = new ArrayList<String>();
argList.add(lfsPath.toString());
argList.add(Path.of(dataRoot).toString());
argList.add(Path.of(mountRoot).toString());
argList.add("--config-path");
argList.add(tmpConfigFile.getAbsolutePath());
argList.add(configFile.getAbsolutePath());
argList.add("-o");
argList.add("allow_other");
argList.add("-o");
argList.add("modules=subdir");
argList.add("-o");
argList.add("subdir=" + Path.of(lazyFsDataPath).toAbsolutePath().toString());
argList.add("subdir=" + Path.of(dataRoot).toAbsolutePath().toString());
try {
Log.info("Starting LazyFs " + argList);
fs = Runtime.getRuntime().exec(argList.toArray(String[]::new));
Thread.sleep(1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
outputPiper = new Thread(() -> {
startLatch = new CountDownLatch(1);
outPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getInputStream()))) {
String line;
while ((line = input.readLine()) != null) {
if (line.contains("running LazyFS"))
startLatch.countDown();
System.out.println(line);
}
}
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
Log.info("LazyFs out piper finished");
});
outputPiper.start();
outputPiper = new Thread(() -> {
outPiper.start();
errPiper = new Thread(() -> {
try {
try (BufferedReader input = new BufferedReader(new InputStreamReader(fs.getErrorStream()))) {
String line;
@@ -117,15 +128,51 @@ public class LazyFs {
} catch (Exception e) {
Log.info("Exception in LazyFs piper", e);
}
Log.info("LazyFs err piper finished");
});
outputPiper.start();
errPiper.start();
try {
if (!startLatch.await(5, TimeUnit.SECONDS))
throw new RuntimeException("StartLatch timed out");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Log.info("LazyFs started");
}
public void start() {
start("");
}
private String mdbPath() {
return Path.of(dataRoot).resolve("objects").resolve("data.mdb").toAbsolutePath().toString();
}
public void startTornOp() {
start("\n" +
"[[injection]]\n" +
"type=\"torn-seq\"\n" +
"op=\"write\"\n" +
"file=\"" + mdbPath() + "\"\n" +
"persist=[1,4]\n" +
"occurrence=2");
}
public void startTornSeq() {
start("[[injection]]\n" +
"type=\"torn-op\"\n" +
"file=\"" + mdbPath() + "\"\n" +
"occurrence=5\n" +
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
"persist=[1,3]");
}
public void crash() {
try {
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
System.out.println("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd});
Log.info("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
stop();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -134,13 +181,13 @@ public class LazyFs {
public void stop() {
try {
if (fs == null) {
return;
synchronized (this) {
if (fs == null) {
return;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
fs = null;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + dataRoot});
if (!fs.waitFor(1, TimeUnit.SECONDS))
throw new RuntimeException("LazyFs process did not stop in time");
fs = null;
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@@ -18,10 +18,7 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
@@ -39,19 +36,26 @@ public class LazyFsIT {
File data1;
File data2;
File data1Lazy;
File data2Lazy;
LazyFs lazyFs1;
LazyFs lazyFs2;
private final ExecutorService executor = Executors.newCachedThreadPool();
@BeforeEach
void setup(TestInfo testInfo) throws IOException, InterruptedException, TimeoutException {
data1 = Files.createTempDirectory("dhfsdata").toFile();
data2 = Files.createTempDirectory("dhfsdata").toFile();
data1Lazy = Files.createTempDirectory("lazyfsroot").toFile();
data2Lazy = Files.createTempDirectory("lazyfsroot").toFile();
Network network = Network.newNetwork();
lazyFs1 = new LazyFs(testInfo.getDisplayName(), data1.toString(), data1Lazy.toString());
lazyFs1.start();
lazyFs2 = new LazyFs(testInfo.getDisplayName(), data2.toString(), data2Lazy.toString());
lazyFs2.start();
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
@@ -101,46 +105,18 @@ public class LazyFsIT {
@AfterEach
void stop() {
lazyFs1.stop();
lazyFs2.stop();
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
TestDataCleaner.purgeDirectory(data1);
TestDataCleaner.purgeDirectory(data1Lazy);
TestDataCleaner.purgeDirectory(data2);
TestDataCleaner.purgeDirectory(data2Lazy);
executor.close();
}
@Test
void killTest(TestInfo testInfo) throws Exception {
var executor = Executors.newFixedThreadPool(2);
var barrier = new CyclicBarrier(2);
var ret1 = executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.await();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(1000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
Thread.sleep(1000);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
private void checkConsistency() {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
@@ -156,44 +132,185 @@ public class LazyFsIT {
});
}
// @Test
// void killTestDirs(TestInfo testInfo) throws Exception {
// var executor = Executors.newFixedThreadPool(2);
// var barrier = new CyclicBarrier(2);
// var ret1 = executor.submit(() -> {
// try {
// Log.info("Writing to container 1");
// barrier.await();
// container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
// barrier.await();
// Thread.sleep(10000);
// var client = DockerClientFactory.instance().client();
// client.killContainerCmd(container1.getContainerId()).exec();
// container1.stop();
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
// container1.start();
// waitingConsumer1 = new WaitingConsumer();
// var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
// container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
// waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
// waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
//
// await().atMost(45, TimeUnit.SECONDS).until(() -> {
// Log.info("Listing consistency");
// var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse");
// var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*");
// Log.info(ls1);
// Log.info(cat1);
// Log.info(ls2);
// Log.info(cat2);
//
// return ls1.equals(ls2) && cat1.equals(cat2);
// });
// }
@Test
void killTest(TestInfo testInfo) throws Exception {
var barrier = new CountDownLatch(1);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test1; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(2000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
checkConsistency();
}
@Test
void killTestDirs(TestInfo testInfo) throws Exception {
var barrier = new CountDownLatch(1);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Thread.sleep(2000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
checkConsistency();
}
@Test
void killTestDirs2(TestInfo testInfo) throws Exception {
var barrier = new CountDownLatch(1);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.startTornOp();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Log.info("Killing");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
checkConsistency();
}
@Test
void killTestDirs3(TestInfo testInfo) throws Exception {
var barrier = new CountDownLatch(1);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
barrier.await();
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.startTornSeq();
container1.start();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
executor.submit(() -> {
try {
Log.info("Writing to container 1");
barrier.countDown();
container1.execInContainer("/bin/sh", "-c", "counter=0; while true; do counter=`expr $counter + 1`; echo $counter >> /dhfs_test/fuse/2test$counter; done");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Log.info("Killing");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
lazyFs1.start();
container1.start();
waitingConsumer1 = new WaitingConsumer();
loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
checkConsistency();
}
}

View File

@@ -39,20 +39,20 @@ public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends Reversib
}
switch (start) {
case LT -> {
// assert _next == null || _next.getKey().compareTo(startKey) < 0;
}
case LE -> {
// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next == null || _next.compareTo(startKey) > 0;
}
case GE -> {
assert _next == null || _next.compareTo(startKey) >= 0;
}
}
// switch (start) {
// case LT -> {
//// assert _next == null || _next.getKey().compareTo(startKey) < 0;
// }
// case LE -> {
//// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
// }
// case GT -> {
// assert _next == null || _next.compareTo(startKey) > 0;
// }
// case GE -> {
// assert _next == null || _next.compareTo(startKey) >= 0;
// }
// }
}
private void fillNext() {

View File

@@ -80,21 +80,21 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
advanceIterator(iterator);
}
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
switch (startType) {
// case LT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
// Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
// switch (startType) {
//// case LT -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
//// }
//// case LE -> {
//// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
//// }
// case GT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
// }
// case LE -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
// case GE -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
// }
case GT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
}
case GE -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
}
}
// }
}
@SafeVarargs
@@ -105,7 +105,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
private void advanceIterator(IteratorEntry<K, V> iteratorEntry) {
while (iteratorEntry.iterator().hasNext()) {
K key = iteratorEntry.iterator().peekNextKey();
Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
// Log.tracev("{0} Advance peeked: {1}-{2}", _name, iteratorEntry, key);
MutableObject<IteratorEntry<K, V>> mutableBoolean = new MutableObject<>(null);
@@ -125,7 +125,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
});
if (newVal != iteratorEntry) {
Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
// Log.tracev("{0} Skipped: {1}", _name, iteratorEntry.iterator().peekNextKey());
iteratorEntry.iterator().skip();
continue;
}
@@ -141,7 +141,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void reverse() {
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
Log.tracev("{0} Reversing from {1}", _name, cur);
// Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
_sortedIterators.clear();
for (IteratorEntry<K, V> iterator : _iterators) {
@@ -176,7 +176,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
}
cur.getValue().iterator().skip();
advanceIterator(cur.getValue());
Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
// Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
}
@Override

View File

@@ -44,20 +44,20 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
}
switch (start) {
case LT -> {
// assert _next == null || _next.getKey().compareTo(startKey) < 0;
}
case LE -> {
// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
}
case GT -> {
assert _next == null || _next.getKey().compareTo(startKey) > 0;
}
case GE -> {
assert _next == null || _next.getKey().compareTo(startKey) >= 0;
}
}
// switch (start) {
// case LT -> {
//// assert _next == null || _next.getKey().compareTo(startKey) < 0;
// }
// case LE -> {
//// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
// }
// case GT -> {
// assert _next == null || _next.getKey().compareTo(startKey) > 0;
// }
// case GE -> {
// assert _next == null || _next.getKey().compareTo(startKey) >= 0;
// }
// }
}
private void fillNext() {
@@ -81,8 +81,8 @@ public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends Revers
else if (!_goingForward && !wasAtEnd)
_backing.skipPrev();
if (!wasAtEnd)
Log.tracev("Skipped in reverse: {0}", _next);
// if (!wasAtEnd)
// Log.tracev("Skipped in reverse: {0}", _next);
_next = null;
_checkedNext = false;

View File

@@ -10,7 +10,7 @@ public abstract class TombstoneMergingKvIterator {
new MergingKvIterator<K, MaybeTombstone<V>>(name + "-merging", startType, startKey, iterators),
startType, startKey,
pair -> {
Log.tracev("{0} - Processing pair {1}", name, pair);
// Log.tracev("{0} - Processing pair {1}", name, pair);
if (pair instanceof Tombstone<V>) {
return null;
}

View File

@@ -1,29 +0,0 @@
package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.objects.transaction.TxRecord;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@Singleton
public class SnapshotManager {
@Inject
WritebackObjectPersistentStore writebackStore;
public Snapshot<JObjectKey, JDataVersionedWrapper> createSnapshot() {
return writebackStore.getSnapshot();
}
// This should not be called for the same objects concurrently
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes) {
return writebackStore.commitTx(writes);
}
}

View File

@@ -42,13 +42,15 @@ public class CachingObjectPersistentStore {
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,

View File

@@ -368,7 +368,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
_hasNext = _cursor.next();
else
_hasNext = _cursor.prev();
Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
// Log.tracev("Read: {0}, hasNext: {1}", ret, _hasNext);
_peekedNextKey = null;
return ret;
}

View File

@@ -1,12 +1,11 @@
package com.usatiuk.objects.transaction;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -14,10 +13,10 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -31,7 +30,7 @@ public class JObjectManager {
}
@Inject
SnapshotManager snapshotManager;
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
@Inject
@@ -176,25 +175,36 @@ public class JObjectManager {
toUnlock.add(lock);
}
commitSnapshot = snapshotManager.createSnapshot();
commitSnapshot = writebackObjectPersistentStore.getSnapshot();
} else {
Log.trace("Committing transaction - no changes");
long version = 0L;
for (var read : readSet.values()) {
version = Math.max(version, read.data().map(JDataVersionedWrapper::version).orElse(0L));
if (read instanceof TransactionObjectLocked<?> locked) {
locked.lock().close();
}
}
long finalVersion = version;
Consumer<Runnable> fenceFn = r -> {
writebackObjectPersistentStore.asyncFence(finalVersion, r);
};
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
tx.getOnFlush().stream()
Stream.<Runnable>of(() -> {
for (var f : tx.getOnFlush())
fenceFn.accept(f);
})
).toList(),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
fenceFn.accept(runnable);
}
});
}
@@ -229,8 +239,9 @@ public class JObjectManager {
Log.tracev("Skipped dependency checks: no changes");
}
var addFlushCallback = snapshotManager.commitTx(writes.values());
var addFlushCallback = writebackObjectPersistentStore.commitTx(writes.values());
// TODO: is it ok to possibly run it inside transaction?
for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback);
}

View File

@@ -5,7 +5,7 @@ import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@@ -17,7 +17,7 @@ import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
SnapshotManager snapshotManager;
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
LockManager lockManager;
@ConfigProperty(name = "dhfs.objects.transaction.never-lock")
@@ -64,7 +64,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = snapshotManager.createSnapshot();
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
@@ -94,15 +94,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
var got = _readSet.get(key);
if (got == null) {
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
return got.data().map(JDataVersionedWrapper::data).map(type::cast);
return _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return new TransactionObjectNoLock<>(read);
})
.data()
.map(w -> type.cast(w.data()));
}
public <T extends JData> Optional<T> getWriteLockedFromSource(Class<T> type, JObjectKey key) {

View File

@@ -6,8 +6,7 @@ import org.pcollections.*;
import java.util.Collection;
import java.util.List;
public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
JObjectKey key,
public record RemoteObjectMeta(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen,
PMap<PeerId, Long> knownRemoteVersions,
Class<? extends JDataRemote> knownType,
PSet<PeerId> confirmedDeletes,
@@ -16,22 +15,22 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
boolean hasLocalData) implements JDataRefcounted {
// Self put
public RemoteObjectMeta(JDataRemote data, PeerId initialPeer) {
this(HashTreePSet.empty(), false,
data.key(), HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false,
this(data.key(), HashTreePSet.empty(), false,
HashTreePMap.empty(), data.getClass(), HashTreePSet.empty(), false,
HashTreePMap.<PeerId, Long>empty().plus(initialPeer, 1L),
true);
}
public RemoteObjectMeta(JObjectKey key, PMap<PeerId, Long> remoteChangelog) {
this(HashTreePSet.empty(), false,
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
this(key, HashTreePSet.empty(), false,
HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
remoteChangelog,
false);
}
public RemoteObjectMeta(JObjectKey key) {
this(HashTreePSet.empty(), false,
key, HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
this(key, HashTreePSet.empty(), false,
HashTreePMap.empty(), JDataRemote.class, HashTreePSet.empty(), true,
TreePMap.empty(),
false);
}
@@ -44,47 +43,42 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
return JObjectKey.of(key.value() + "_data");
}
@Override
public JObjectKey key() {
return ofMetaKey(key);
}
public JObjectKey dataKey() {
return ofDataKey(key);
}
@Override
public RemoteObjectMeta withRefsFrom(PCollection<JDataRef> refs) {
return new RemoteObjectMeta(refs, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refs, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
@Override
public RemoteObjectMeta withFrozen(boolean frozen) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withKnownRemoteVersions(PMap<PeerId, Long> knownRemoteVersions) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withKnownType(Class<? extends JDataRemote> knownType) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withConfirmedDeletes(PSet<PeerId> confirmedDeletes) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withSeen(boolean seen) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withChangelog(PMap<PeerId, Long> changelog) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, hasLocalData);
}
public RemoteObjectMeta withHaveLocal(boolean haveLocal) {
return new RemoteObjectMeta(refsFrom, frozen, key, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
return new RemoteObjectMeta(key, refsFrom, frozen, knownRemoteVersions, knownType, confirmedDeletes, seen, changelog, haveLocal);
}
public long versionSum() {

View File

@@ -180,7 +180,7 @@ public class PeerManager {
});
}
private Optional<PeerAddress> selectBestAddress(PeerId host) {
public Optional<PeerAddress> selectBestAddress(PeerId host) {
return peerDiscoveryDirectory.getForPeer(host).stream().min(Comparator.comparing(PeerAddress::type));
}

View File

@@ -18,10 +18,7 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -90,18 +87,24 @@ public class RemoteObjectServiceClient {
}
public OpPushReply pushOps(PeerId target, List<Op> ops) {
var barrier = new CountDownLatch(ops.size());
for (Op op : ops) {
txm.run(() -> {
for (var ref : op.getEscapedRefs()) {
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
}
});
}).onFlush(barrier::countDown);
}
var builder = OpPushRequest.newBuilder();
for (Op op : ops) {
builder.addMsg(opProtoSerializer.serialize(op));
}
var built = builder.build();
try {
barrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
return OpPushReply.getDefaultInstance();
}

View File

@@ -8,6 +8,7 @@ import com.usatiuk.dhfs.repository.invalidation.OpHandler;
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionHandle;
import com.usatiuk.objects.transaction.TransactionManager;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -17,6 +18,9 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
// Note: RunOnVirtualThread hangs somehow
@ApplicationScoped
public class RemoteObjectServiceServerImpl {
@@ -101,19 +105,30 @@ public class RemoteObjectServiceServerImpl {
}
public Uni<OpPushReply> opPush(PeerId from, OpPushRequest request) {
var handles = new ArrayList<TransactionHandle>();
try {
var ops = request.getMsgList().stream().map(opProtoSerializer::deserialize).toList();
for (var op : ops) {
Log.infov("<-- opPush: {0} from {1}", op, from);
txm.run(() -> {
var handle = txm.run(() -> {
opHandler.handleOp(from, op);
});
handles.add(handle);
}
} catch (Exception e) {
Log.error("Error handling ops", e);
throw e;
}
return Uni.createFrom().item(OpPushReply.getDefaultInstance());
return Uni.createFrom().emitter(e -> {
var counter = new AtomicLong(handles.size());
for (var handle : handles) {
handle.onFlush(() -> {
if (counter.decrementAndGet() == 0) {
e.complete(OpPushReply.getDefaultInstance());
}
});
}
});
}
public Uni<PingReply> ping(PeerId from, PingRequest request) {

View File

@@ -1,4 +1,6 @@
package com.usatiuk.dhfs.repository.webapi;
public record KnownPeerInfo(String uuid) {
import jakarta.annotation.Nullable;
public record KnownPeerInfo(String uuid, @Nullable String knownAddress) {
}

View File

@@ -0,0 +1,6 @@
package com.usatiuk.dhfs.repository.webapi;
import java.util.List;
public record KnownPeers(List<KnownPeerInfo> peers, String selfUuid) {
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.repository.webapi;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import jakarta.inject.Inject;
import jakarta.ws.rs.DELETE;
@@ -11,6 +12,8 @@ import jakarta.ws.rs.Path;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@Path("/peers-manage")
public class PeerManagementApi {
@@ -18,11 +21,15 @@ public class PeerManagementApi {
PeerInfoService peerInfoService;
@Inject
PeerManager peerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Path("known-peers")
@GET
public List<KnownPeerInfo> knownPeers() {
return peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString())).toList();
public KnownPeers knownPeers() {
return new KnownPeers(peerInfoService.getPeers().stream().map(peerInfo -> new KnownPeerInfo(peerInfo.id().toString(),
Optional.ofNullable(peerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList(),
persistentPeerDataService.getSelfUuid().toString());
}
@Path("known-peers")
@@ -40,7 +47,10 @@ public class PeerManagementApi {
@Path("available-peers")
@GET
public Collection<KnownPeerInfo> availablePeers() {
return peerManager.getSeenButNotAddedHosts().stream().map(p -> new KnownPeerInfo(p.toString())).toList();
return peerManager.getSeenButNotAddedHosts().stream()
.map(p -> new KnownPeerInfo(p.toString(),
peerManager.selectBestAddress(p).map(Objects::toString).orElse(null)))
.toList();
}
@Path("peer-state")

View File

@@ -12,8 +12,10 @@ export function PeerAvailableCard({ peerInfo }: TPeerAvailableCardProps) {
return (
<div className="peerAvailableCard">
<div className={"peerInfo"}>
<span>UUID: </span>
<span>{peerInfo.uuid}</span>
<div>
<span>UUID: </span>
<span>{peerInfo.uuid}</span>
</div>
</div>
<fetcher.Form
className="actions"

View File

@@ -21,15 +21,21 @@ export function PeerKnownCard({ peerInfo }: TPeerKnownCardProps) {
<div className="peerKnownCard">
<div className={"peerInfo"}>
<div>
<div>
<span>UUID: </span>
<span>{peerInfo.uuid}</span>
</div>
<div>
<span>{peerInfo.knownAddress ? "connected" : "not connected"}</span>
</div>
</div>
<div>
<fetcher.Form
className="actions"
method="put"
action={"/home/peers"}
>
<span>Manual address: </span>
<input
name="intent"
hidden={true}
@@ -43,6 +49,7 @@ export function PeerKnownCard({ peerInfo }: TPeerKnownCardProps) {
<input
name="address"
defaultValue={addr?.address || ""}
placeholder={"ip:port:secure port"}
/>
<button type="submit">save</button>
</fetcher.Form>

View File

@@ -8,9 +8,9 @@ import { PeerKnownCard } from "./PeerKnownCard";
export function PeerState() {
const loaderData = useLoaderData() as LoaderToType<typeof peerStateLoader>;
const knownPeers = loaderData.knownPeers.map((p) => (
<PeerKnownCard peerInfo={p} key={p.uuid} />
));
const knownPeers = loaderData.knownPeers.peers
.filter((p) => p.uuid !== loaderData.knownPeers.selfUuid)
.map((p) => <PeerKnownCard peerInfo={p} key={p.uuid} />);
const availablePeers = loaderData.availablePeers.map((p) => (
<PeerAvailableCard peerInfo={p} key={p.uuid} />
@@ -18,6 +18,7 @@ export function PeerState() {
return (
<div id={"PeerState"}>
<div>Self UUID: {loaderData.knownPeers.selfUuid}</div>
<div>
<div>Known peers</div>
<div>{knownPeers}</div>

View File

@@ -1,13 +1,13 @@
import { fetchJSON, fetchJSON_throws } from "./utils";
import {
AvailablePeerInfoToResp,
KnownPeerInfoToResp,
KnownPeersToResp,
NoContentToResp,
PeerAddressInfoToResp,
TAvailablePeerInfoArrTo,
TAvailablePeerInfoToResp,
TKnownPeerInfoArrTo,
TKnownPeerInfoToResp,
TKnownPeerInfoArrTo, TKnownPeersTo,
TKnownPeersToResp,
TNoContentToResp,
TPeerAddressInfoArrTo,
TPeerAddressInfoToResp,
@@ -20,11 +20,11 @@ export async function getAvailablePeers(): Promise<TAvailablePeerInfoArrTo> {
>("/peers-manage/available-peers", "GET", AvailablePeerInfoToResp);
}
export async function getKnownPeers(): Promise<TKnownPeerInfoArrTo> {
return fetchJSON_throws<TKnownPeerInfoToResp, typeof KnownPeerInfoToResp>(
export async function getKnownPeers(): Promise<TKnownPeersTo> {
return fetchJSON_throws<TKnownPeersToResp, typeof KnownPeersToResp>(
"/peers-manage/known-peers",
"GET",
KnownPeerInfoToResp,
KnownPeersToResp,
);
}

View File

@@ -39,6 +39,7 @@ export type TTokenToResp = z.infer<typeof TokenToResp>;
// AvailablePeerInfo
export const AvailablePeerInfoTo = z.object({
uuid: z.string(),
knownAddress: z.string().optional(),
// addr: z.string(),
// port: z.number(),
});
@@ -55,14 +56,21 @@ export type TAvailablePeerInfoToResp = z.infer<typeof AvailablePeerInfoToResp>;
// KnownPeerInfo
export const KnownPeerInfoTo = z.object({
uuid: z.string(),
knownAddress: z.string().optional(),
});
export type TKnownPeerInfoTo = z.infer<typeof KnownPeerInfoTo>;
export const KnownPeerInfoArrTo = z.array(KnownPeerInfoTo);
export type TKnownPeerInfoArrTo = z.infer<typeof KnownPeerInfoArrTo>;
export const KnownPeerInfoToResp = CreateAPIResponse(KnownPeerInfoArrTo);
export type TKnownPeerInfoToResp = z.infer<typeof KnownPeerInfoToResp>;
export const KnownPeersTo = z.object({
selfUuid: z.string(),
peers: KnownPeerInfoArrTo,
});
export type TKnownPeersTo = z.infer<typeof KnownPeersTo>;
export const KnownPeersToResp = CreateAPIResponse(KnownPeersTo);
export type TKnownPeersToResp = z.infer<typeof KnownPeersToResp>;
// PeerAddressInfo
export const PeerAddressInfoTo = z.object({

View File

@@ -40,6 +40,8 @@
color: inherit;
font-size: inherit;
padding: 0.5rem;
width: 100%;
height: 100%;
}