44 Commits

Author SHA1 Message Date
6a962022bb dump 2025-04-28 09:03:13 +02:00
f87eb365c3 Sync-base: remove our referrers from canDelete 2025-04-26 16:07:23 +02:00
8d3244fe64 Webui: use node-forge for hashing
apparently crypto works only on ssl websites
2025-04-26 14:09:05 +02:00
0a8985c93f Short readme 2025-04-26 11:19:06 +02:00
a8cf483eee Simplify ObjectPersistentStore 2025-04-26 11:11:51 +02:00
f7338f4e80 Dhfs-app: check that kill tests ls/cat return success 2025-04-26 10:44:33 +02:00
b89b182c58 Dhfs-fuse: make lmdb map size configurable 2025-04-25 23:11:02 +02:00
ad4ce72fdd Dhfs-fuse: attempt at windows support 2025-04-25 22:17:55 +02:00
26ba65fdce Sync-base: make Pushing invalidations log message trace
it's too big
2025-04-25 22:02:21 +02:00
697add66d5 Kelppmanntree: fix a dumb bug
directories are always the same duh
2025-04-25 22:00:33 +02:00
a53fc5e973 Kelppmanntree: remove undocontext 2025-04-25 21:37:50 +02:00
b034591091 Sync-base: OpHandler interface 2025-04-25 15:04:07 +02:00
07133a7186 Sync-base: get rid of JDataRemotePush 2025-04-25 14:57:06 +02:00
8cbecf1714 Dhfs-fs: remove optional from read 2025-04-25 13:40:48 +02:00
16ba692019 Recordify tree metadata 2025-04-25 13:35:54 +02:00
e5be1e6164 Cleanup poms 2025-04-25 13:13:21 +02:00
c74fdfc5a6 Dhfs-app: test fixes 2 2025-04-25 12:59:25 +02:00
c4268ab35b Dhfs-app: test fixes 2025-04-25 11:23:12 +02:00
2ab6e3c3f7 Sync-base: Handle getting peer info failure nicely 2025-04-25 11:15:36 +02:00
ec8546bd69 Show Peer address in WebUI 2025-04-25 11:13:30 +02:00
6ecef94b90 Webui: a little nicer 2025-04-25 11:07:08 +02:00
e7f22d783f Webui: proper async hash 2025-04-25 11:03:39 +02:00
bed55162d7 Peer certificate check when adding 2025-04-25 10:48:55 +02:00
f43c6db4f0 Run code format 2025-04-25 09:58:46 +02:00
56a15f4672 Sync-base: cleanup JKleppmannTree meta 2025-04-25 09:57:44 +02:00
85a1fa09ab KleppmannTree: a little cleanup 2025-04-25 09:45:35 +02:00
cca0b410cf Some packages cleanup 2025-04-25 09:16:31 +02:00
d94abfee97 Sync-base: op extractor interface 2025-04-25 09:16:31 +02:00
dependabot[bot]
6bd92ad7cd Bump the npm_and_yarn group across 1 directory with 2 updates
Bumps the npm_and_yarn group with 2 updates in the /webui directory: [react-router](https://github.com/remix-run/react-router/tree/HEAD/packages/react-router) and [react-router-dom](https://github.com/remix-run/react-router/tree/HEAD/packages/react-router-dom).


Updates `react-router` from 7.4.1 to 7.5.2
- [Release notes](https://github.com/remix-run/react-router/releases)
- [Changelog](https://github.com/remix-run/react-router/blob/main/packages/react-router/CHANGELOG.md)
- [Commits](https://github.com/remix-run/react-router/commits/react-router@7.5.2/packages/react-router)

Updates `react-router-dom` from 7.4.1 to 7.5.2
- [Release notes](https://github.com/remix-run/react-router/releases)
- [Changelog](https://github.com/remix-run/react-router/blob/main/packages/react-router-dom/CHANGELOG.md)
- [Commits](https://github.com/remix-run/react-router/commits/react-router-dom@7.5.2/packages/react-router-dom)

---
updated-dependencies:
- dependency-name: react-router
  dependency-version: 7.5.2
  dependency-type: direct:production
  dependency-group: npm_and_yarn
- dependency-name: react-router-dom
  dependency-version: 7.5.2
  dependency-type: direct:production
  dependency-group: npm_and_yarn
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-04-25 09:14:39 +02:00
1965d93f25 Dhfs-app: handle empty op push correctly 2025-04-24 22:38:01 +02:00
f6685f45f9 Dhfs-app: increase sync timeout 2025-04-24 22:34:04 +02:00
060ab1767d Dhfs-app: don't crash too late (never) 2025-04-24 19:46:41 +02:00
89d87095c8 Dhfs-app: class level parallel docker tests
otherwise the logs are unreadable
2025-04-24 17:42:24 +02:00
7425c1f312 Dhfs-app: add failed to connect logs 2025-04-24 17:40:38 +02:00
428eca325f Dhfs-app: increase lazyfs start timeout 2025-04-24 17:19:41 +02:00
005bc35496 Dhfs-app: assume lazyfs doesn't crash too early 2025-04-24 16:32:25 +02:00
6685575ca5 Dhfs-app: please work now 2025-04-24 15:48:57 +02:00
1ae813aacd Sync-base: cleanup old grpc channels 2025-04-24 15:46:23 +02:00
e81671251a Dhfs-fs: nevermind, no sleep 2025-04-24 15:30:18 +02:00
add26bb156 Dhfs-fs: larger sleep 2025-04-24 15:23:31 +02:00
4060045f15 Dhfs-fs: no need for removing/adding chunk logs now 2025-04-24 15:14:18 +02:00
75b484d5b2 Dhfs-app: fix asyncFence if there were no transactions 2025-04-24 15:14:03 +02:00
1d9dc8ed4d Dhfs-app: lazyfs test fixes 2025-04-24 15:13:45 +02:00
7a85704862 Dhfs-app: stopatlevel1 for docker tests 2025-04-24 14:14:52 +02:00
219 changed files with 1731 additions and 1594 deletions

View File

@@ -14,6 +14,9 @@ Syncthing and allowing you to stream your files like Google Drive File Stream
This is a simple wrapper around the jar/web ui distribution that allows you to run/stop
the DHFS server in the background, and update itself (hopefully!)
## How to use it and how it works?
## How to use it?
TODO 😁
Unpack the run-wrapper and run the `run` script. The filesystem should be mounted to the `fuse` folder in the run-wrapper root directory.
Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers.

View File

@@ -1,16 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:8080:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
<module name="dhfs-app"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
</component>

View File

@@ -1,16 +1,18 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.app.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions --enable-preview -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=8080 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication"
nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
<module name="dhfs-app"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
</component>

View File

@@ -107,31 +107,11 @@
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>dhfs-fs</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>dhfs-fuse</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>sync-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId>
@@ -159,16 +139,13 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>
<junit.jupiter.execution.parallel.enabled>
true
false
</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>
concurrent
</junit.jupiter.execution.parallel.mode.default>
<junit.jupiter.execution.parallel.config.dynamic.factor>
0.5
</junit.jupiter.execution.parallel.config.dynamic.factor>
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
</systemPropertyVariables>

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.app;
package com.usatiuk.dhfsapp;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.QuarkusApplication;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsapp;
import io.quarkus.test.junit.QuarkusTestProfile;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsapp;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -18,18 +18,6 @@ public class TestDataCleaner {
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
String tempDirectory;
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
try {
purgeDirectory(Path.of(tempDirectory).toFile());
} catch (Exception ignored) {
Log.warn("Couldn't cleanup test data on init");
}
}
void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
purgeDirectory(Path.of(tempDirectory).toFile());
}
public static void purgeDirectory(File dir) {
try {
for (File file : Objects.requireNonNull(dir.listFiles())) {
@@ -41,4 +29,16 @@ public class TestDataCleaner {
Log.error("Couldn't purge directory " + dir, e);
}
}
void init(@Observes @Priority(1) StartupEvent event) throws IOException {
try {
purgeDirectory(Path.of(tempDirectory).toFile());
} catch (Exception ignored) {
Log.warn("Couldn't cleanup test data on init");
}
}
void shutdown(@Observes @Priority(1000000000) ShutdownEvent event) throws IOException {
purgeDirectory(Path.of(tempDirectory).toFile());
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import com.github.dockerjava.api.model.Device;
import io.quarkus.logging.Log;
@@ -67,19 +67,35 @@ public class DhfsFuseIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
}
private void checkConsistency() {
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/*/*");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse/*/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
});
}
@AfterEach
void stop() {
Stream.of(container1, container2).parallel().forEach(GenericContainer::stop);
@@ -248,8 +264,8 @@ public class DhfsFuseIT {
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo rewritten > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo jioadsd > /dhfs_test/fuse/newfile1").getExitCode());
@@ -264,8 +280,8 @@ public class DhfsFuseIT {
container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -309,6 +325,33 @@ public class DhfsFuseIT {
});
}
@Test
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
var client = DockerClientFactory.instance().client();
client.disconnectFromNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.disconnectFromNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS, 1);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo fdsaio >> /dhfs_test/fuse/a/testf").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "mkdir /dhfs_test/fuse/a && echo exgrg >> /dhfs_test/fuse/a/testf").getExitCode());
client.connectToNetworkCmd().withContainerId(container1.getContainerId()).withNetworkId(network.getId()).exec();
client.connectToNetworkCmd().withContainerId(container2.getContainerId()).withNetworkId(network.getId()).exec();
Log.warn("Waiting for connections");
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 1);
Log.warn("Connected");
checkConsistency();
var ls1 = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/a*/*");
Assertions.assertTrue(ls1.getStdout().contains("fdsaio"));
Assertions.assertTrue(ls1.getStdout().contains("exgrg"));
}
@Test
void dirCycleTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import com.github.dockerjava.api.model.Device;
import io.quarkus.logging.Log;
@@ -93,26 +93,26 @@ public class DhfsFusex3IT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl1 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
var c2curl3 = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c3uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c3uuid);
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
@@ -193,8 +193,8 @@ public class DhfsFusex3IT {
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request DELETE " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
Thread.sleep(10000);

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import io.quarkus.logging.Log;
import org.jetbrains.annotations.NotNull;
@@ -66,7 +66,7 @@ public class DhfsImage implements Future<String> {
.run("apt update && apt install -y libfuse2 curl gcc")
.copy("/app", "/app")
.copy("/libs", "/libs")
.cmd("java", "-ea", "-Xmx128M",
.cmd("java", "-ea", "-Xmx256M", "-XX:TieredStopAtLevel=1", "-XX:+UseParallelGC",
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
@@ -75,10 +75,10 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.deletion.can-delete-retry-delay=1000",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=10",
"-Ddhfs.objects.sync.timeout=30",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",
"-Ddhfs.sync.cert-check=false",
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.objects.transaction\".level=INFO",

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import com.github.dockerjava.api.model.Device;
import com.usatiuk.dhfs.TestDataCleaner;
import com.usatiuk.dhfsapp.TestDataCleaner;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.slf4j.LoggerFactory;
@@ -81,14 +81,14 @@ public class KillIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -115,7 +115,7 @@ public class KillIT {
Log.info(ls2);
Log.info(cat2);
return ls1.equals(ls2) && cat1.equals(cat2);
return ls1.equals(ls2) && cat1.equals(cat2) && ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && cat1.getExitCode() == 0 && cat2.getExitCode() == 0;
});
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import io.quarkus.logging.Log;
@@ -6,25 +6,26 @@ import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class LazyFs {
private static final String lazyFsPath;
private final String mountRoot;
private final String dataRoot;
private final String name;
private final File configFile;
private final File fifoFile;
static {
lazyFsPath = System.getProperty("lazyFsPath");
System.out.println("LazyFs Path: " + lazyFsPath);
}
private final String mountRoot;
private final String dataRoot;
private final String name;
private final File configFile;
private final File fifoFile;
private Thread errPiper;
private Thread outPiper;
private CountDownLatch startLatch;
private Process fs;
public LazyFs(String name, String mountRoot, String dataRoot) {
this.name = name;
this.mountRoot = mountRoot;
@@ -43,11 +44,6 @@ public class LazyFs {
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
private Thread errPiper;
private Thread outPiper;
private CountDownLatch startLatch;
private Process fs;
private String fifoPath() {
return fifoFile.getAbsolutePath();
}
@@ -133,7 +129,7 @@ public class LazyFs {
errPiper.start();
try {
if (!startLatch.await(5, TimeUnit.SECONDS))
if (!startLatch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("StartLatch timed out");
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -156,14 +152,14 @@ public class LazyFs {
"op=\"write\"\n" +
"file=\"" + mdbPath() + "\"\n" +
"persist=[1,4]\n" +
"occurrence=10");
"occurrence=3");
}
public void startTornSeq() {
start("[[injection]]\n" +
"type=\"torn-op\"\n" +
"file=\"" + mdbPath() + "\"\n" +
"occurrence=10\n" +
"occurrence=3\n" +
"parts=3 #or parts_bytes=[4096,3600,1260]\n" +
"persist=[1,3]");
}
@@ -173,7 +169,6 @@ public class LazyFs {
var cmd = "echo \"lazyfs::crash::timing=after::op=write::from_rgx=*\" > " + fifoPath();
Log.info("Running command: " + cmd);
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", cmd}).waitFor();
stop();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -182,11 +177,7 @@ public class LazyFs {
public void stop() {
try {
synchronized (this) {
if (fs == null) {
return;
}
Runtime.getRuntime().exec(new String[]{"/bin/sh", "-c", "fusermount3 -u " + mountRoot}).waitFor();
fs = null;
}
} catch (Exception e) {
throw new RuntimeException(e);

View File

@@ -1,14 +1,11 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import com.github.dockerjava.api.model.Device;
import com.usatiuk.dhfs.TestDataCleaner;
import com.usatiuk.dhfsapp.TestDataCleaner;
import io.quarkus.logging.Log;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
@@ -96,14 +93,14 @@ public class LazyFsIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -136,16 +133,10 @@ public class LazyFsIT {
+ ls2 + "\n"
+ cat2 + "\n");
return ls1.equals(ls2) && cat1.equals(cat2);
return ls1.equals(ls2) && cat1.equals(cat2) && ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && cat1.getExitCode() == 0 && cat2.getExitCode() == 0;
});
}
private static enum CrashType {
CRASH,
TORN_OP,
TORN_SEQ
}
@ParameterizedTest
@EnumSource(CrashType.class)
void killTest(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -163,10 +154,11 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -179,8 +171,14 @@ public class LazyFsIT {
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
// Sometimes it doesn't get mounted properly for some reason
Assumptions.assumeTrue(false);
}
executor.submit(() -> {
try {
@@ -196,7 +194,15 @@ public class LazyFsIT {
Thread.sleep(3000);
lazyFs1.crash();
}
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
Assumptions.assumeTrue(false);
}
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
@@ -214,7 +220,6 @@ public class LazyFsIT {
checkConsistency(testInfo.getDisplayName());
}
@ParameterizedTest
@EnumSource(CrashType.class)
void killTestDirs(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -232,10 +237,11 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -248,8 +254,15 @@ public class LazyFsIT {
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("1-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
// Sometimes it doesn't get mounted properly for some reason
Assumptions.assumeTrue(false);
}
executor.submit(() -> {
try {
@@ -265,7 +278,15 @@ public class LazyFsIT {
Thread.sleep(3000);
lazyFs1.crash();
}
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
Assumptions.assumeTrue(false);
}
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
lazyFs1.stop();
@@ -297,12 +318,15 @@ public class LazyFsIT {
}
});
barrier.await();
Thread.sleep(3000);
Log.info("Killing");
lazyFs2.crash();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -310,15 +334,20 @@ public class LazyFsIT {
case TORN_OP -> lazyFs2.startTornOp();
case TORN_SEQ -> lazyFs2.startTornSeq();
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
container2.start();
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
// Sometimes it doesn't get mounted properly for some reason
Assumptions.assumeTrue(false);
}
var barrier2 = new CountDownLatch(1);
executor.submit(() -> {
try {
@@ -331,17 +360,25 @@ public class LazyFsIT {
});
barrier2.await();
Log.info("Killing");
Thread.sleep(3000);
if (crashType.equals(CrashType.CRASH)) {
Thread.sleep(2000);
lazyFs2.crash();
}
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
Assumptions.assumeTrue(false);
}
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
lazyFs2.start();
container2.start();
@@ -354,7 +391,6 @@ public class LazyFsIT {
checkConsistency(testInfo.getDisplayName());
}
@ParameterizedTest
@EnumSource(CrashType.class)
void killTestDirs2(CrashType crashType, TestInfo testInfo) throws Exception {
@@ -372,10 +408,12 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs2.crash();
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 5, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Log.info("Restart");
switch (crashType) {
@@ -383,14 +421,19 @@ public class LazyFsIT {
case TORN_OP -> lazyFs2.startTornOp();
case TORN_SEQ -> lazyFs2.startTornSeq();
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
container2.start();
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
Log.info("Failed to connect: " + testInfo.getDisplayName());
// Sometimes it doesn't get mounted properly for some reason
Assumptions.assumeTrue(false);
}
var barrier2 = new CountDownLatch(1);
executor.submit(() -> {
@@ -403,17 +446,25 @@ public class LazyFsIT {
}
});
barrier2.await();
Thread.sleep(3000);
Log.info("Killing");
if (crashType.equals(CrashType.CRASH)) {
Thread.sleep(2000);
lazyFs2.crash();
}
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 30, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
if (crashType.equals(CrashType.CRASH))
throw e;
Assumptions.assumeTrue(false);
}
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
lazyFs2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
Log.info("Restart");
lazyFs2.start();
container2.start();
@@ -428,4 +479,11 @@ public class LazyFsIT {
}
private static enum CrashType {
CRASH,
TORN_OP,
TORN_SEQ
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.integration;
package com.usatiuk.dhfsapp.integration;
import com.github.dockerjava.api.model.Device;
import org.junit.jupiter.api.*;
@@ -75,14 +75,14 @@ public class ResyncIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -115,14 +115,14 @@ public class ResyncIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
@@ -155,14 +155,14 @@ public class ResyncIT {
var c1curl = container1.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c2uuid);
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
" --request PUT " +
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/peers-manage/known-peers");
" --data '{}' " +
" http://localhost:8080/peers-manage/known-peers/" + c1uuid);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);

View File

@@ -102,26 +102,11 @@
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>sync-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
@Override

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.persistence.ChunkDataP;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton;
@Singleton

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.jmap.JMapHolder;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.repository.syncmap.DtoMapper;
import com.usatiuk.dhfs.syncmap.DtoMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.objects.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.FileDtoP;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;

View File

@@ -1,20 +1,15 @@
package com.usatiuk.dhfs.files.objects;
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.RemoteObjectDataWrapper;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.repository.ObjSyncHandler;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.SyncHelper;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -48,11 +43,11 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
DhfsFileService fileService;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
}
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,

View File

@@ -0,0 +1,18 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of();
}
}

View File

@@ -0,0 +1,19 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
@Override
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaFile(name, fileIno);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return List.of(fileIno);
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files.service;
package com.usatiuk.dhfsfs.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
@@ -30,7 +30,7 @@ public interface DhfsFileService {
long size(JObjectKey fileUuid);
Optional<ByteString> read(JObjectKey fileUuid, long offset, int length);
ByteString read(JObjectKey fileUuid, long offset, int length);
Long write(JObjectKey fileUuid, long offset, ByteString data);

View File

@@ -1,27 +1,28 @@
package com.usatiuk.dhfs.files.service;
package com.usatiuk.dhfsfs.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfsfs.objects.ChunkData;
import com.usatiuk.dhfsfs.objects.File;
import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -73,11 +74,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
JMapHelper jMapHelper;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"));
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC);
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private ChunkData createChunk(ByteString bytes) {
@@ -94,21 +95,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = curTx.get(JKleppmannTreeNode.class, res);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
return ret;
}
@@ -125,7 +126,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
}
} else if (ref instanceof JKleppmannTreeNode) {
} else if (ref instanceof JKleppmannTreeNodeHolder) {
ret = new GetattrRes(100, 100, 0700, GetattrType.DIRECTORY);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + ref.key()));
@@ -140,7 +141,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
try {
var ret = getDirEntryR(name);
return switch (ret.meta()) {
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.getFileIno());
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
default -> Optional.empty();
};
@@ -189,7 +190,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
return getTreeW().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.getFileIno().equals(ino);
return f.fileIno().equals(ino);
return false;
});
});
@@ -242,7 +243,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
if (dent instanceof JKleppmannTreeNode) {
if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
@@ -271,7 +272,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
@Override
public Optional<ByteString> read(JObjectKey fileUuid, long offset, int length) {
public ByteString read(JObjectKey fileUuid, long offset, int length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Length should be more than zero: " + length));
@@ -281,12 +282,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
if (file == null) {
Log.error("File not found when trying to read: " + fileUuid);
return Optional.empty();
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to read: " + fileUuid));
}
try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
if (!it.hasNext())
return Optional.of(ByteString.empty());
return ByteString.empty();
// if (it.peekNextKey().key() != offset) {
// Log.warnv("Read over the end of file: {0} {1} {2}, next chunk: {3}", fileUuid, offset, length, it.peekNextKey());
@@ -324,10 +325,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
chunk = it.next();
}
return Optional.of(buf);
return buf;
} catch (Exception e) {
Log.error("Error reading file: " + fileUuid, e);
return Optional.empty();
throw new StatusRuntimeException(Status.INTERNAL.withDescription("Error reading file: " + fileUuid));
}
});
}
@@ -444,12 +445,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
for (var e : removedChunks.entrySet()) {
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
}
for (var e : newChunks.entrySet()) {
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
@@ -535,12 +536,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
// file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
for (var e : removedChunks.entrySet()) {
Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Removing chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.delete(file, JMapLongKey.of(e.getKey()));
}
for (var e : newChunks.entrySet()) {
Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
// Log.tracev("Adding chunk {0}-{1}", e.getKey(), e.getValue());
jMapHelper.put(file, JMapLongKey.of(e.getKey()), e.getValue());
}
@@ -595,7 +596,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public ByteString readlinkBS(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
return read(uuid, 0, Math.toIntExact(size(uuid))).get();
return read(uuid, 0, Math.toIntExact(size(uuid)));
});
}
@@ -628,7 +629,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
// FIXME:
if (dent instanceof JKleppmannTreeNode) {
if (dent instanceof JKleppmannTreeNodeHolder) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files.service;
package com.usatiuk.dhfsfs.service;
public class DirectoryNotEmptyException extends RuntimeException {
@Override

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files.service;
package com.usatiuk.dhfsfs.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files.service;
package com.usatiuk.dhfsfs.service;
public enum GetattrType {
FILE,

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files;
package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

View File

@@ -1,9 +1,8 @@
package com.usatiuk.dhfs.files;
package com.usatiuk.dhfsfs;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfsfs.objects.File;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
@@ -90,7 +89,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
// for (int start = 0; start < all.length(); start++) {
// for (int end = start; end <= all.length(); end++) {
// var read = fileService.read(fuuid.toString(), start, end - start);
// Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.get().toByteArray());
// Assertions.assertArrayEquals(all.substring(start, end).getBytes(), read.toByteArray());
// }
// }
// }
@@ -113,20 +112,20 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var curMtime = fileService.getattr(uuid).get().mtime();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
Assertions.assertArrayEquals(new byte[]{2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 2, 8).toByteArray());
fileService.write(uuid, 4, new byte[]{10, 11, 12});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.write(uuid, 10, new byte[]{13, 14});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 12, 7, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
fileService.write(uuid, 6, new byte[]{15, 16});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 10, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
fileService.write(uuid, 3, new byte[]{17, 18});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 17, 18, 11, 15, 16, 8, 9, 13, 14}, fileService.read(uuid, 0, 12).toByteArray());
var newMtime = fileService.getattr(uuid).get().mtime();
Assertions.assertTrue(newMtime > curMtime);
fileService.unlink("/writeTest");
Assertions.assertFalse(fileService.open("/writeTest").isPresent());
}
@@ -139,7 +138,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.unlink("/removeTest");
Assertions.assertFalse(fileService.open("/removeTest").isPresent());
@@ -153,12 +152,12 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
}
@RepeatedTest(100)
@@ -170,12 +169,12 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).toByteArray());
fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).toByteArray());
} finally {
fileService.unlink("/truncateTest2");
}
@@ -189,10 +188,10 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.truncate(uuid, 7);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).toByteArray());
}
@Test
@@ -202,14 +201,14 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest"));
Assertions.assertFalse(fileService.open("/moveTest").isPresent());
Assertions.assertTrue(fileService.open("/movedTest").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/movedTest").get(), 0, 10).toByteArray());
}
@Test
@@ -222,9 +221,9 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid2 = ret2.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.write(uuid2, 0, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29});
Assertions.assertArrayEquals(new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}, fileService.read(uuid2, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 29}, fileService.read(uuid2, 0, 10).toByteArray());
jObjectTxManager.run(() -> {
@@ -238,7 +237,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/moveOverTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/moveOverTest2").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/moveOverTest2").get(), 0, 10).toByteArray());
// await().atMost(5, TimeUnit.SECONDS).until(() -> {
// jObjectTxManager.run(() -> {
@@ -256,8 +255,8 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{}, fileService.read(uuid, 20, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
Assertions.assertArrayEquals(new byte[]{}, fileService.read(uuid, 20, 10).toByteArray());
}
@Test
@@ -267,13 +266,13 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
fileService.write(uuid, 20, new byte[]{10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
Assertions.assertArrayEquals(new byte[]{
0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19
}, fileService.read(uuid, 0, 30).get().toByteArray());
}, fileService.read(uuid, 0, 30).toByteArray());
}
@Test
@@ -283,7 +282,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
var uuid = ret.get();
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).toByteArray());
// var oldfile = jObjectManager.get(uuid).orElseThrow(IllegalStateException::new);
// var chunk = oldfile.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.extractRefs()).stream().toList().get(0);
@@ -298,6 +297,6 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/movedTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/movedTest2").get(), 0, 10).toByteArray());
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files;
package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.files;
package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsfs;
import io.quarkus.test.junit.QuarkusTestProfile;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsfs;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.benchmarks;
package com.usatiuk.dhfsfs.benchmarks;
import io.quarkus.logging.Log;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.benchmarks;
package com.usatiuk.dhfsfs.benchmarks;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfsfs.TempDataProfile;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.objects.JObjectKey;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

View File

@@ -107,26 +107,11 @@
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>dhfs-fs</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>sync-base</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId>

View File

@@ -1,10 +1,10 @@
package com.usatiuk.dhfs.fuse;
package com.usatiuk.dhfsfuse;
import com.google.protobuf.UnsafeByteOperations;
import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import io.grpc.Status;
@@ -40,6 +40,8 @@ import static jnr.posix.FileStat.*;
public class DhfsFuse extends FuseStubFS {
private static final int blksize = 1048576;
private static final int iosize = 1048576;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
@ConfigProperty(name = "dhfs.fuse.root")
String root;
@ConfigProperty(name = "dhfs.fuse.enabled")
@@ -53,9 +55,6 @@ public class DhfsFuse extends FuseStubFS {
@Inject
DhfsFileService fileService;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
private long allocateHandle(JObjectKey key) {
while (true) {
var newFh = _fh.getAndIncrement();
@@ -73,40 +72,48 @@ public class DhfsFuse extends FuseStubFS {
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
if (!Paths.get(root).toFile().isDirectory())
throw new IllegalStateException("Could not create directory " + root);
Log.info("Mounting with root " + root);
var uid = new UnixSystem().getUid();
var gid = new UnixSystem().getGid();
var opts = new ArrayList<String>();
// Assuming macFuse
if (SystemUtils.IS_OS_MAC) {
if (SystemUtils.IS_OS_WINDOWS) {
opts.add("-o");
opts.add("iosize=" + iosize);
} else if (SystemUtils.IS_OS_LINUX) {
// FIXME: There's something else missing: the writes still seem to be 32k max
opts.add("auto_cache");
opts.add("-o");
opts.add("uid=-1");
opts.add("-o");
opts.add("gid=-1");
} else {
Paths.get(root).toFile().mkdirs();
if (!Paths.get(root).toFile().isDirectory())
throw new IllegalStateException("Could not create directory " + root);
var uid = new UnixSystem().getUid();
var gid = new UnixSystem().getGid();
// Assuming macFuse
if (SystemUtils.IS_OS_MAC) {
opts.add("-o");
opts.add("iosize=" + iosize);
} else if (SystemUtils.IS_OS_LINUX) {
// FIXME: There's something else missing: the writes still seem to be 32k max
// opts.add("-o");
// opts.add("large_read");
opts.add("-o");
opts.add("big_writes");
opts.add("-o");
opts.add("max_read=" + iosize);
opts.add("-o");
opts.add("max_write=" + iosize);
}
opts.add("-o");
opts.add("big_writes");
opts.add("auto_cache");
opts.add("-o");
opts.add("max_read=" + iosize);
opts.add("uid=" + uid);
opts.add("-o");
opts.add("max_write=" + iosize);
opts.add("gid=" + gid);
}
opts.add("-o");
opts.add("auto_cache");
opts.add("-o");
opts.add("uid=" + uid);
opts.add("-o");
opts.add("gid=" + gid);
mount(Paths.get(root), false, debug, opts.toArray(String[]::new));
}
@@ -224,8 +231,8 @@ public class DhfsFuse extends FuseStubFS {
var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.size();
} catch (Throwable e) {
Log.error("When reading " + path, e);
return -ErrorCodes.EIO();

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.fuse;
package com.usatiuk.dhfsfuse;
import com.google.protobuf.ByteOutput;
import jnr.ffi.Pointer;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.fuse;
package com.usatiuk.dhfsfuse;
import jakarta.inject.Singleton;
import jdk.internal.access.JavaNioAccess;

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.fuse;
package com.usatiuk.dhfsfuse;
import com.usatiuk.dhfs.TempDataProfile;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import org.eclipse.microprofile.config.inject.ConfigProperty;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsfuse;
import io.quarkus.test.junit.QuarkusTestProfile;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfsfuse;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;

View File

@@ -1,7 +1,5 @@
package com.usatiuk.kleppmanntree;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
@@ -17,7 +15,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
private final PeerInterface<PeerIdT> _peers;
private final Clock<TimestampT> _clock;
private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;
public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
PeerInterface<PeerIdT> peers,
@@ -89,7 +86,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.withParent(null)
.withLastEffectiveOp(null)
);
_undoCtx.put(node.key(), node);
}
}
@@ -217,31 +213,16 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
assert cmp != 0;
if (cmp < 0) {
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.newestSlice(op.timestamp(), false);
_undoCtx = new HashMap<>();
for (var entry : toUndo.reversed()) {
undoOp(entry.getValue());
}
try {
doAndPut(op, failCreatingIfExists);
} finally {
for (var entry : toUndo) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
}
} finally {
tryTrimLog();
if (log.containsKey(op.timestamp())) return;
var toUndo = log.newestSlice(op.timestamp(), false);
for (var entry : toUndo.reversed()) {
undoOp(entry.getValue());
}
doAndPut(op, failCreatingIfExists);
for (var entry : toUndo) {
redoOp(entry);
}
tryTrimLog();
} else {
doAndPut(op, failCreatingIfExists);
tryTrimLog();
@@ -264,8 +245,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
} catch (AlreadyExistsException aex) {
throw aex;
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error computing effects for op" + op.toString(), e);
computed = new LogRecord<>(op, null);
throw new RuntimeException("Error computing effects for op " + op.toString(), e);
}
if (computed.effects() != null)
@@ -274,24 +254,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
if (_undoCtx != null) {
var node = _undoCtx.get(key);
if (node != null) {
try {
if (!node.children().isEmpty()) {
LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
}
node = node.withParent(parent).withMeta(meta);
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
node = null;
}
}
if (node != null) {
_undoCtx.remove(key);
return node;
}
}
return _storage.createNewNode(key, parent, meta);
}
@@ -372,10 +334,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
if (Objects.equals(conflictNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node creation conflict: " + conflictNode);
String newConflictNodeName = op.newName() + ".conflict." + conflictNode.key();
@@ -400,18 +358,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (oldMeta != null
&& op.newMeta() != null
&& !oldMeta.getClass().equals(op.newMeta().getClass())) {
LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.key());
return new LogRecord<>(op, null);
throw new RuntimeException("Class mismatch for meta for node " + node.key());
}
var replaceNodeId = newParent.children().get(op.newName());
if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId);
var replaceNodeMeta = replaceNode.meta();
if (Objects.equals(replaceNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
LOGGER.finer(() -> "Node replacement: " + replaceNode);
return new LogRecord<>(op, List.of(

View File

@@ -10,14 +10,14 @@ public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT exten
NodeIdT childId) implements Serializable {
public String oldName() {
if (oldInfo.oldMeta() != null) {
return oldInfo.oldMeta().getName();
return oldInfo.oldMeta().name();
}
return childId.toString();
}
public String newName() {
if (newMeta != null) {
return newMeta.getName();
return newMeta.name();
}
return childId.toString();
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public interface NodeMeta extends Serializable {
String getName();
String name();
NodeMeta withName(String name);
}

View File

@@ -7,7 +7,7 @@ public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends
NodeIdT childId) implements Serializable {
public String newName() {
if (newMeta != null)
return newMeta.getName();
return newMeta.name();
return childId.toString();
}
}

View File

@@ -17,7 +17,7 @@ public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT ext
default String name() {
var meta = meta();
if (meta != null) return meta.getName();
if (meta != null) return meta.name();
return key().toString();
}

View File

@@ -8,7 +8,7 @@ public abstract class TestNodeMeta implements NodeMeta {
}
@Override
public String getName() {
public String name() {
return _name;
}

View File

@@ -18,6 +18,11 @@
</properties>
<dependencies>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>net.jqwik</groupId>
<artifactId>jqwik</artifactId>

View File

@@ -3,9 +3,9 @@ package com.usatiuk.objects;
import java.util.function.Supplier;
public final class JDataVersionedWrapperLazy implements JDataVersionedWrapper {
private JData _data;
private final long _version;
private final int _estimatedSize;
private JData _data;
private Supplier<JData> _producer;
public JDataVersionedWrapperLazy(long version, int estimatedSize, Supplier<JData> producer) {

View File

@@ -8,11 +8,10 @@ import jakarta.inject.Singleton;
import java.nio.ByteBuffer;
@Singleton
public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVersionedWrapper> {
public class JDataVersionedWrapperSerializer {
@Inject
ObjectSerializer<JData> dataSerializer;
@Override
public ByteString serialize(JDataVersionedWrapper obj) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(obj.version());
@@ -20,12 +19,10 @@ public class JDataVersionedWrapperSerializer implements ObjectSerializer<JDataVe
return ByteString.copyFrom(buffer).concat(dataSerializer.serialize(obj.data()));
}
@Override
public JDataVersionedWrapper deserialize(ByteString data) {
var version = data.substring(0, Long.BYTES).asReadOnlyByteBuffer().getLong();
var rawData = data.substring(Long.BYTES);
return new JDataVersionedWrapperLazy(version, rawData.size(),
() -> dataSerializer.deserialize(rawData)
public JDataVersionedWrapper deserialize(ByteBuffer data) {
var version = data.getLong();
return new JDataVersionedWrapperLazy(version, data.remaining(),
() -> dataSerializer.deserialize(data)
);
}
}

View File

@@ -2,26 +2,31 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.google.protobuf.UnsafeByteOperations;
import io.quarkus.arc.DefaultBean;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.fury.Fury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import java.io.IOException;
import java.nio.ByteBuffer;
@ApplicationScoped
@DefaultBean
public class JavaDataSerializer implements ObjectSerializer<JData> {
private static final ThreadSafeFury fury = Fury.builder().withLanguage(Language.JAVA)
// Allow to deserialize objects unknown types,
// more flexible but less secure.
.requireClassRegistration(false)
.buildThreadSafeFury();
@Override
public ByteString serialize(JData obj) {
return SerializationHelper.serialize(obj);
return UnsafeByteOperations.unsafeWrap(fury.serialize(obj));
}
@Override
public JData deserialize(ByteString data) {
try (var is = data.newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
public JData deserialize(ByteBuffer data) {
return (JData) fury.deserialize(data);
}
}

View File

@@ -2,8 +2,10 @@ package com.usatiuk.objects;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
public interface ObjectSerializer<T> {
ByteString serialize(T obj);
T deserialize(ByteString data);
T deserialize(ByteBuffer data);
}

View File

@@ -1,6 +1,6 @@
package com.usatiuk.objects.iterators;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;

View File

@@ -1,7 +1,5 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface Data<V> extends MaybeTombstone<V> {
V value();
}

View File

@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public record DataWrapper<V>(V value) implements Data<V> {
}

View File

@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface MaybeTombstone<T> {
}

View File

@@ -10,16 +10,9 @@ import java.util.NoSuchElementException;
import java.util.TreeMap;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private final NavigableMap<K, IteratorEntry<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final List<IteratorEntry<K, V>> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
@@ -215,6 +208,12 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
private interface FirstMatchState<K extends Comparable<K>, V> {
}
private record IteratorEntry<K extends Comparable<K>, V>(int priority, CloseableKvIterator<K, V> iterator) {
public IteratorEntry<K, V> reversed() {
return new IteratorEntry<>(priority, iterator.reversed());
}
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.NoSuchElementException;

View File

@@ -1,6 +1,4 @@
package com.usatiuk.objects.iterators;
import java.util.Optional;
public interface Tombstone<V> extends MaybeTombstone<V> {
}

View File

@@ -1,7 +1,5 @@
package com.usatiuk.objects.iterators;
import io.quarkus.logging.Log;
import java.util.List;
public abstract class TombstoneMergingKvIterator {

View File

@@ -1,9 +1,8 @@
package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.Optional;

View File

@@ -24,52 +24,15 @@ import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final AtomicReference<Cache> _cache;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private final AtomicReference<Cache> _cache;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
public CachingObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.lru.limit") int sizeLimit) {
_cache = new AtomicReference<>(
new Cache(TreePMap.empty(), 0, -1, sizeLimit)
@@ -142,10 +105,10 @@ public class CachingObjectPersistentStore {
Snapshot<JObjectKey, JDataVersionedWrapper> finalBacking = backing;
Cache finalCurCache = curCache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private boolean _invalid = false;
private boolean _closed = false;
private final Cache _curCache = finalCurCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private boolean _invalid = false;
private boolean _closed = false;
private void doCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
_cacheTries.incrementAndGet();
@@ -292,6 +255,41 @@ public class CachingObjectPersistentStore {
int size();
}
private record Cache(TreePMap<JObjectKey, CacheEntry> map,
int size,
long version,
int sizeLimit) {
public Cache withPut(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
var entry = obj.<CacheEntry>map(o -> new CacheEntryPresent(o, o.estimateSize())).orElse(new CacheEntryMiss());
int newSize = size() + entry.size();
var old = map.get(key);
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
version,
sizeLimit
);
}
public Cache withVersion(long version) {
return new Cache(map, size, version, sizeLimit);
}
}
private record CacheEntryPresent(JDataVersionedWrapper value,
int size) implements CacheEntry, Data<JDataVersionedWrapper> {
}

View File

@@ -1,8 +1,5 @@
package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.utils.RefcountedCloseable;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
@@ -42,6 +39,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_VER_OBJ_NAME_STR = "__DB_VER_OBJ";
private static final ByteBuffer DB_VER_OBJ_NAME;
@ConfigProperty(name = "dhfs.objects.persistence.lmdb.size", defaultValue = "1000000000000")
long lmdbSize;
static {
byte[] tmp = DB_VER_OBJ_NAME_STR.getBytes(StandardCharsets.ISO_8859_1);
var bb = ByteBuffer.allocateDirect(tmp.length);
@@ -65,7 +65,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
_root.toFile().mkdirs();
}
_env = create()
.setMapSize(1_000_000_000_000L)
.setMapSize(lmdbSize)
.setMaxDbs(1)
.open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE);
@@ -101,55 +101,50 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
verifyReady();
try (Txn<ByteBuffer> txn = _env.txnRead()) {
var value = _db.get(txn, name.toByteBuffer());
return Optional.ofNullable(value).map(ByteString::copyFrom);
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
var txn = _env.txnRead();
try {
long commitId = readTxId(txn).orElseThrow();
return new Snapshot<JObjectKey, ByteBuffer>() {
private final Txn<ByteBuffer> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn, start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
}
@Nonnull
@Override
public Optional<ByteBuffer> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn, name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteBuffer::asReadOnlyBuffer);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.close();
}
};
} catch (Exception e) {
txn.close();
throw e;
}
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
var txn = new RefcountedCloseable<>(_env.txnRead());
long commitId = readTxId(txn.get()).orElseThrow();
return new Snapshot<JObjectKey, ByteString>() {
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !k.value().equals(DB_VER_OBJ_NAME_STR));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(UnsafeByteOperations::unsafeWrap);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
}
@Override
public Runnable prepareTx(TxManifestRaw names, long txId) {
verifyReady();
@@ -200,30 +195,30 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteBuffer> {
private static final Cleaner CLEANER = Cleaner.create();
private final RefcountedCloseable<Txn<ByteBuffer>> _txn;
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
private final Cursor<ByteBuffer> _cursor;
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
// private final Exception _allocationStacktrace = null;
private boolean _hasNext = false;
private JObjectKey _peekedNextKey = null;
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
LmdbKvIterator(Txn<ByteBuffer> txn, IteratorStart start, JObjectKey key) {
_txn = txn;
_goingForward = true;
_cursor = _db.openCursor(_txn.get());
_cursor = _db.openCursor(_txn);
var closedRef = _closed;
var bt = _allocationStacktrace;
CLEANER.register(this, () -> {
if (!closedRef.getValue()) {
Log.error("Iterator was not closed before GC, allocated at: {0}", bt);
System.exit(-1);
}
});
// var bt = _allocationStacktrace;
// CLEANER.register(this, () -> {
// if (!closedRef.getValue()) {
// Log.error("Iterator was not closed before GC, allocated at: {0}", bt);
// System.exit(-1);
// }
// });
verifyReady();
@@ -305,7 +300,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
_closed.setValue(true);
_cursor.close();
_txn.unref();
}
@Override
@@ -356,14 +350,13 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
protected Pair<JObjectKey, ByteString> nextImpl() {
protected Pair<JObjectKey, ByteBuffer> nextImpl() {
if (!_hasNext) {
throw new NoSuchElementException("No more elements");
}
// TODO: Right now with java serialization it doesn't matter, it's all copied to arrays anyway
var val = _cursor.val();
var bs = UnsafeByteOperations.unsafeWrap(val);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), bs);
var ret = Pair.of(JObjectKey.fromByteBuffer(_cursor.key()), val.asReadOnlyBuffer());
if (_goingForward)
_hasNext = _cursor.next();
else

View File

@@ -2,9 +2,9 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MappingKvIterator;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import io.quarkus.arc.properties.IfBuildProperty;
@@ -12,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import org.pcollections.TreePMap;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -22,30 +23,22 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private TreePMap<JObjectKey, ByteString> _objects = TreePMap.empty();
private long _lastCommitId = 0;
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
synchronized (this) {
return Optional.ofNullable(_objects.get(name));
}
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
synchronized (this) {
return new Snapshot<JObjectKey, ByteString>() {
return new Snapshot<JObjectKey, ByteBuffer>() {
private final TreePMap<JObjectKey, ByteString> _objects = MemoryObjectPersistentStore.this._objects;
private final long _lastCommitId = MemoryObjectPersistentStore.this._lastCommitId;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new NavigableMapKvIterator<>(_objects, start, key);
public CloseableKvIterator<JObjectKey, ByteBuffer> getIterator(IteratorStart start, JObjectKey key) {
return new MappingKvIterator<>(new NavigableMapKvIterator<>(_objects, start, key), ByteString::asReadOnlyByteBuffer);
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name));
public Optional<ByteBuffer> readObject(JObjectKey name) {
return Optional.ofNullable(_objects.get(name)).map(ByteString::asReadOnlyByteBuffer);
}
@Override

View File

@@ -2,21 +2,16 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.snapshot.Snapshot;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Consumer;
// Persistent storage of objects
// All changes are written as sequential transactions
public interface ObjectPersistentStore {
@Nonnull
Optional<ByteString> readObject(JObjectKey name);
Snapshot<JObjectKey, ByteString> getSnapshot();
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId);

View File

@@ -3,5 +3,6 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.iterators.Data;
public record PendingWrite(JDataVersionedWrapper value, long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
public record PendingWrite(JDataVersionedWrapper value,
long bundleId) implements PendingWriteEntry, Data<JDataVersionedWrapper> {
}

View File

@@ -2,6 +2,7 @@ package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperSerializer;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.ObjectSerializer;
import com.usatiuk.objects.iterators.CloseableKvIterator;
@@ -13,24 +14,20 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Optional;
@ApplicationScoped
public class SerializingObjectPersistentStore {
@Inject
ObjectSerializer<JDataVersionedWrapper> serializer;
JDataVersionedWrapperSerializer serializer;
@Inject
ObjectPersistentStore delegateStore;
@Nonnull
Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
return delegateStore.readObject(name).map(serializer::deserialize);
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final Snapshot<JObjectKey, ByteString> _backing = delegateStore.getSnapshot();
private final Snapshot<JObjectKey, ByteBuffer> _backing = delegateStore.getSnapshot();
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {

View File

@@ -32,19 +32,10 @@ import java.util.function.Consumer;
public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@@ -81,6 +72,7 @@ public class WritebackObjectPersistentStore {
lastTxId = s.id();
}
_lastCommittedId.set(lastTxId);
_lastWrittenId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}
@@ -350,7 +342,7 @@ public class WritebackObjectPersistentStore {
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
return TombstoneMergingKvIterator.<JObjectKey, JDataVersionedWrapper>of("writeback-ps", start, key,
(tS, tK) -> new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
(tS, tK) -> (CloseableKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>>) (CloseableKvIterator<JObjectKey, ?>) _cache.getIterator(tS, tK));
}
@@ -392,6 +384,11 @@ public class WritebackObjectPersistentStore {
public interface VerboseReadResult {
}
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();

View File

@@ -4,7 +4,6 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

View File

@@ -1,17 +1,17 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
@@ -22,13 +22,7 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
@@ -37,9 +31,12 @@ public class JObjectManager {
LockManager lockManager;
private boolean _ready = false;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
static {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager() {
Log.infov("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
private void verifyReady() {
@@ -277,4 +274,9 @@ public class JObjectManager {
});
tx.close();
}
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
}

View File

@@ -1,11 +1,10 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
@Singleton

View File

@@ -1,6 +1,6 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.VoidFn;
import com.usatiuk.utils.VoidFn;
import io.quarkus.logging.Log;
import java.util.function.Supplier;

View File

@@ -1,7 +1,6 @@
package com.usatiuk.objects.transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;

View File

@@ -2,7 +2,7 @@ package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Optional;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;

View File

@@ -7,11 +7,8 @@ import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
class ObjectsIterateAllTestProfiles {

View File

@@ -6,9 +6,42 @@ import net.jqwik.api.state.ActionChain;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class MergingKvIteratorPbtTest {
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
static class MergingIteratorModel implements CloseableKvIterator<Integer, Integer> {
private final CloseableKvIterator<Integer, Integer> mergedIterator;
private final CloseableKvIterator<Integer, Integer> mergingIterator;
@@ -228,35 +261,4 @@ public class MergingKvIteratorPbtTest {
return "Has prev key";
}
}
@Property
public void checkMergingIterator(@ForAll("actions") ActionChain<MergingIteratorModel> actions) {
actions.run();
}
@Provide
Arbitrary<ActionChain<MergingIteratorModel>> actions(@ForAll("lists") List<List<Map.Entry<Integer, Integer>>> list,
@ForAll IteratorStart iteratorStart, @ForAll("startKey") Integer startKey) {
return ActionChain.startWith(() -> new MergingIteratorModel(list, iteratorStart, startKey))
.withAction(new NextAction())
.withAction(new PeekNextKeyAction())
.withAction(new SkipAction())
.withAction(new PeekPrevKeyAction())
.withAction(new SkipPrevAction())
.withAction(new PrevAction())
.withAction(new HasNextAction())
.withAction(new HasPrevAction());
}
@Provide
Arbitrary<List<List<Map.Entry<Integer, Integer>>>> lists() {
return Arbitraries.entries(Arbitraries.integers().between(-50, 50), Arbitraries.integers().between(-50, 50))
.list().uniqueElements(Map.Entry::getKey).ofMinSize(0).ofMaxSize(20)
.list().ofMinSize(1).ofMaxSize(5);
}
@Provide
Arbitrary<Integer> startKey() {
return Arbitraries.integers().between(-51, 51);
}
}

View File

@@ -178,8 +178,8 @@
--initialize-at-run-time=jnr.ffi.util.ref.FinalizableReferenceQueue,
--initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer,
--initialize-at-run-time=jnr.ffi.provider.jffi.NativeFinalizer$SingletonHolder,
--initialize-at-run-time=com.usatiuk.dhfs.utils.RefcountedCloseable,
--initialize-at-run-time=com.usatiuk.dhfs.utils.DataLocker$Lock,
--initialize-at-run-time=com.usatiuk.utils.RefcountedCloseable,
--initialize-at-run-time=com.usatiuk.utils.DataLocker$Lock,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore$LmdbKvIterator,
--initialize-at-run-time=com.usatiuk.objects.stores.LmdbObjectPersistentStore,
--initialize-at-run-time=com.google.protobuf.UnsafeUtil

View File

@@ -13,6 +13,10 @@
</parent>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton;
@Singleton

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.repository.OpP;
import com.usatiuk.dhfs.repository.invalidation.Op;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
@Singleton

View File

@@ -1,14 +1,14 @@
package com.usatiuk.dhfs.repository;
package com.usatiuk.dhfs.autosync;
import com.usatiuk.dhfs.JDataRemote;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -1,12 +1,11 @@
package com.usatiuk.dhfs.repository;
package com.usatiuk.dhfs.autosync;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.objects.transaction.PreCommitTxHook;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.peersync.PeerId;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerConnectedEventListener;
import com.usatiuk.dhfs.repository.PeerManager;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -1,14 +1,13 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of(key);

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import java.io.Serializable;

View File

@@ -1,14 +1,17 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.DataLocker;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -17,7 +20,6 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Link;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class InvalidationQueueService {
private final HashSetDelayedBlockingQueue<InvalidationQueueEntry> _queue;
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker();
@Inject
PeerManager remoteHostManager;
@Inject
@@ -40,15 +43,17 @@ public class InvalidationQueueService {
@Inject
PeerInfoService peerInfoService;
@Inject
OpPusher opPusher;
TransactionManager txm;
@Inject
Transaction curTx;
@ConfigProperty(name = "dhfs.objects.invalidation.threads")
int threads;
@Inject
PersistentPeerDataService persistentPeerDataService;
private final DataLocker _locker = new DataLocker();
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
OpExtractorService opExtractorService;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -141,9 +146,15 @@ public class InvalidationQueueService {
}
locks.add(lock);
try {
var prepared = opPusher.preparePush(e);
ops.get(e.peer()).addAll(prepared.getLeft());
commits.get(e.peer()).addAll(prepared.getRight());
txm.run(() -> {
var obj = curTx.get(JData.class, e.key()).orElse(null);
if (obj == null) return;
var extracted = opExtractorService.extractOps(obj, e.peer());
if (extracted == null) return;
ops.get(e.peer()).addAll(extracted.getLeft());
commits.get(e.peer()).add(extracted.getRight());
});
success++;
} catch (Exception ex) {
Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);
@@ -157,12 +168,12 @@ public class InvalidationQueueService {
for (var p : ops.keySet()) {
var list = ops.get(p);
Log.infov("Pushing invalidations to {0}: {1}", p, list);
Log.tracev("Pushing invalidations to {0}: {1}", p, list);
remoteObjectServiceClient.pushOps(p, list);
commits.get(p).forEach(Runnable::run);
}
} catch (Exception e) {
Log.warnv("Failed to send invalidations, will retry", e);
Log.warn("Failed to send invalidations, will retry", e);
for (var inv : data) {
pushInvalidationToOne(inv);
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.repository.invalidation;
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.objects.JObjectKey;

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JData;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public interface OpExtractor<T extends JData> {
Pair<List<Op>, Runnable> extractOps(T data, PeerId peerId);
}

View File

@@ -0,0 +1,48 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JData;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import org.apache.commons.lang3.tuple.Pair;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpExtractorService {
private final Map<Class<? extends JData>, OpExtractor> _opExtractorMap;
public OpExtractorService(Instance<OpExtractor<?>> opExtractors) {
HashMap<Class<? extends JData>, OpExtractor> opExtractorMap = new HashMap<>();
for (var opExtractor : opExtractors.handles()) {
for (var type : Arrays.stream(opExtractor.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(OpExtractor.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert JData.class.isAssignableFrom((Class<?>) orig);
opExtractorMap.put((Class<? extends JData>) orig, opExtractor.get());
}
}
_opExtractorMap = Map.copyOf(opExtractorMap);
}
public @Nullable Pair<List<Op>, Runnable> extractOps(JData data, PeerId peerId) {
var extractor = _opExtractorMap.get(data.getClass());
if (extractor == null) {
return null;
}
return extractor.extractOps(data, peerId);
}
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
public interface OpHandler<T extends Op> {
void handleOp(PeerId from, T op);
}

View File

@@ -0,0 +1,44 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpHandlerService {
private final Map<Class<? extends Op>, OpHandler> _opHandlerMap;
public OpHandlerService(Instance<OpHandler<?>> OpHandlers) {
HashMap<Class<? extends Op>, OpHandler> OpHandlerMap = new HashMap<>();
for (var OpHandler : OpHandlers.handles()) {
for (var type : Arrays.stream(OpHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(OpHandler.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert Op.class.isAssignableFrom((Class<?>) orig);
OpHandlerMap.put((Class<? extends Op>) orig, OpHandler.get());
}
}
_opHandlerMap = Map.copyOf(OpHandlerMap);
}
public void handleOp(PeerId from, Op op) {
var handler = _opHandlerMap.get(op.getClass());
if (handler == null) {
throw new IllegalArgumentException("No handler for op: " + op.getClass());
}
handler.handleOp(from, op);
}
}

View File

@@ -0,0 +1,30 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
@ApplicationScoped
public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTransaction;
@Override
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
return txm.run(() -> {
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> {
});
});
}
}

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.repository.InitialSyncProcessor;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.peersync.InitialSyncProcessor;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.objects.JObjectKey;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -18,7 +18,7 @@ public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<
@Override
public void prepareForInitialSync(PeerId from, JObjectKey key) {
var tree = jKleppmannTreeManager.getTree(key);
var tree = jKleppmannTreeManager.getTree(key).orElseThrow();
tree.recordBootstrap(from);
}

View File

@@ -1,15 +1,18 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.jkleppmanntree.structs.*;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.invalidation.Op;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.kleppmanntree.*;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -20,6 +23,7 @@ import org.pcollections.TreePSet;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
@ApplicationScoped
public class JKleppmannTreeManager {
@@ -35,7 +39,7 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
if (data == null) {
@@ -49,22 +53,32 @@ public class JKleppmannTreeManager {
TreePMap.empty()
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(rootNode);
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(trashNode);
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, new JKleppmannTreeNodeMetaDirectory(""));
curTx.put(lf_node);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_root"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(rootNode, true));
var trashNode = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_trash"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(trashNode, true));
var lf_node = new JKleppmannTreeNode(JObjectKey.of(name.value() + "_jt_lf"), null, rootNodeSupplier.get());
curTx.put(new JKleppmannTreeNodeHolder(lf_node, true));
}
return new JKleppmannTree(data);
// opObjectRegistry.registerObject(tree);
});
}
public JKleppmannTree getTree(JObjectKey name) {
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return getTree(name, LockingStrategy.WRITE);
}
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
});
}
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
}
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;
@@ -155,62 +169,15 @@ public class JKleppmannTreeManager {
// }
if (Log.isTraceEnabled())
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().getName());
Log.trace("Received op from " + from + ": " + jop.op().timestamp().timestamp() + " " + jop.op().childId() + "->" + jop.op().newParentId() + " as " + jop.op().newMeta().name());
try {
_tree.applyExternalOp(from, jop.op());
} catch (Exception e) {
Log.error("Error applying external op", e);
throw e;
} finally {
// FIXME:
// Fixup the ref if it didn't really get applied
// if ((fileRef == null) && (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile))
// Log.error("Could not create child of pushed op: " + jop.getOp());
// if (jop.getOp().newMeta() instanceof JKleppmannTreeNodeMetaFile f) {
// if (fileRef != null) {
// var got = jObjectManager.get(jop.getOp().childId()).orElse(null);
//
// VoidFn remove = () -> {
// fileRef.runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
// m.removeRef(jop.getOp().childId());
// });
// };
//
// if (got == null) {
// remove.apply();
// } else {
// try {
// got.rLock();
// try {
// got.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// if (got.getData() == null || !got.getData().extractRefs().contains(f.getFileIno()))
// remove.apply();
// } finally {
// got.rUnlock();
// }
// } catch (DeletedObjectAccessException dex) {
// remove.apply();
// }
// }
// }
// }
}
_tree.applyExternalOp(from, jop.op());
}
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}
// @Override
// public void addToTx() {
// // FIXME: a hack
// _persistentData.get().rwLockNoCopy();
// _persistentData.get().rwUnlock();
// }
private class JOpRecorder implements OpRecorder<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public void recordOp(OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
@@ -278,8 +245,8 @@ public class JKleppmannTreeManager {
@Override
public JKleppmannTreeNode getById(JObjectKey id) {
var got = curTx.get(JKleppmannTreeNode.class, id);
return got.orElse(null);
var got = curTx.get(JKleppmannTreeNodeHolder.class, id);
return got.map(JKleppmannTreeNodeHolder::node).orElse(null);
}
@Override
@@ -289,12 +256,14 @@ public class JKleppmannTreeManager {
@Override
public void putNode(TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> node) {
curTx.put(((JKleppmannTreeNode) node));
curTx.put(curTx.get(JKleppmannTreeNodeHolder.class, node.key())
.map(n -> n.withNode((JKleppmannTreeNode) node))
.orElse(new JKleppmannTreeNodeHolder((JKleppmannTreeNode) node)));
}
@Override
public void removeNode(JObjectKey id) {
// TODO
// GC
}
@Override

Some files were not shown because too many files have changed in this diff Show More