16 Commits

Author SHA1 Message Date
8cbecf1714 Dhfs-fs: remove optional from read 2025-04-25 13:40:48 +02:00
16ba692019 Recordify tree metadata 2025-04-25 13:35:54 +02:00
e5be1e6164 Cleanup poms 2025-04-25 13:13:21 +02:00
c74fdfc5a6 Dhfs-app: test fixes 2 2025-04-25 12:59:25 +02:00
c4268ab35b Dhfs-app: test fixes 2025-04-25 11:23:12 +02:00
2ab6e3c3f7 Sync-base: Handle getting peer info failure nicely 2025-04-25 11:15:36 +02:00
ec8546bd69 Show Peer address in WebUI 2025-04-25 11:13:30 +02:00
6ecef94b90 Webui: a little nicer 2025-04-25 11:07:08 +02:00
e7f22d783f Webui: proper async hash 2025-04-25 11:03:39 +02:00
bed55162d7 Peer certificate check when adding 2025-04-25 10:48:55 +02:00
f43c6db4f0 Run code format 2025-04-25 09:58:46 +02:00
56a15f4672 Sync-base: cleanup JKleppmannTree meta 2025-04-25 09:57:44 +02:00
85a1fa09ab KleppmannTree: a little cleanup 2025-04-25 09:45:35 +02:00
cca0b410cf Some packages cleanup 2025-04-25 09:16:31 +02:00
d94abfee97 Sync-base: op extractor interface 2025-04-25 09:16:31 +02:00
dependabot[bot]
6bd92ad7cd Bump the npm_and_yarn group across 1 directory with 2 updates
Bumps the npm_and_yarn group with 2 updates in the /webui directory: [react-router](https://github.com/remix-run/react-router/tree/HEAD/packages/react-router) and [react-router-dom](https://github.com/remix-run/react-router/tree/HEAD/packages/react-router-dom).


Updates `react-router` from 7.4.1 to 7.5.2
- [Release notes](https://github.com/remix-run/react-router/releases)
- [Changelog](https://github.com/remix-run/react-router/blob/main/packages/react-router/CHANGELOG.md)
- [Commits](https://github.com/remix-run/react-router/commits/react-router@7.5.2/packages/react-router)

Updates `react-router-dom` from 7.4.1 to 7.5.2
- [Release notes](https://github.com/remix-run/react-router/releases)
- [Changelog](https://github.com/remix-run/react-router/blob/main/packages/react-router-dom/CHANGELOG.md)
- [Commits](https://github.com/remix-run/react-router/commits/react-router-dom@7.5.2/packages/react-router-dom)

---
updated-dependencies:
- dependency-name: react-router
  dependency-version: 7.5.2
  dependency-type: direct:production
  dependency-group: npm_and_yarn
- dependency-name: react-router-dom
  dependency-version: 7.5.2
  dependency-type: direct:production
  dependency-group: npm_and_yarn
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-25 09:14:39 +02:00
206 changed files with 1228 additions and 1184 deletions

View File

@@ -1,16 +1,17 @@
 <component name="ProjectRunConfigurationManager">
   <configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
-    <option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
-    <module name="dhfs-app" />
-    <option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:8080:9011" />
+    <option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
+    <module name="dhfs-app"/>
+    <option name="VM_PARAMETERS"
+            value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011"/>
     <extension name="coverage">
       <pattern>
-        <option name="PATTERN" value="com.usatiuk.dhfs.*" />
-        <option name="ENABLED" value="true" />
+        <option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
+        <option name="ENABLED" value="true"/>
       </pattern>
     </extension>
     <method v="2">
-      <option name="Make" enabled="true" />
+      <option name="Make" enabled="true"/>
     </method>
   </configuration>
 </component>

View File

@@ -1,16 +1,18 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
-    <option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
-    <module name="dhfs-app" />
-    <option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions --enable-preview -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
+  <configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication"
+                 nameIsGenerated="true">
+    <option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
+    <module name="dhfs-app"/>
+    <option name="VM_PARAMETERS"
+            value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0"/>
     <extension name="coverage">
       <pattern>
-        <option name="PATTERN" value="com.usatiuk.dhfs.*" />
-        <option name="ENABLED" value="true" />
+        <option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
+        <option name="ENABLED" value="true"/>
       </pattern>
     </extension>
     <method v="2">
-      <option name="Make" enabled="true" />
+      <option name="Make" enabled="true"/>
     </method>
   </configuration>
 </component>

View File

@@ -107,31 +107,11 @@
             <artifactId>commons-math3</artifactId>
             <version>3.6.1</version>
         </dependency>
-        <dependency>
-            <groupId>com.usatiuk</groupId>
-            <artifactId>kleppmanntree</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>com.usatiuk.dhfs</groupId>
-            <artifactId>objects</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>com.usatiuk.dhfs</groupId>
-            <artifactId>dhfs-fs</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>com.usatiuk.dhfs</groupId>
             <artifactId>dhfs-fuse</artifactId>
             <version>1.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>com.usatiuk.dhfs</groupId>
-            <artifactId>sync-base</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>com.usatiuk.dhfs</groupId>
             <artifactId>utils</artifactId>

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.app;
+package com.usatiuk.dhfsapp;
 
 import io.quarkus.runtime.Quarkus;
 import io.quarkus.runtime.QuarkusApplication;

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs;
+package com.usatiuk.dhfsapp;
 
 import io.quarkus.test.junit.QuarkusTestProfile;

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs;
+package com.usatiuk.dhfsapp;
 
 import io.quarkus.logging.Log;
 import io.quarkus.runtime.ShutdownEvent;
@@ -18,18 +18,6 @@ public class TestDataCleaner {
     @ConfigProperty(name = "dhfs.objects.persistence.files.root")
     String tempDirectory;
 
-    void init(@Observes @Priority(1) StartupEvent event) throws IOException {
-        try {
-            purgeDirectory(Path.of(tempDirectory).toFile());
-        } catch (Exception ignored) {
-            Log.warn("Couldn't cleanup test data on init");
-        }
-    }
-
-    void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
-        purgeDirectory(Path.of(tempDirectory).toFile());
-    }
-
     public static void purgeDirectory(File dir) {
         try {
             for (File file : Objects.requireNonNull(dir.listFiles())) {
@@ -41,4 +29,16 @@ public class TestDataCleaner {
             Log.error("Couldn't purge directory " + dir, e);
         }
     }
+
+    void init(@Observes @Priority(1) StartupEvent event) throws IOException {
+        try {
+            purgeDirectory(Path.of(tempDirectory).toFile());
+        } catch (Exception ignored) {
+            Log.warn("Couldn't cleanup test data on init");
+        }
+    }
+
+    void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
+        purgeDirectory(Path.of(tempDirectory).toFile());
+    }
 }

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import com.github.dockerjava.api.model.Device;
 import io.quarkus.logging.Log;
@@ -67,14 +67,14 @@ public class DhfsFuseIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -248,8 +248,8 @@ public class DhfsFuseIT {
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request DELETE " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
         await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
@@ -264,8 +264,8 @@ public class DhfsFuseIT {
         container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
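Note on the hunks above: the known-peers management call now carries the peer UUID as a path segment instead of a JSON body field, with the body reduced to an empty object. A minimal standalone sketch of the new request shape, using java.net.http (the host, port, and UUID below are just the values the test fixtures use, not a general contract):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KnownPeersPutExample {
    public static void main(String[] args) throws Exception {
        // Example UUID matching the test fixtures; substitute the real peer id.
        var peerUuid = "22000000-0000-0000-0000-000000000000";
        var client = HttpClient.newHttpClient();
        // New shape: UUID in the path, empty JSON object as the body.
        var request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/peers-manage/known-peers/" + peerUuid))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
        var response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}

The same PUT/DELETE shape is exercised by the other integration tests below.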

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import com.github.dockerjava.api.model.Device;
 import io.quarkus.logging.Log;
@@ -93,26 +93,26 @@ public class DhfsFusex3IT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl1 = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         var c2curl3 = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c3uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c3uuid);
 
         var c3curl = container3.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
@@ -193,8 +193,8 @@ public class DhfsFusex3IT {
         var c3curl = container3.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request DELETE " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         Thread.sleep(10000);

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import io.quarkus.logging.Log;
 import org.jetbrains.annotations.NotNull;
@@ -78,6 +78,7 @@ public class DhfsImage implements Future<String> {
                 "-Ddhfs.objects.sync.timeout=30",
                 "-Ddhfs.objects.sync.ping.timeout=5",
                 "-Ddhfs.objects.reconnect_interval=1s",
+                "-Ddhfs.sync.cert-check=false",
                 "-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
                 "-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
                 "-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",

View File

@@ -1,7 +1,7 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import com.github.dockerjava.api.model.Device;
-import com.usatiuk.dhfs.TestDataCleaner;
+import com.usatiuk.dhfsapp.TestDataCleaner;
 import io.quarkus.logging.Log;
 import org.junit.jupiter.api.*;
 import org.slf4j.LoggerFactory;
@@ -81,14 +81,14 @@ public class KillIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import io.quarkus.logging.Log;
@@ -6,25 +6,26 @@ import java.io.*;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 public class LazyFs {
     private static final String lazyFsPath;
-    private final String mountRoot;
-    private final String dataRoot;
-    private final String name;
-    private final File configFile;
-    private final File fifoFile;
 
     static {
         lazyFsPath = System.getProperty("lazyFsPath");
         System.out.println("LazyFs Path: " + lazyFsPath);
     }
 
+    private final String mountRoot;
+    private final String dataRoot;
+    private final String name;
+    private final File configFile;
+    private final File fifoFile;
+    private Thread errPiper;
+    private Thread outPiper;
+    private CountDownLatch startLatch;
+    private Process fs;
+
     public LazyFs(String name, String mountRoot, String dataRoot) {
         this.name = name;
         this.mountRoot = mountRoot;
@@ -43,11 +44,6 @@ public class LazyFs {
         Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
     }
 
-    private Thread errPiper;
-    private Thread outPiper;
-    private CountDownLatch startLatch;
-    private Process fs;
-
     private String fifoPath() {
         return fifoFile.getAbsolutePath();
     }

View File

@@ -1,14 +1,11 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import com.github.dockerjava.api.model.Device;
-import com.usatiuk.dhfs.TestDataCleaner;
+import com.usatiuk.dhfsapp.TestDataCleaner;
 import io.quarkus.logging.Log;
 import org.junit.jupiter.api.*;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.GenericContainer;
@@ -96,14 +93,14 @@ public class LazyFsIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -140,12 +137,6 @@ public class LazyFsIT {
         });
     }
 
-    private static enum CrashType {
-        CRASH,
-        TORN_OP,
-        TORN_SEQ
-    }
-
     @ParameterizedTest
     @EnumSource(CrashType.class)
     void killTest(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -185,9 +176,7 @@ public class LazyFsIT {
             waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         } catch (TimeoutException e) {
             Log.info("Failed to connect: " + testInfo.getDisplayName());
-            if (crashType.equals(CrashType.CRASH))
-                throw e;
-            // LazyFs can crash too early
+            // Sometimes it doesn't get mounted properly for some reason
             Assumptions.assumeTrue(false);
         }
@@ -205,7 +194,15 @@ public class LazyFsIT {
                 Thread.sleep(3000);
                 lazyFs1.crash();
             }
-            waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            try {
+                waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                // Sometimes crash doesn't work
+                Log.info("Failed to crash: " + testInfo.getDisplayName());
+                if (crashType.equals(CrashType.CRASH))
+                    throw e;
+                Assumptions.assumeTrue(false);
+            }
             client.killContainerCmd(container1.getContainerId()).exec();
             container1.stop();
             lazyFs1.stop();
@@ -223,7 +220,6 @@ public class LazyFsIT {
         checkConsistency(testInfo.getDisplayName());
     }
 
-
     @ParameterizedTest
     @EnumSource(CrashType.class)
     void killTestDirs(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -263,9 +259,7 @@ public class LazyFsIT {
             waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         } catch (TimeoutException e) {
             Log.info("Failed to connect: " + testInfo.getDisplayName());
-            if (crashType.equals(CrashType.CRASH))
-                throw e;
-            // LazyFs can crash too early
+            // Sometimes it doesn't get mounted properly for some reason
            Assumptions.assumeTrue(false);
         }
@@ -284,7 +278,15 @@ public class LazyFsIT {
                 Thread.sleep(3000);
                 lazyFs1.crash();
             }
-            waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            try {
+                waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                // Sometimes crash doesn't work
+                Log.info("Failed to crash: " + testInfo.getDisplayName());
+                if (crashType.equals(CrashType.CRASH))
+                    throw e;
+                Assumptions.assumeTrue(false);
+            }
             client.killContainerCmd(container1.getContainerId()).exec();
             container1.stop();
             lazyFs1.stop();
@@ -342,9 +344,7 @@ public class LazyFsIT {
             waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         } catch (TimeoutException e) {
             Log.info("Failed to connect: " + testInfo.getDisplayName());
-            if (crashType.equals(CrashType.CRASH))
-                throw e;
-            // LazyFs can crash too early
+            // Sometimes it doesn't get mounted properly for some reason
             Assumptions.assumeTrue(false);
         }
@@ -365,7 +365,15 @@ public class LazyFsIT {
                 lazyFs2.crash();
             }
             container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
-            waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            try {
+                waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                // Sometimes crash doesn't work
+                Log.info("Failed to crash: " + testInfo.getDisplayName());
+                if (crashType.equals(CrashType.CRASH))
+                    throw e;
+                Assumptions.assumeTrue(false);
+            }
             client.killContainerCmd(container2.getContainerId()).exec();
             container2.stop();
             lazyFs2.stop();
@@ -383,7 +391,6 @@ public class LazyFsIT {
         checkConsistency(testInfo.getDisplayName());
     }
 
-
     @ParameterizedTest
     @EnumSource(CrashType.class)
     void killTestDirs2(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -424,10 +431,7 @@ public class LazyFsIT {
             waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         } catch (TimeoutException e) {
             Log.info("Failed to connect: " + testInfo.getDisplayName());
-            if (crashType.equals(CrashType.CRASH))
-                throw e;
-            // LazyFs can crash too early
+            // Sometimes it doesn't get mounted properly for some reason
             Assumptions.assumeTrue(false);
         }
@@ -448,7 +452,15 @@ public class LazyFsIT {
                 lazyFs2.crash();
             }
             container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
-            waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            try {
+                waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
+            } catch (TimeoutException e) {
+                // Sometimes crash doesn't work
+                Log.info("Failed to crash: " + testInfo.getDisplayName());
+                if (crashType.equals(CrashType.CRASH))
+                    throw e;
+                Assumptions.assumeTrue(false);
+            }
             client.killContainerCmd(container2.getContainerId()).exec();
             container2.stop();
             lazyFs2.stop();
@@ -467,4 +479,11 @@ public class LazyFsIT {
     }
 
+    private static enum CrashType {
+        CRASH,
+        TORN_OP,
+        TORN_SEQ
+    }
+
 }

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.integration;
+package com.usatiuk.dhfsapp.integration;
 
 import com.github.dockerjava.api.model.Device;
 import org.junit.jupiter.api.*;
@@ -75,14 +75,14 @@ public class ResyncIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -115,14 +115,14 @@ public class ResyncIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -155,14 +155,14 @@ public class ResyncIT {
         var c1curl = container1.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c2uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c2uuid);
 
         var c2curl = container2.execInContainer("/bin/sh", "-c",
                 "curl --header \"Content-Type: application/json\" " +
                         " --request PUT " +
-                        " --data '{\"uuid\":\"" + c1uuid + "\"}' " +
-                        " http://localhost:8080/peers-manage/known-peers");
+                        " --data '{}' " +
+                        " http://localhost:8080/peers-manage/known-peers/" + c1uuid);
 
         waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
         waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);

View File

@@ -102,26 +102,11 @@
             <artifactId>commons-math3</artifactId>
             <version>3.6.1</version>
         </dependency>
-        <dependency>
-            <groupId>com.usatiuk</groupId>
-            <artifactId>kleppmanntree</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>com.usatiuk.dhfs</groupId>
-            <artifactId>objects</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>com.usatiuk.dhfs</groupId>
             <artifactId>sync-base</artifactId>
             <version>1.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>com.usatiuk.dhfs</groupId>
-            <artifactId>utils</artifactId>
-            <version>1.0-SNAPSHOT</version>
-        </dependency>
     </dependencies>
 
     <build>

View File

@@ -1,9 +1,9 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
 import com.google.protobuf.ByteString;
-import com.usatiuk.dhfs.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.repository.JDataRemoteDto;
 
 public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
     @Override
View File

@@ -1,9 +1,9 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
 import com.usatiuk.dhfs.ProtoSerializer;
-import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.dhfs.persistence.ChunkDataP;
 import com.usatiuk.dhfs.persistence.JObjectKeyP;
+import com.usatiuk.objects.JObjectKey;
 import jakarta.inject.Singleton;
 
 @Singleton

View File

@@ -1,9 +1,9 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
-import com.usatiuk.dhfs.JDataRemote;
 import com.usatiuk.dhfs.jmap.JMapHolder;
 import com.usatiuk.dhfs.jmap.JMapLongKey;
-import com.usatiuk.dhfs.repository.JDataRemoteDto;
+import com.usatiuk.dhfs.remoteobj.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
 import com.usatiuk.objects.JObjectKey;
 
 import java.util.Collection;

View File

@@ -1,8 +1,8 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
-import com.usatiuk.dhfs.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.repository.JDataRemoteDto;
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.List;

View File

@@ -1,7 +1,7 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
 import com.usatiuk.dhfs.jmap.JMapHelper;
-import com.usatiuk.dhfs.repository.syncmap.DtoMapper;
+import com.usatiuk.dhfs.syncmap.DtoMapper;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;

View File

@@ -1,8 +1,8 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
-import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.dhfs.jmap.JMapHelper;
 import com.usatiuk.dhfs.jmap.JMapLongKey;
+import com.usatiuk.objects.JObjectKey;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;

View File

@@ -1,8 +1,8 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
 import com.usatiuk.dhfs.ProtoSerializer;
 import com.usatiuk.dhfs.persistence.FileDtoP;
-import com.usatiuk.dhfs.utils.SerializationHelper;
+import com.usatiuk.utils.SerializationHelper;
 import jakarta.inject.Singleton;
 
 import java.io.IOException;

View File

@@ -1,20 +1,15 @@
-package com.usatiuk.dhfs.files.objects;
+package com.usatiuk.dhfsfs.objects;
 
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.RemoteObjectDataWrapper;
-import com.usatiuk.dhfs.RemoteObjectMeta;
-import com.usatiuk.dhfs.RemoteTransaction;
-import com.usatiuk.dhfs.files.service.DhfsFileService;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
-import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.dhfs.jmap.JMapHelper;
-import com.usatiuk.dhfs.repository.ObjSyncHandler;
-import com.usatiuk.dhfs.repository.PersistentPeerDataService;
-import com.usatiuk.dhfs.repository.SyncHelper;
+import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
+import com.usatiuk.dhfs.remoteobj.*;
+import com.usatiuk.dhfsfs.service.DhfsFileService;
+import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
-import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.quarkus.logging.Log;
@@ -48,11 +43,11 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
     DhfsFileService fileService;
 
     private JKleppmannTreeManager.JKleppmannTree getTreeW() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
+        return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
     }
 
     private JKleppmannTreeManager.JKleppmannTree getTreeR() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
+        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
     }
 
     private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,

View File

@@ -0,0 +1,18 @@
+package com.usatiuk.dhfsfs.objects;
+
+import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
+import com.usatiuk.objects.JObjectKey;
+
+import java.util.Collection;
+import java.util.List;
+
+public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
+    public JKleppmannTreeNodeMeta withName(String name) {
+        return new JKleppmannTreeNodeMetaDirectory(name);
+    }
+
+    @Override
+    public Collection<JObjectKey> collectRefsTo() {
+        return List.of();
+    }
+}

View File

@@ -0,0 +1,19 @@
+package com.usatiuk.dhfsfs.objects;
+
+import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
+import com.usatiuk.objects.JObjectKey;
+
+import java.util.Collection;
+import java.util.List;
+
+public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
+    @Override
+    public JKleppmannTreeNodeMeta withName(String name) {
+        return new JKleppmannTreeNodeMetaFile(name, fileIno);
+    }
+
+    @Override
+    public Collection<JObjectKey> collectRefsTo() {
+        return List.of(fileIno);
+    }
+}
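These two records are the new home of the tree-node metadata ("Recordify tree metadata"); callers now pattern-match on the record types and use the generated accessors (fileIno() instead of getFileIno()), as the service hunks further down show. A simplified, self-contained mirror of that pattern, using local stand-in types rather than the real JKleppmannTreeNodeMeta hierarchy and a String in place of JObjectKey:

import java.util.Optional;

public class TreeMetaSwitchExample {
    // Local stand-ins for the real records in com.usatiuk.dhfsfs.objects.
    sealed interface NodeMeta permits FileMeta, DirMeta {}
    record FileMeta(String name, String fileIno) implements NodeMeta {}
    record DirMeta(String name) implements NodeMeta {}

    // Mirrors the switch in DhfsFileServiceImpl: a file resolves to its inode key,
    // a directory resolves to the tree node's own key. Requires Java 21 pattern matching.
    static Optional<String> resolve(NodeMeta meta, String nodeKey) {
        return switch (meta) {
            case FileMeta f -> Optional.of(f.fileIno());
            case DirMeta d -> Optional.of(nodeKey);
        };
    }

    public static void main(String[] args) {
        System.out.println(resolve(new FileMeta("a.txt", "ino-1"), "node-1")); // Optional[ino-1]
        System.out.println(resolve(new DirMeta("dir"), "node-2"));             // Optional[node-2]
    }
}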

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.files.service;
+package com.usatiuk.dhfsfs.service;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
@@ -30,7 +30,7 @@ public interface DhfsFileService {
     long size(JObjectKey fileUuid);
 
-    Optional<ByteString> read(JObjectKey fileUuid, long offset, int length);
+    ByteString read(JObjectKey fileUuid, long offset, int length);
 
     Long write(JObjectKey fileUuid, long offset, ByteString data);
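This is the interface change behind "Dhfs-fs: remove optional from read": a missing file is now reported with a gRPC StatusRuntimeException (NOT_FOUND, or INTERNAL on read errors, per the implementation hunks below) instead of an empty Optional. A standalone sketch of how a caller migrates, with a hypothetical FileReader interface standing in for the relevant slice of DhfsFileService:

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

public class ReadMigrationExample {
    // Hypothetical stand-in for the changed read() method; the real signature takes a JObjectKey.
    interface FileReader {
        ByteString read(String fileUuid, long offset, int length);
    }

    public static void main(String[] args) {
        // Toy implementation that follows the new contract: throw NOT_FOUND rather than return Optional.empty().
        FileReader reader = (uuid, offset, length) -> {
            if (!uuid.equals("known-file"))
                throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to read: " + uuid));
            return ByteString.copyFromUtf8("hello");
        };

        // Old callers did read(...).get(); new callers use the value directly and handle the exception.
        try {
            byte[] data = reader.read("known-file", 0, 5).toByteArray();
            System.out.println(new String(data));
        } catch (StatusRuntimeException e) {
            System.err.println("read failed: " + e.getStatus());
        }
    }
}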

View File

@@ -1,27 +1,27 @@
-package com.usatiuk.dhfs.files.service;
+package com.usatiuk.dhfsfs.service;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
-import com.usatiuk.dhfs.JDataRemote;
-import com.usatiuk.dhfs.RemoteObjectMeta;
-import com.usatiuk.dhfs.RemoteTransaction;
-import com.usatiuk.dhfs.files.objects.ChunkData;
-import com.usatiuk.dhfs.files.objects.File;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
-import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
-import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.dhfs.jmap.JMapEntry;
 import com.usatiuk.dhfs.jmap.JMapHelper;
 import com.usatiuk.dhfs.jmap.JMapLongKey;
-import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
+import com.usatiuk.dhfs.remoteobj.JDataRemote;
+import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
+import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
+import com.usatiuk.dhfsfs.objects.ChunkData;
+import com.usatiuk.dhfsfs.objects.File;
+import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaDirectory;
+import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.objects.JData;
 import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.objects.iterators.IteratorStart;
 import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
+import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 import io.quarkus.logging.Log;
@@ -73,11 +73,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     JMapHelper jMapHelper;
 
     private JKleppmannTreeManager.JKleppmannTree getTreeW() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
+        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
     }
 
     private JKleppmannTreeManager.JKleppmannTree getTreeR() {
-        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
+        return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
     }
 
     private ChunkData createChunk(ByteString bytes) {
@@ -140,7 +140,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             try {
                 var ret = getDirEntryR(name);
                 return switch (ret.meta()) {
-                    case JKleppmannTreeNodeMetaFile f -> Optional.of(f.getFileIno());
+                    case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
                     case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
                     default -> Optional.empty();
                 };
@@ -189,7 +189,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         return jObjectTxManager.executeTx(() -> {
             return getTreeW().findParent(w -> {
                 if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
-                    return f.getFileIno().equals(ino);
+                    return f.fileIno().equals(ino);
                 return false;
             });
         });
@@ -271,7 +271,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     }
 
     @Override
-    public Optional<ByteString> read(JObjectKey fileUuid, long offset, int length) {
+    public ByteString read(JObjectKey fileUuid, long offset, int length) {
         return jObjectTxManager.executeTx(() -> {
             if (length < 0)
                 throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
@@ -281,12 +281,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             var file = remoteTx.getData(File.class, fileUuid).orElse(null);
             if (file == null) {
                 Log.error("File not found when trying to read: " + fileUuid);
-                return Optional.empty();
+                throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to read: " + fileUuid));
             }
 
             try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
                 if (!it.hasNext())
-                    return Optional.of(ByteString.empty());
+                    return ByteString.empty();
 
 //                if (it.peekNextKey().key() != offset) {
 //                    Log.warnv("Read over the end of file: {0} {1} {2}, next chunk: {3}", fileUuid, offset, length, it.peekNextKey());
@@ -324,10 +324,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                     chunk = it.next();
                 }
 
-                return Optional.of(buf);
+                return buf;
             } catch (Exception e) {
                 Log.error("Error reading file: " + fileUuid, e);
-                return Optional.empty();
+                throw new StatusRuntimeException(Status.INTERNAL.withDescription("Error reading file: " + fileUuid));
             }
         });
     }
@@ -595,7 +595,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     public ByteString readlinkBS(JObjectKey uuid) {
         return jObjectTxManager.executeTx(() -> {
             var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
 
-            return read(uuid, 0, Math.toIntExact(size(uuid))).get();
+            return read(uuid, 0, Math.toIntExact(size(uuid)));
         });
     }

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.files.service;
+package com.usatiuk.dhfsfs.service;
 
 public class DirectoryNotEmptyException extends RuntimeException {
     @Override

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.files.service;
+package com.usatiuk.dhfsfs.service;
 
 public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
 }

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.files.service;
+package com.usatiuk.dhfsfs.service;
 
 public enum GetattrType {
     FILE,

View File

@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.files;
+package com.usatiuk.dhfsfs;
 
 import io.quarkus.test.junit.QuarkusTest;
 import io.quarkus.test.junit.TestProfile;

View File

@@ -1,9 +1,8 @@
-package com.usatiuk.dhfs.files;
+package com.usatiuk.dhfsfs;
 
-import com.usatiuk.dhfs.RemoteTransaction;
-import com.usatiuk.dhfs.TempDataProfile;
-import com.usatiuk.dhfs.files.objects.File;
-import com.usatiuk.dhfs.files.service.DhfsFileService;
+import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
+import com.usatiuk.dhfsfs.objects.File;
+import com.usatiuk.dhfsfs.service.DhfsFileService;
 import com.usatiuk.kleppmanntree.AlreadyExistsException;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
@@ -90,7 +89,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
 //        for (int start = 0; start < all.length(); start++) {
 //            for (int end = start; end <= all.length(); end++) {
 //                var read = fileService.read(fuuid.toString(), start, end - start);
-//                Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get().toByteArray());
+//                Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.toByteArray());
 //            }
 //        }
 //    }
@@ -113,20 +112,20 @@ public abstract class DhfsFileServiceSimpleTestImpl {
         var curMtime = fileService.getattr(uuid).get().mtime();
 
         fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
-        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
-        Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
+        Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).toByteArray());
 
         fileService.write(uuid, 4, new byte[]{10, 11, 12});
-        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
         fileService.write(uuid, 10, new byte[]{13, 14});
-        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
         fileService.write(uuid, 6, new byte[]{15, 16});
-        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
         fileService.write(uuid, 3, new byte[]{17, 18});
-        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
+        Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
 
         var newMtime = fileService.getattr(uuid).get().mtime();
         Assertions.assertTrue(newMtime > curMtime);
 
         fileService.unlink("/writeTest");
         Assertions.assertFalse(fileService.open("/writeTest").isPresent());
     }
@@ -139,7 +138,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.unlink("/removeTest"); fileService.unlink("/removeTest");
Assertions.assertFalse(fileService.open("/removeTest").isPresent()); Assertions.assertFalse(fileService.open("/removeTest").isPresent());
@@ -153,12 +152,12 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 20); fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17}); fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
} }
@RepeatedTest(100) @RepeatedTest(100)
@@ -170,12 +169,12 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 20); fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}); fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).toByteArray());
} finally { } finally {
fileService.unlink("/truncateTest2"); fileService.unlink("/truncateTest2");
} }
@@ -189,10 +188,10 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 7); fileService.truncate(uuid, 7);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).toByteArray());
} }
@Test @Test
@@ -202,14 +201,14 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest")); Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest"));
Assertions.assertFalse(fileService.open("/moveTest").isPresent()); Assertions.assertFalse(fileService.open("/moveTest").isPresent());
Assertions.assertTrue(fileService.open("/movedTest").isPresent()); Assertions.assertTrue(fileService.open("/movedTest").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray()); fileService.read(fileService.open("/movedTest").get(), 0, 10).toByteArray());
} }
@Test @Test
@@ -222,9 +221,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid2 = ret2.get(); var uuid2 = ret2.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.write(uuid2, 0, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}); fileService.write(uuid2, 0, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29});
Assertions.assertArrayEquals(new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}, fileService.read(uuid2, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}, fileService.read(uuid2, 0, 10).toByteArray());
jObjectTxManager.run(() -> { jObjectTxManager.run(() -> {
@@ -238,7 +237,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/moveOverTest2").isPresent()); Assertions.assertTrue(fileService.open("/moveOverTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/moveOverTest2").get(), 0, 10).get().toByteArray()); fileService.read(fileService.open("/moveOverTest2").get(), 0, 10).toByteArray());
// await().atMost(5, TimeUnit.SECONDS).until(() -> { // await().atMost(5, TimeUnit.SECONDS).until(() -> {
// jObjectTxManager.run(() -> { // jObjectTxManager.run(() -> {
@@ -256,8 +255,8 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
Assertions.assertArrayEquals(new byte[]{}, fileService.read(uuid, 20, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{}, fileService.read(uuid, 20, 10).toByteArray());
} }
@Test @Test
@@ -267,13 +266,13 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.write(uuid, 20, new byte[]{10, 11, 12, 13, 14, 15, 16, 17, 18, 19}); fileService.write(uuid, 20, new byte[]{10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
Assertions.assertArrayEquals(new byte[]{ Assertions.assertArrayEquals(new byte[]{
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
}, fileService.read(uuid, 0, 30).get().toByteArray()); }, fileService.read(uuid, 0, 30).toByteArray());
} }
@Test @Test
@@ -283,7 +282,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get(); var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray()); Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
// var oldfile = jObjectManager.get(uuid).orElseThrow(IllegalStateException::new); // var oldfile = jObjectManager.get(uuid).orElseThrow(IllegalStateException::new);
// var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0); // var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0);
@@ -298,6 +297,6 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/movedTest2").isPresent()); Assertions.assertTrue(fileService.open("/movedTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray()); fileService.read(fileService.open("/movedTest2").get(), 0, 10).toByteArray());
} }
} }


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files; package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.TestProfile;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files; package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.TestProfile;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.QuarkusTestProfile;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfsfs;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.benchmarks; package com.usatiuk.dhfsfs.benchmarks;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;


@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.benchmarks; package com.usatiuk.dhfsfs.benchmarks;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.TempDataProfile; import com.usatiuk.dhfsfs.TempDataProfile;
import com.usatiuk.dhfs.files.service.DhfsFileService; import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.TestProfile;


@@ -107,26 +107,11 @@
<artifactId>commons-math3</artifactId> <artifactId>commons-math3</artifactId>
<version>3.6.1</version> <version>3.6.1</version>
</dependency> </dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>com.usatiuk.dhfs</groupId> <groupId>com.usatiuk.dhfs</groupId>
<artifactId>dhfs-fs</artifactId> <artifactId>dhfs-fs</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>sync-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>com.usatiuk.dhfs</groupId> <groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId> <artifactId>utils</artifactId>


@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.fuse; package com.usatiuk.dhfsfuse;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import com.sun.security.auth.module.UnixSystem; import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfs.files.service.DhfsFileService; import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException; import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes; import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException; import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import io.grpc.Status; import io.grpc.Status;
@@ -40,6 +40,8 @@ import static jnr.posix.FileStat.*;
public class DhfsFuse extends FuseStubFS { public class DhfsFuse extends FuseStubFS {
private static final int blksize = 1048576; private static final int blksize = 1048576;
private static final int iosize = 1048576; private static final int iosize = 1048576;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
@ConfigProperty(name = "dhfs.fuse.root") @ConfigProperty(name = "dhfs.fuse.root")
String root; String root;
@ConfigProperty(name = "dhfs.fuse.enabled") @ConfigProperty(name = "dhfs.fuse.enabled")
@@ -53,9 +55,6 @@ public class DhfsFuse extends FuseStubFS {
@Inject @Inject
DhfsFileService fileService; DhfsFileService fileService;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
private long allocateHandle(JObjectKey key) { private long allocateHandle(JObjectKey key) {
while (true) { while (true) {
var newFh = _fh.getAndIncrement(); var newFh = _fh.getAndIncrement();
@@ -224,8 +223,8 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get()); var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size); var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0; if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size)); UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size(); return read.size();
} catch (Throwable e) { } catch (Throwable e) {
Log.error("When reading " + path, e); Log.error("When reading " + path, e);
return -ErrorCodes.EIO(); return -ErrorCodes.EIO();


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.fuse; package com.usatiuk.dhfsfuse;
import com.google.protobuf.ByteOutput; import com.google.protobuf.ByteOutput;
import jnr.ffi.Pointer; import jnr.ffi.Pointer;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.fuse; package com.usatiuk.dhfsfuse;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess; import jdk.internal.access.JavaNioAccess;


@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.fuse; package com.usatiuk.dhfsfuse;
import com.usatiuk.dhfs.TempDataProfile;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.TestProfile;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfsfuse;
import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.QuarkusTestProfile;


@@ -1,4 +1,4 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfsfuse;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;


@@ -1,7 +1,5 @@
package com.usatiuk.kleppmanntree; package com.usatiuk.kleppmanntree;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.*; import java.util.*;
@@ -217,31 +215,24 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
} }
assert cmp != 0; assert cmp != 0;
if (cmp < 0) { if (cmp < 0) {
try { if (log.containsKey(op.timestamp())) return;
if (log.containsKey(op.timestamp())) return; var toUndo = log.newestSlice(op.timestamp(), false);
var toUndo = log.newestSlice(op.timestamp(), false); _undoCtx = new HashMap<>();
_undoCtx = new HashMap<>(); for (var entry : toUndo.reversed()) {
for (var entry : toUndo.reversed()) { undoOp(entry.getValue());
undoOp(entry.getValue());
}
try {
doAndPut(op, failCreatingIfExists);
} finally {
for (var entry : toUndo) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
}
} finally {
tryTrimLog();
} }
doAndPut(op, failCreatingIfExists);
for (var entry : toUndo) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
tryTrimLog();
} else { } else {
doAndPut(op, failCreatingIfExists); doAndPut(op, failCreatingIfExists);
tryTrimLog(); tryTrimLog();
@@ -264,8 +255,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
} catch (AlreadyExistsException aex) { } catch (AlreadyExistsException aex) {
throw aex; throw aex;
} catch (Exception e) { } catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error computing effects for op" + op.toString(), e); throw new RuntimeException("Error computing effects for op " + op.toString(), e);
computed = new LogRecord<>(op, null);
} }
if (computed.effects() != null) if (computed.effects() != null)
@@ -373,6 +363,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
MetaT conflictNodeMeta = conflictNode.meta(); MetaT conflictNodeMeta = conflictNode.meta();
if (Objects.equals(conflictNodeMeta, op.newMeta())) { if (Objects.equals(conflictNodeMeta, op.newMeta())) {
LOGGER.finer(() -> "Node creation conflict (the same): " + conflictNode);
return new LogRecord<>(op, null); return new LogRecord<>(op, null);
} }
@@ -400,9 +391,9 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (oldMeta != null if (oldMeta != null
&& op.newMeta() != null && op.newMeta() != null
&& !oldMeta.getClass().equals(op.newMeta().getClass())) { && !oldMeta.getClass().equals(op.newMeta().getClass())) {
LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.key()); throw new RuntimeException("Class mismatch for meta for node " + node.key());
return new LogRecord<>(op, null);
} }
var replaceNodeId = newParent.children().get(op.newName()); var replaceNodeId = newParent.children().get(op.newName());
if (replaceNodeId != null) { if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId); var replaceNode = _storage.getById(replaceNodeId);


@@ -10,14 +10,14 @@ public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT exten
NodeIdT childId) implements Serializable { NodeIdT childId) implements Serializable {
public String oldName() { public String oldName() {
if (oldInfo.oldMeta() != null) { if (oldInfo.oldMeta() != null) {
return oldInfo.oldMeta().getName(); return oldInfo.oldMeta().name();
} }
return childId.toString(); return childId.toString();
} }
public String newName() { public String newName() {
if (newMeta != null) { if (newMeta != null) {
return newMeta.getName(); return newMeta.name();
} }
return childId.toString(); return childId.toString();
} }


@@ -3,7 +3,7 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable; import java.io.Serializable;
public interface NodeMeta extends Serializable { public interface NodeMeta extends Serializable {
String getName(); String name();
NodeMeta withName(String name); NodeMeta withName(String name);
} }
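With the accessor renamed from getName() to name(), tree metadata can be modeled as a plain Java record, whose generated accessor already matches the interface. A minimal hypothetical sketch for illustration only (ExampleNodeMeta is not a class from this change set):

package com.usatiuk.kleppmanntree;

// Hypothetical record-based metadata: the record component "name" supplies name(),
// so only the withName copy method has to be written by hand.
public record ExampleNodeMeta(String name) implements NodeMeta {
    @Override
    public NodeMeta withName(String name) {
        return new ExampleNodeMeta(name);
    }
}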


@@ -7,7 +7,7 @@ public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends
NodeIdT childId) implements Serializable { NodeIdT childId) implements Serializable {
public String newName() { public String newName() {
if (newMeta != null) if (newMeta != null)
return newMeta.getName(); return newMeta.name();
return childId.toString(); return childId.toString();
} }
} }


@@ -17,7 +17,7 @@ public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT ext
default String name() { default String name() {
var meta = meta(); var meta = meta();
if (meta != null) return meta.getName(); if (meta != null) return meta.name();
return key().toString(); return key().toString();
} }


@@ -8,7 +8,7 @@ public abstract class TestNodeMeta implements NodeMeta {
} }
@Override @Override
public String getName() { public String name() {
return _name; return _name;
} }


@@ -3,9 +3,9 @@ package com.usatiuk.objects;
import java.util.function.Supplier; import java.util.function.Supplier;
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper { public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private JData _data;
private final long _version; private final long _version;
private final int _estimatedSize; private final int _estimatedSize;
private JData _data;
private Supplier<JData> _producer; private Supplier<JData> _producer;
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) { public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {


@@ -2,7 +2,7 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.utils.SerializationHelper; import com.usatiuk.utils.SerializationHelper;
import io.quarkus.arc.DefaultBean; import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;


@@ -1,6 +1,6 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator; import java.util.Iterator;


@@ -1,7 +1,5 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface Data<V> extends MaybeTombstone<V> { public interface Data<V> extends MaybeTombstone<V> {
V value(); V value();
} }


@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import java.util.Optional;
public record DataWrapper<V>(V value) implements Data<V> { public record DataWrapper<V>(V value) implements Data<V> {
} }


@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface MaybeTombstone<T> { public interface MaybeTombstone<T> {
} }


@@ -10,16 +10,9 @@ import java.util.NoSuchElementException;
import java.util.TreeMap; import java.util.TreeMap;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> { public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>(); private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name; private final String _name;
private final List<IteratorEntry<K, V>> _iterators; private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) { public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true; _goingForward = true;
_name = name; _name = name;
@@ -215,6 +208,12 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
private interface FirstMatchState<K extends Comparable<K>, V> { private interface FirstMatchState<K extends Comparable<K>, V> {
} }
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> { private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
} }


@@ -1,6 +1,5 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;


@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface Tombstone<V> extends MaybeTombstone<V> { public interface Tombstone<V> extends MaybeTombstone<V> {
} }


@@ -1,7 +1,5 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import java.util.List; import java.util.List;
public abstract class TombstoneMergingKvIterator { public abstract class TombstoneMergingKvIterator {


@@ -1,9 +1,8 @@
package com.usatiuk.objects.snapshot; package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Optional; import java.util.Optional;


@@ -24,52 +24,15 @@ import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped @ApplicationScoped
public class CachingObjectPersistentStore { public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject @Inject
SerializingObjectPersistentStore delegate; SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats") @ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats; boolean printStats;
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private final AtomicReference<Cache> _cache;
private ExecutorService _commitExecutor; private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor; private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong(); private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong(); private AtomicLong _cacheTries = new AtomicLong();
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) { public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>( _cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit) new Cache(TreePMap.empty(), 0, -1, sizeLimit)
@@ -142,10 +105,10 @@ public class CachingObjectPersistentStore {
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing; Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache; Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() { return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private boolean _invalid = false;
private boolean _closed = false;
private final Cache _curCache = finalCurCache; private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking; private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) { private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet(); _cacheTries.incrementAndGet();
@@ -292,6 +255,41 @@ public class CachingObjectPersistentStore {
int size(); int size();
} }
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value, private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> { int size) implements CacheEntry, Data<JDataVersionedWrapper> {
} }


@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations; import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax; import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin; import com.usatiuk.objects.JObjectKeyMin;
@@ -11,6 +10,7 @@ import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.KeyPredicateKvIterator; import com.usatiuk.objects.iterators.KeyPredicateKvIterator;
import com.usatiuk.objects.iterators.ReversibleKvIterator; import com.usatiuk.objects.iterators.ReversibleKvIterator;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.RefcountedCloseable;
import io.quarkus.arc.properties.IfBuildProperty; import io.quarkus.arc.properties.IfBuildProperty;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;


@@ -2,7 +2,6 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.NavigableMapKvIterator; import com.usatiuk.objects.iterators.NavigableMapKvIterator;


@@ -2,13 +2,10 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer;
// Persistent storage of objects // Persistent storage of objects
// All changes are written as sequential transactions // All changes are written as sequential transactions


@@ -3,5 +3,6 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.Data; import com.usatiuk.objects.iterators.Data;
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> { public record PendingWrite(JDataVersionedWrapper value,
long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
} }


@@ -32,19 +32,10 @@ import java.util.function.Consumer;
public class WritebackObjectPersistentStore { public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>(); private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>(); private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null); private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object(); private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenId = new AtomicLong(-1); private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(); private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0); private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject @Inject
CachingObjectPersistentStore cachedStore; CachingObjectPersistentStore cachedStore;
@@ -351,7 +342,7 @@ public class WritebackObjectPersistentStore {
@Override @Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key, return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK), (tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK)); (tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
} }
@@ -393,6 +384,11 @@ public class WritebackObjectPersistentStore {
public interface VerboseReadResult { public interface VerboseReadResult {
} }
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private static class TxBundle { private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>(); private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>(); private final ArrayList<Runnable> _callbacks = new ArrayList<>();


@@ -4,7 +4,6 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;


@@ -1,11 +1,11 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore; import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority; import jakarta.annotation.Priority;
@@ -23,12 +23,6 @@ import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks; private final List<PreCommitTxHook> _preCommitTxHooks;
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
@Inject @Inject
WritebackObjectPersistentStore writebackObjectPersistentStore; WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject @Inject
@@ -36,7 +30,6 @@ public class JObjectManager {
@Inject @Inject
LockManager lockManager; LockManager lockManager;
private boolean _ready = false; private boolean _ready = false;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) { JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); _preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
@@ -277,4 +270,9 @@ public class JObjectManager {
}); });
tx.close(); tx.close();
} }
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
} }


@@ -1,11 +1,10 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker; import com.usatiuk.utils.DataLocker;
import jakarta.annotation.Nonnull; import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable; import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
@Singleton @Singleton


@@ -1,6 +1,6 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.VoidFn; import com.usatiuk.utils.VoidFn;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import java.util.function.Supplier; import java.util.function.Supplier;


@@ -1,7 +1,6 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;


@@ -2,7 +2,7 @@ package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Optional; import java.util.Optional;


@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;


@@ -7,11 +7,8 @@ import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile; import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
class ObjectsIterateAllTestProfiles { class ObjectsIterateAllTestProfiles {


@@ -6,9 +6,42 @@ import net.jqwik.api.state.ActionChain;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import java.util.*; import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class MergingKvIteratorPbtTest { public class MergingKvIteratorPbtTest {
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> { static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
private final CloseableKvIterator<Integer, Integer> mergedIterator; private final CloseableKvIterator<Integer, Integer> mergedIterator;
private final CloseableKvIterator<Integer, Integer> mergingIterator; private final CloseableKvIterator<Integer, Integer> mergingIterator;
@@ -228,35 +261,4 @@ public class MergingKvIteratorPbtTest {
return "Has prev key"; return "Has prev key";
} }
} }
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
} }


@@ -178,8 +178,8 @@
--initialize-at-run-time=jnr.ffi.util.ref.FinalizableReferenceQueue, --initialize-at-run-time=jnr.ffi.util.ref.FinalizableReferenceQueue,
--initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer, --initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer,
--initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer$SingletonHolder, --initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer$SingletonHolder,
--initialize-at-run-time=com.usatiuk.dhfs.utils.RefcountedCloseable, --initialize-at-run-time=com.usatiuk.utils.RefcountedCloseable,
--initialize-at-run-time=com.usatiuk.dhfs.utils.DataLocker$Lock, --initialize-at-run-time=com.usatiuk.utils.DataLocker$Lock,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore$LmdbKvIterator, --initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore$LmdbKvIterator,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore, --initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore,
--initialize-at-run-time=com.google.protobuf.UnsafeUtil --initialize-at-run-time=com.google.protobuf.UnsafeUtil


@@ -1,7 +1,7 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfs;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.persistence.JObjectKeyP; import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
@Singleton @Singleton


@@ -1,8 +1,8 @@
package com.usatiuk.dhfs; package com.usatiuk.dhfs;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.repository.OpP; import com.usatiuk.dhfs.repository.OpP;
import com.usatiuk.dhfs.repository.invalidation.Op; import com.usatiuk.utils.SerializationHelper;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
@Singleton @Singleton


@@ -1,14 +1,14 @@
package com.usatiuk.dhfs.repository; package com.usatiuk.dhfs.autosync;
import com.usatiuk.dhfs.JDataRemote; import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.RemoteObjectMeta; import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction; import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.Transaction; import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager; import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue; import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;


@@ -1,12 +1,11 @@
package com.usatiuk.dhfs.repository; package com.usatiuk.dhfs.autosync;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.objects.transaction.PreCommitTxHook; import com.usatiuk.objects.transaction.PreCommitTxHook;
import com.usatiuk.objects.transaction.Transaction; import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;


@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.repository.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import org.apache.commons.collections4.MultiValuedMap; import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap; import org.apache.commons.collections4.multimap.HashSetValuedHashMap;


@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.repository.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.PeerId; import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.repository.PeerConnectedEventListener; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.repository.PeerManager; import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper; import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;


@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.repository.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import org.pcollections.PMap; import org.pcollections.PMap;
import java.util.Collection; import java.util.Collection;


@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.repository.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import java.io.Serializable; import java.io.Serializable;

View File

@@ -1,14 +1,17 @@
package com.usatiuk.dhfs.repository.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow; import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.utils.DataLocker; import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId; import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.dhfs.repository.PeerManager; import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.repository.PersistentPeerDataService; import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService; import com.usatiuk.utils.DataLocker;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue; import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;
@@ -17,7 +20,6 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.ws.rs.core.Link;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap; import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class InvalidationQueueService { public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue; private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue;
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>()); private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker();
@Inject @Inject
PeerManager remoteHostManager; PeerManager remoteHostManager;
@Inject @Inject
@@ -40,15 +43,17 @@ public class InvalidationQueueService {
 @Inject
 PeerInfoService peerInfoService;
 @Inject
-OpPusher opPusher;
+TransactionManager txm;
+@Inject
+Transaction curTx;
 @ConfigProperty(name = "dhfs.objects.invalidation.threads")
 int threads;
 @Inject
 PersistentPeerDataService persistentPeerDataService;
-private final DataLocker _locker = new DataLocker();
 @Inject
 RemoteObjectServiceClient remoteObjectServiceClient;
+@Inject
+OpExtractorService opExtractorService;
 private ExecutorService _executor;
 private volatile boolean _shutdown = false;
@@ -141,9 +146,15 @@ public class InvalidationQueueService {
 }
 locks.add(lock);
 try {
-var prepared = opPusher.preparePush(e);
-ops.get(e.peer()).addAll(prepared.getLeft());
-commits.get(e.peer()).addAll(prepared.getRight());
+txm.run(() -> {
+var obj = curTx.get(JData.class, e.key()).orElse(null);
+if (obj == null) return;
+var extracted = opExtractorService.extractOps(obj, e.peer());
+if (extracted == null) return;
+ops.get(e.peer()).addAll(extracted.getLeft());
+commits.get(e.peer()).add(extracted.getRight());
+});
 success++;
 } catch (Exception ex) {
 Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);


@@ -1,4 +1,4 @@
-package com.usatiuk.dhfs.repository.invalidation;
+package com.usatiuk.dhfs.invalidation;
 import com.usatiuk.objects.JObjectKey;


@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JData;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public interface OpExtractor<T extends JData> {
    Pair<List<Op>, Runnable> extractOps(T data, PeerId peerId);
}


@@ -0,0 +1,48 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JData;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import org.apache.commons.lang3.tuple.Pair;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpExtractorService {
    private final Map<Class<? extends JData>, OpExtractor> _opExtractorMap;
    public OpExtractorService(Instance<OpExtractor<?>> opExtractors) {
        HashMap<Class<? extends JData>, OpExtractor> opExtractorMap = new HashMap<>();
        for (var opExtractor : opExtractors.handles()) {
            for (var type : Arrays.stream(opExtractor.getBean().getBeanClass().getGenericInterfaces()).flatMap(
                    t -> {
                        if (!(t instanceof ParameterizedType pm)) return Stream.empty();
                        if (pm.getRawType().equals(OpExtractor.class)) return Stream.of(pm);
                        return Stream.empty();
                    }
            ).toList()) {
                var orig = type.getActualTypeArguments()[0];
                assert JData.class.isAssignableFrom((Class<?>) orig);
                opExtractorMap.put((Class<? extends JData>) orig, opExtractor.get());
            }
        }
        _opExtractorMap = Map.copyOf(opExtractorMap);
    }
    public @Nullable Pair<List<Op>, Runnable> extractOps(JData data, PeerId peerId) {
        var extractor = _opExtractorMap.get(data.getClass());
        if (extractor == null) {
            return null;
        }
        return extractor.extractOps(data, peerId);
    }
}
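
The constructor above walks each discovered bean's generic interfaces to find which concrete JData subtype its OpExtractor was declared for, and keys the lookup map by that class. A minimal standalone sketch of the same reflection technique, using made-up names rather than project types:

import java.lang.reflect.ParameterizedType;

interface Handler<T> { void handle(T value); }

class StringHandler implements Handler<String> {
    public void handle(String value) { System.out.println(value); }
}

public class GenericTypeLookup {
    public static void main(String[] args) {
        // Walk the generic interfaces of the implementing class and pick out
        // the type argument it binds for Handler<T>.
        for (var iface : StringHandler.class.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType pt && pt.getRawType().equals(Handler.class)) {
                System.out.println(pt.getActualTypeArguments()[0]); // prints "class java.lang.String"
            }
        }
    }
}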


@@ -1,9 +1,9 @@
-package com.usatiuk.dhfs.repository.invalidation;
+package com.usatiuk.dhfs.invalidation;
-import com.usatiuk.dhfs.PeerId;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeOpWrapper;
 import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreePeriodicPushOp;
+import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.objects.transaction.Transaction;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -23,11 +23,11 @@ public class OpHandler {
 if (op instanceof IndexUpdateOp iu) {
 pushOpHandler.handlePush(from, iu);
 } else if (op instanceof JKleppmannTreeOpWrapper jk) {
-var tree = jKleppmannTreeManager.getTree(jk.treeName());
+var tree = jKleppmannTreeManager.getTree(jk.treeName()).orElseThrow();
 tree.acceptExternalOp(from, jk);
 curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
 } else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
-var tree = jKleppmannTreeManager.getTree(pop.treeName());
+var tree = jKleppmannTreeManager.getTree(pop.treeName()).orElseThrow();
 tree.acceptExternalOp(from, pop);
 }
 }


@@ -1,8 +1,8 @@
-package com.usatiuk.dhfs.repository.invalidation;
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.RemoteTransaction;
-import com.usatiuk.dhfs.repository.SyncHandler;
+package com.usatiuk.dhfs.invalidation;
+import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
+import com.usatiuk.dhfs.remoteobj.SyncHandler;
 import com.usatiuk.objects.transaction.Transaction;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;


@@ -0,0 +1,45 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
@ApplicationScoped
public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta> {
    @Inject
    TransactionManager txm;
    @Inject
    Transaction curTx;
    @Inject
    RemoteTransaction remoteTransaction;
    @Inject
    DtoMapperService dtoMapperService;
    @Override
    public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
        return txm.run(() -> {
            JDataRemoteDto dto =
                    data.knownType().isAnnotationPresent(JDataRemotePush.class)
                            ? remoteTransaction.getData(data.knownType(), data.key())
                            .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
                            : null;
            if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
                Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
            }
            return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
            });
        });
    }
}


@@ -1,9 +1,9 @@
 package com.usatiuk.dhfs.jkleppmanntree;
-import com.usatiuk.dhfs.PeerId;
+import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
-import com.usatiuk.dhfs.repository.InitialSyncProcessor;
-import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
+import com.usatiuk.dhfs.peersync.InitialSyncProcessor;
+import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.objects.JObjectKey;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
@@ -18,7 +18,7 @@ public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<
 @Override
 public void prepareForInitialSync(PeerId from, JObjectKey key) {
-var tree = jKleppmannTreeManager.getTree(key);
+var tree = jKleppmannTreeManager.getTree(key).orElseThrow();
 tree.recordBootstrap(from);
 }


@@ -1,15 +1,17 @@
 package com.usatiuk.dhfs.jkleppmanntree;
+import com.usatiuk.dhfs.invalidation.Op;
+import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
+import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
+import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
+import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.peersync.PeerInfoService;
+import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
+import com.usatiuk.kleppmanntree.*;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.jkleppmanntree.structs.*;
-import com.usatiuk.dhfs.repository.PersistentPeerDataService;
-import com.usatiuk.dhfs.repository.invalidation.Op;
-import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
 import com.usatiuk.objects.transaction.LockingStrategy;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
-import com.usatiuk.kleppmanntree.*;
 import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -20,6 +22,7 @@ import org.pcollections.TreePSet;
 import java.util.*;
 import java.util.function.Function;
+import java.util.function.Supplier;
 @ApplicationScoped
 public class JKleppmannTreeManager {
@@ -35,7 +38,7 @@ public class JKleppmannTreeManager {
 @Inject
 PersistentPeerDataService persistentPeerDataService;
-public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
+public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
 return txManager.executeTx(() -> {
 var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
 if (data == null) {
@@ -49,11 +52,11 @@ public class JKleppmannTreeManager {
 TreePMap.empty()
 );
 curTx.put(data);
-var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
+var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
 curTx.put(rootNode);
-var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
+var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
 curTx.put(trashNode);
-var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
+var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
 curTx.put(lf_node);
 }
 return new JKleppmannTree(data);
@@ -61,10 +64,20 @@ public class JKleppmannTreeManager {
 });
 }
-public JKleppmannTree getTree(JObjectKey name) {
+public Optional<JKleppmannTree> getTree(JObjectKey name) {
 return getTree(name, LockingStrategy.WRITE);
 }
+public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
+return txManager.executeTx(() -> {
+return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
+});
+}
+public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
+return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
+}
 public class JKleppmannTree {
 private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
 private final JKleppmannTreeStorageInterface _storageInterface;
@@ -155,7 +168,7 @@ public class JKleppmannTreeManager {
 // }
 if (Log.isTraceEnabled())
-Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().getName());
+Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().name());
 try {
 _tree.applyExternalOp(from, jop.op());
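
With this refactor, getTree(name) is lookup-only and returns an Optional, while creating a tree requires supplying the metadata used for its root, trash and lost+found nodes. A rough caller-side sketch, assuming an injected JKleppmannTreeManager; ExampleDirMeta is a made-up stand-in for whatever JKleppmannTreeNodeMeta implementation the caller provides:

// Hypothetical caller; only the signatures shown in the diff above are assumed.
var tree = jKleppmannTreeManager.getTree(JObjectKey.of("some_tree"))
        .orElseThrow(); // lookup-only: a missing tree is no longer created implicitly

var created = jKleppmannTreeManager.getTree(JObjectKey.of("some_tree"),
        () -> new ExampleDirMeta("")); // supplier builds the root/trash/lost+found metadata on first creation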


@@ -0,0 +1,59 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpExtractor;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
@ApplicationScoped
public class JKleppmannTreeOpExtractor implements OpExtractor<JKleppmannTreePersistentData> {
    @Inject
    TransactionManager txm;
    @Inject
    Transaction curTx;
    @Inject
    RemoteTransaction remoteTransaction;
    @Inject
    DtoMapperService dtoMapperService;
    @Inject
    JKleppmannTreeManager jKleppmannTreeManager;
    @Inject
    InvalidationQueueService invalidationQueueService;
    @Override
    public Pair<List<Op>, Runnable> extractOps(JKleppmannTreePersistentData data, PeerId peerId) {
        return txm.run(() -> {
            var tree = jKleppmannTreeManager.getTree(data.key()).orElseThrow();
            if (!tree.hasPendingOpsForHost(peerId))
                return Pair.of(List.of(tree.getPeriodicPushOp()), (Runnable) () -> {
                });
            var ops = tree.getPendingOpsForHost(peerId, 100);
            if (tree.hasPendingOpsForHost(peerId)) {
                curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOneNoDelay(peerId, data.key()));
            }
            var key = data.key();
            return Pair.<List<Op>, Runnable>of(ops, (Runnable) () -> {
                txm.run(() -> {
                    var commitTree = jKleppmannTreeManager.getTree(key).orElseThrow();
                    for (var op : ops) {
                        commitTree.commitOpForHost(peerId, op);
                    }
                });
            });
        });
    }
}


@@ -1,24 +1,19 @@
 package com.usatiuk.dhfs.jkleppmanntree;
-import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.PeerId;
+import com.usatiuk.dhfs.invalidation.Op;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
-import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
-import com.usatiuk.dhfs.repository.invalidation.Op;
+import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.kleppmanntree.OpMove;
+import com.usatiuk.objects.JObjectKey;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.List;
 // Wrapper to avoid having to specify generic types
 public record JKleppmannTreeOpWrapper(JObjectKey treeName,
 OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) implements Op, Serializable {
 @Override
 public Collection<JObjectKey> getEscapedRefs() {
-if (op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {
-return List.of(mf.getFileIno());
-}
-return List.of();
+return op.newMeta().collectRefsTo();
 }
 }


@@ -1,9 +1,9 @@
 package com.usatiuk.dhfs.jkleppmanntree;
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.repository.PersistentPeerDataService;
-import com.usatiuk.dhfs.repository.peersync.PeerInfo;
-import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
+import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.peersync.PeerInfo;
+import com.usatiuk.dhfs.peersync.PeerInfoService;
+import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
 import com.usatiuk.kleppmanntree.PeerInterface;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;


@@ -1,8 +1,8 @@
 package com.usatiuk.dhfs.jkleppmanntree;
+import com.usatiuk.dhfs.invalidation.Op;
+import com.usatiuk.dhfs.peersync.PeerId;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.repository.invalidation.Op;
 import java.io.Serializable;
 import java.util.Collection;


@@ -1,13 +1,12 @@
 package com.usatiuk.dhfs.jkleppmanntree.structs;
-import com.usatiuk.dhfs.JDataRef;
-import com.usatiuk.dhfs.JDataRefcounted;
-import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.dhfs.PeerId;
-import com.usatiuk.dhfs.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
+import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.refcount.JDataRef;
+import com.usatiuk.dhfs.refcount.JDataRefcounted;
 import com.usatiuk.kleppmanntree.OpMove;
 import com.usatiuk.kleppmanntree.TreeNode;
-import com.usatiuk.objects.JObjectKeyImpl;
+import com.usatiuk.objects.JObjectKey;
+import jakarta.annotation.Nullable;
 import org.pcollections.HashTreePMap;
 import org.pcollections.PCollection;
 import org.pcollections.PMap;
@@ -15,13 +14,14 @@ import org.pcollections.TreePSet;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 // FIXME: Ideally this is two classes?
 public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom, boolean frozen, JObjectKey parent,
 OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
-JKleppmannTreeNodeMeta meta,
+@Nullable JKleppmannTreeNodeMeta meta,
 PMap<String, JObjectKey> children) implements TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
 public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
@@ -61,13 +61,9 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JDataRef> refsFrom,
 @Override
 public Collection<JObjectKey> collectRefsTo() {
 return Stream.<JObjectKey>concat(children().values().stream(),
-switch (meta()) {
-case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>empty();
-case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
-case JKleppmannTreeNodeMetaPeer peer -> Stream.of(peer.getPeerId());
-case null -> Stream.<JObjectKey>empty();
-default -> throw new IllegalStateException("Unexpected value: " + meta());
-}
-).collect(Collectors.toUnmodifiableSet());
+Optional.ofNullable(meta)
+.<Stream<JObjectKey>>map(o -> o.collectRefsTo().stream())
+.orElse(Stream.empty()))
+.collect(Collectors.toUnmodifiableSet());
 }
 }


@@ -1,41 +1,13 @@
 package com.usatiuk.dhfs.jkleppmanntree.structs;
 import com.usatiuk.kleppmanntree.NodeMeta;
-import java.util.Objects;
+import com.usatiuk.objects.JObjectKey;
+import java.util.Collection;
 //@ProtoMirror(JKleppmannTreeNodeMetaP.class)
-public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
-private final String _name;
-public JKleppmannTreeNodeMeta(String name) {
-_name = name;
-}
-public String getName() {
-return _name;
-}
-public abstract JKleppmannTreeNodeMeta withName(String name);
-@Override
-public boolean equals(Object o) {
-if (this == o) return true;
-if (o == null || getClass() != o.getClass()) return false;
-JKleppmannTreeNodeMeta that = (JKleppmannTreeNodeMeta) o;
-return Objects.equals(_name, that._name);
-}
-@Override
-public int hashCode() {
-return Objects.hashCode(_name);
-}
-@Override
-public String toString() {
-return "JKleppmannTreeNodeMeta{" +
-"class=" + this.getClass().getSimpleName() + " " +
-"_name='" + _name + '\'' +
-'}';
-}
+public interface JKleppmannTreeNodeMeta extends NodeMeta {
+JKleppmannTreeNodeMeta withName(String name);
+Collection<JObjectKey> collectRefsTo();
 }
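
The meta type shrinks from an abstract class carrying a name field plus hand-written equals/hashCode/toString to a plain interface, so implementations can be records and get value semantics for free. A standalone illustration of that pattern with made-up names (not classes from this diff):

import java.util.Collection;
import java.util.List;

interface NodeMetaLike {
    String name();
    NodeMetaLike withName(String name);
    Collection<String> collectRefsTo();
}

// The record supplies name(), equals(), hashCode() and toString() automatically.
record FileMetaExample(String name, String fileRef) implements NodeMetaLike {
    @Override
    public NodeMetaLike withName(String name) {
        return new FileMetaExample(name, fileRef);
    }

    @Override
    public Collection<String> collectRefsTo() {
        return List.of(fileRef); // the object this node references
    }
}

public class MetaDemo {
    public static void main(String[] args) {
        NodeMetaLike meta = new FileMetaExample("a.txt", "ino-42");
        System.out.println(meta);                                   // FileMetaExample[name=a.txt, fileRef=ino-42]
        System.out.println(meta.withName("b.txt").collectRefsTo()); // [ino-42]
    }
}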

Some files were not shown because too many files have changed in this diff.