mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
20 Commits
42a1f098c1
...
d30dc594d3
| Author | SHA1 | Date | |
|---|---|---|---|
| d30dc594d3 | |||
| 1148713871 | |||
| bfe19883a4 | |||
| c1ba6b9de1 | |||
| f130dfe4cc | |||
| 4cb7ee264f | |||
| 16f817a660 | |||
| c4afaa1907 | |||
| 52882dc687 | |||
| 91c7998291 | |||
| 45e3181b3e | |||
| 5787bdddc1 | |||
| 0c4af16390 | |||
| b21ddfb6ec | |||
| 6cd6cf7c6d | |||
| 4d5c99dbf8 | |||
| 14049eca17 | |||
| bbb93ea2e1 | |||
| 606afb4f53 | |||
| 684eb436d4 |
14
.github/workflows/server.yml
vendored
14
.github/workflows/server.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
if: env.ACT=='true'
|
||||
|
||||
- name: Install fuse and maven
|
||||
run: sudo apt-get update && sudo apt-get install -y fuse libfuse-dev maven
|
||||
run: sudo apt-get update && sudo apt-get install -y fuse
|
||||
|
||||
- name: Set up JDK 21
|
||||
uses: actions/setup-java@v4
|
||||
@@ -36,18 +36,18 @@ jobs:
|
||||
cache: maven
|
||||
|
||||
- name: Build and test with Maven
|
||||
run: cd server && mvn --batch-mode --update-snapshots package verify
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package verify
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: DHFS Package
|
||||
path: server/target/quarkus-app
|
||||
name: DHFS Server Package
|
||||
path: dhfs-parent/server/target/quarkus-app
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
if: ${{ always() }}
|
||||
with:
|
||||
name: Test logs
|
||||
path: server/target/*-reports
|
||||
path: dhfs-parent/**/target/*-reports
|
||||
|
||||
build-webui:
|
||||
runs-on: ubuntu-latest
|
||||
@@ -90,7 +90,7 @@ jobs:
|
||||
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: DHFS Package
|
||||
name: DHFS Server Package
|
||||
path: dhfs-package-downloaded
|
||||
|
||||
- uses: actions/download-artifact@v3
|
||||
@@ -181,7 +181,7 @@ jobs:
|
||||
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: DHFS Package
|
||||
name: DHFS Server Package
|
||||
path: dhfs-package-downloaded
|
||||
|
||||
- uses: actions/download-artifact@v3
|
||||
|
||||
130
dhfs-parent/pom.xml
Normal file
130
dhfs-parent/pom.xml
Normal file
@@ -0,0 +1,130 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>parent</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
|
||||
<name>dhfs-parent</name>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>server</module>
|
||||
</modules>
|
||||
|
||||
<properties>
|
||||
<compiler-plugin.version>3.12.1</compiler-plugin.version>
|
||||
<maven.compiler.release>21</maven.compiler.release>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
|
||||
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
|
||||
<quarkus.platform.version>3.11.3</quarkus.platform.version>
|
||||
<surefire-plugin.version>3.2.5</surefire-plugin.version>
|
||||
</properties>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>jitpack.io</id>
|
||||
<url>https://jitpack.io</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${quarkus.platform.group-id}</groupId>
|
||||
<artifactId>${quarkus.platform.artifact-id}</artifactId>
|
||||
<version>${quarkus.platform.version}</version>
|
||||
<type>pom</type>
|
||||
<scope>import</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>${quarkus.platform.group-id}</groupId>
|
||||
<artifactId>quarkus-maven-plugin</artifactId>
|
||||
<version>${quarkus.platform.version}</version>
|
||||
<extensions>true</extensions>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>build</goal>
|
||||
<goal>generate-code</goal>
|
||||
<goal>generate-code-tests</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<version>${compiler-plugin.version}</version>
|
||||
<configuration>
|
||||
<compilerArgs>
|
||||
<arg>-parameters</arg>
|
||||
</compilerArgs>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${surefire-plugin.version}</version>
|
||||
<configuration>
|
||||
<systemPropertyVariables>
|
||||
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
|
||||
<maven.home>${maven.home}</maven.home>
|
||||
</systemPropertyVariables>
|
||||
<argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
|
||||
<skipTests>${skip.unit}</skipTests>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-failsafe-plugin</artifactId>
|
||||
<version>${surefire-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>integration-test</goal>
|
||||
<goal>verify</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<systemPropertyVariables>
|
||||
<native.image.path>
|
||||
${project.build.directory}/${project.build.finalName}-runner
|
||||
</native.image.path>
|
||||
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
|
||||
<buildDirectory>${project.build.directory}</buildDirectory>
|
||||
<junit.jupiter.execution.parallel.enabled>false</junit.jupiter.execution.parallel.enabled>
|
||||
<junit.jupiter.execution.parallel.mode.default>
|
||||
concurrent
|
||||
</junit.jupiter.execution.parallel.mode.default>
|
||||
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
||||
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
||||
<maven.home>${maven.home}</maven.home>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>native</id>
|
||||
<activation>
|
||||
<property>
|
||||
<name>native</name>
|
||||
</property>
|
||||
</activation>
|
||||
<properties>
|
||||
<skipITs>false</skipITs>
|
||||
<quarkus.native.enabled>true</quarkus.native.enabled>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
43
dhfs-parent/server/.gitignore
vendored
Normal file
43
dhfs-parent/server/.gitignore
vendored
Normal file
@@ -0,0 +1,43 @@
|
||||
#Maven
|
||||
target/
|
||||
pom.xml.tag
|
||||
pom.xml.releaseBackup
|
||||
pom.xml.versionsBackup
|
||||
release.properties
|
||||
.flattened-pom.xml
|
||||
|
||||
# Eclipse
|
||||
.project
|
||||
.classpath
|
||||
.settings/
|
||||
bin/
|
||||
|
||||
# IntelliJ
|
||||
.idea
|
||||
*.ipr
|
||||
*.iml
|
||||
*.iws
|
||||
|
||||
# NetBeans
|
||||
nb-configuration.xml
|
||||
|
||||
# Visual Studio Code
|
||||
.vscode
|
||||
.factorypath
|
||||
|
||||
# OSX
|
||||
.DS_Store
|
||||
|
||||
# Vim
|
||||
*.swp
|
||||
*.swo
|
||||
|
||||
# patch
|
||||
*.orig
|
||||
*.rej
|
||||
|
||||
# Local environment
|
||||
.env
|
||||
|
||||
# Plugin directory
|
||||
/.quarkus/cli/plugins/
|
||||
@@ -2,7 +2,7 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.usatiuk.dhfs.storage</groupId>
|
||||
<groupId>com.usatiuk.dhfs</groupId>
|
||||
<artifactId>server</artifactId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
|
||||
@@ -200,10 +200,15 @@
|
||||
</native.image.path>
|
||||
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
|
||||
<buildDirectory>${project.build.directory}</buildDirectory>
|
||||
<junit.jupiter.execution.parallel.enabled>false</junit.jupiter.execution.parallel.enabled>
|
||||
<junit.jupiter.execution.parallel.enabled>
|
||||
true
|
||||
</junit.jupiter.execution.parallel.enabled>
|
||||
<junit.jupiter.execution.parallel.mode.default>
|
||||
concurrent
|
||||
</junit.jupiter.execution.parallel.mode.default>
|
||||
<junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
0.5
|
||||
</junit.jupiter.execution.parallel.config.dynamic.factor>
|
||||
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
|
||||
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
|
||||
<maven.home>${maven.home}</maven.home>
|
||||
@@ -1,12 +1,16 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
|
||||
import com.usatiuk.utils.VoidFn;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@@ -16,6 +20,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
private final ObjectMetadata _metaPart;
|
||||
private final JObjectResolver _resolver;
|
||||
private final AtomicReference<T> _dataPart = new AtomicReference<>();
|
||||
private static final int lockTimeoutSecs = 15;
|
||||
|
||||
// Create a new object
|
||||
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
|
||||
@@ -114,7 +119,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
|
||||
private void tryRemoteResolve() {
|
||||
if (_dataPart.get() == null) {
|
||||
_lock.writeLock().lock();
|
||||
rwLock();
|
||||
try {
|
||||
tryLocalResolve();
|
||||
if (_dataPart.get() == null) {
|
||||
@@ -133,7 +138,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
|
||||
private void tryLocalResolve() {
|
||||
if (_dataPart.get() == null) {
|
||||
_lock.readLock().lock();
|
||||
rLock();
|
||||
try {
|
||||
if (_dataPart.get() == null) {
|
||||
var res = _resolver.resolveDataLocal(this);
|
||||
@@ -171,10 +176,36 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
verifyRefs();
|
||||
}
|
||||
|
||||
public boolean tryRLock() {
|
||||
try {
|
||||
return _lock.readLock().tryLock(lockTimeoutSecs, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean tryRWLock() {
|
||||
try {
|
||||
return _lock.writeLock().tryLock(lockTimeoutSecs, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void rwLock() {
|
||||
if (!tryRWLock())
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire write lock for " + getName()));
|
||||
}
|
||||
|
||||
public void rLock() {
|
||||
if (!tryRLock())
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire read lock for " + getName()));
|
||||
}
|
||||
|
||||
public <R> R runReadLocked(ResolutionStrategy resolutionStrategy, ObjectFnRead<T, R> fn) {
|
||||
tryResolve(resolutionStrategy);
|
||||
|
||||
_lock.readLock().lock();
|
||||
rLock();
|
||||
try {
|
||||
if (_metaPart.isDeleted())
|
||||
throw new DeletedObjectAccessException();
|
||||
@@ -197,7 +228,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
}
|
||||
|
||||
public <R> R runWriteLocked(ResolutionStrategy resolutionStrategy, ObjectFnWrite<T, R> fn) {
|
||||
_lock.writeLock().lock();
|
||||
rwLock();
|
||||
try {
|
||||
tryResolve(resolutionStrategy);
|
||||
|
||||
@@ -270,14 +301,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
_resolver.bumpVersionSelf(this);
|
||||
}
|
||||
|
||||
public void rwLock() {
|
||||
_lock.writeLock().lock();
|
||||
}
|
||||
|
||||
public boolean tryRwLock() {
|
||||
return _lock.writeLock().tryLock();
|
||||
}
|
||||
|
||||
public void rwUnlock() {
|
||||
_lock.writeLock().unlock();
|
||||
}
|
||||
@@ -301,11 +324,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
REMOTE
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface VoidFn {
|
||||
void apply();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ObjectFnRead<T, R> {
|
||||
R apply(ObjectMetadata meta, @Nullable T data);
|
||||
@@ -7,7 +7,7 @@ public interface JObjectManager {
|
||||
|
||||
Optional<JObject<?>> get(String name);
|
||||
|
||||
Collection<JObject<?>> find(String prefix);
|
||||
Collection<JObject<?>> findAll();
|
||||
|
||||
// Put a new object
|
||||
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import com.usatiuk.dhfs.objects.persistence.BlobP;
|
||||
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
|
||||
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
|
||||
@@ -15,10 +15,11 @@ import lombok.Getter;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManagerImpl implements JObjectManager {
|
||||
@@ -79,9 +80,9 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
}
|
||||
|
||||
BlobP readMd;
|
||||
ObjectMetadataP readMd;
|
||||
try {
|
||||
readMd = objectPersistentStore.readObject("meta_" + name);
|
||||
readMd = objectPersistentStore.readObjectMeta(name);
|
||||
} catch (StatusRuntimeException ex) {
|
||||
if (ex.getStatus().getCode().equals(Status.NOT_FOUND.getCode()))
|
||||
return Optional.empty();
|
||||
@@ -108,14 +109,9 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObject<?>> find(String prefix) {
|
||||
var ret = new ArrayList<JObject<?>>();
|
||||
for (var f : objectPersistentStore.findObjects("meta_")) {
|
||||
var got = get(f.substring(5));
|
||||
if (got.isPresent())
|
||||
ret.add(got.get());
|
||||
}
|
||||
return ret;
|
||||
public Collection<JObject<?>> findAll() {
|
||||
Stream<JObject<?>> x = objectPersistentStore.findAllObjects().stream().map(f -> get(f).orElse(null));
|
||||
return x.filter(Objects::nonNull).toList(); // Somehow this is needed otherwise typing breaks
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -58,7 +58,7 @@ public class JObjectResolver {
|
||||
|
||||
public boolean hasLocalCopy(JObject<?> self) {
|
||||
if (!self.isDeleted() && refVerification) {
|
||||
if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObject(self.getName())))
|
||||
if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObjectData(self.getName())))
|
||||
throw new IllegalStateException("hasLocalCopy mismatch for " + self.getName());
|
||||
}
|
||||
// FIXME: Read/write lock assert?
|
||||
@@ -159,7 +159,7 @@ public class JObjectResolver {
|
||||
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
|
||||
// jObject.assertRWLock();
|
||||
// FIXME: No way to assert read lock?
|
||||
if (objectPersistentStore.existsObject(jObject.getName()))
|
||||
if (objectPersistentStore.existsObjectData(jObject.getName()))
|
||||
return Optional.of(protoSerializerService.deserialize(objectPersistentStore.readObject(jObject.getName())));
|
||||
return Optional.empty();
|
||||
}
|
||||
@@ -177,7 +177,7 @@ public class JObjectResolver {
|
||||
Log.trace("Invalidating " + name);
|
||||
jObject.getMeta().getHaveLocalCopy().set(false);
|
||||
jObjectWriteback.remove(jObject);
|
||||
objectPersistentStore.deleteObject(name);
|
||||
objectPersistentStore.deleteObjectData(name);
|
||||
} catch (StatusRuntimeException sx) {
|
||||
if (sx.getStatus() != Status.NOT_FOUND)
|
||||
Log.info("Couldn't delete object from persistent store: ", sx);
|
||||
@@ -162,13 +162,12 @@ public class JObjectWriteback {
|
||||
throw new IllegalStateException("Object deleted but not deletable! " + m.getName());
|
||||
// FIXME: assert Rw lock here?
|
||||
Log.trace("Deleting from persistent storage " + m.getName());
|
||||
objectPersistentStore.deleteObject("meta_" + m.getName());
|
||||
objectPersistentStore.deleteObject(m.getName());
|
||||
return;
|
||||
}
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), protoSerializerService.serializeToBlobP(m));
|
||||
objectPersistentStore.writeObjectMeta(m.getName(), protoSerializerService.serialize(m));
|
||||
if (data != null)
|
||||
objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToBlobP(data));
|
||||
objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToJObjectDataP(data));
|
||||
}
|
||||
|
||||
public void remove(JObject<?> object) {
|
||||
@@ -80,7 +80,7 @@ public class ProtoSerializerService {
|
||||
}
|
||||
|
||||
// FIXME: This is annoying
|
||||
public <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
|
||||
private <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
|
||||
var ser = serialize(object);
|
||||
if (ser instanceof FileP) {
|
||||
return Optional.of(JObjectDataP.newBuilder().setFile((FileP) ser).build());
|
||||
@@ -106,22 +106,6 @@ public class ProtoSerializerService {
|
||||
return serializeToJObjectDataPInternal(object).orElseThrow(() -> new IllegalStateException("Unknown JObjectDataP type: " + object.getClass()));
|
||||
}
|
||||
|
||||
public <O> BlobP serializeToBlobP(O object) {
|
||||
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
|
||||
|
||||
var jobjd = serializeToJObjectDataPInternal(object);
|
||||
if (jobjd.isPresent()) {
|
||||
return BlobP.newBuilder().setData((JObjectDataP) jobjd.get()).build();
|
||||
}
|
||||
|
||||
var ser = serialize(object);
|
||||
if (ser instanceof ObjectMetadataP) {
|
||||
return BlobP.newBuilder().setMetadata((ObjectMetadataP) ser).build();
|
||||
} else {
|
||||
throw new IllegalStateException("Unknown BlobP type: " + ser.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
public <M extends Message, O> O deserialize(M message) {
|
||||
if (!_deserializers.containsKey(message.getClass()))
|
||||
throw new IllegalStateException("Deserializer not registered: " + message.getClass());
|
||||
@@ -12,6 +12,7 @@ import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
@@ -107,8 +108,13 @@ public class PersistentRemoteHostsService {
|
||||
return (JObject<PeerDirectory>) got;
|
||||
}
|
||||
|
||||
private void pushPeerUpdates(JObject<?> obj) {
|
||||
Log.info("Scheduling certificate update after " + obj.getName() + " was updated");
|
||||
private void pushPeerUpdates() {
|
||||
pushPeerUpdates(null);
|
||||
}
|
||||
|
||||
private void pushPeerUpdates(@Nullable JObject<?> obj) {
|
||||
if (obj != null)
|
||||
Log.info("Scheduling certificate update after " + obj.getName() + " was updated");
|
||||
executorService.submit(() -> {
|
||||
updateCerts();
|
||||
invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
|
||||
@@ -192,8 +198,6 @@ public class PersistentRemoteHostsService {
|
||||
}
|
||||
return addedInner;
|
||||
});
|
||||
if (added)
|
||||
updateCerts();
|
||||
return added;
|
||||
}
|
||||
|
||||
@@ -210,19 +214,22 @@ public class PersistentRemoteHostsService {
|
||||
}
|
||||
return removedInner;
|
||||
});
|
||||
if (removed)
|
||||
updateCerts();
|
||||
return removed;
|
||||
}
|
||||
|
||||
private void updateCerts() {
|
||||
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
|
||||
// Fixme:? I don't think it should be needed with custom trust store
|
||||
// but it doesn't work?
|
||||
rpcClientFactory.dropCache();
|
||||
return null;
|
||||
});
|
||||
try {
|
||||
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
|
||||
// Fixme:? I don't think it should be needed with custom trust store
|
||||
// but it doesn't work?
|
||||
rpcClientFactory.dropCache();
|
||||
return null;
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
Log.error("Error when refreshing certificates, will retry", ex);
|
||||
pushPeerUpdates();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean existsHost(UUID uuid) {
|
||||
@@ -62,7 +62,7 @@ public class RemoteHostManager {
|
||||
_heartbeatExecutor.invokeAll(persistentRemoteHostsService.getHostsUuid().stream()
|
||||
.<Callable<Void>>map(host -> () -> {
|
||||
try {
|
||||
if(isReachable(host))
|
||||
if (isReachable(host))
|
||||
Log.trace("Heartbeat: " + host);
|
||||
else
|
||||
Log.info("Trying to connect to " + host);
|
||||
@@ -184,10 +184,13 @@ public class RemoteHostManager {
|
||||
state.setSecurePort(securePort);
|
||||
|
||||
if (!persistentRemoteHostsService.existsHost(host)) {
|
||||
_seenHostsButNotAdded.put(host, state);
|
||||
var prev = _seenHostsButNotAdded.put(host, state);
|
||||
// Needed for tests
|
||||
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
if (prev == null)
|
||||
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
return;
|
||||
} else {
|
||||
_seenHostsButNotAdded.remove(host);
|
||||
}
|
||||
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
@@ -118,7 +118,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
Log.info("<-- getIndex: from " + request.getSelfUuid());
|
||||
|
||||
var objs = jObjectManager.find("");
|
||||
var objs = jObjectManager.findAll();
|
||||
|
||||
var reqUuid = UUID.fromString(request.getSelfUuid());
|
||||
|
||||
@@ -21,9 +21,6 @@ public class RpcClientFactory {
|
||||
@ConfigProperty(name = "dhfs.objects.sync.timeout")
|
||||
long syncTimeout;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.peersync.timeout")
|
||||
long peerSyncTimeout;
|
||||
|
||||
@Inject
|
||||
RemoteHostManager remoteHostManager;
|
||||
|
||||
@@ -54,9 +51,12 @@ public class RpcClientFactory {
|
||||
var hostinfo = remoteHostManager.getTransientState(target);
|
||||
boolean reachable = remoteHostManager.isReachable(target);
|
||||
|
||||
if (hostinfo.getAddr() == null || !reachable)
|
||||
if (hostinfo.getAddr() == null)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Address for " + target + " not yet known"));
|
||||
|
||||
if (!reachable)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
|
||||
|
||||
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
|
||||
@@ -42,7 +41,7 @@ public class SyncHandler {
|
||||
remoteObjectServiceClient.getIndex(host);
|
||||
// Push our index to the other peer too, as they might not request it if
|
||||
// they didn't thing we were disconnected
|
||||
var objs = jObjectManager.find("");
|
||||
var objs = jObjectManager.findAll();
|
||||
|
||||
for (var obj : objs)
|
||||
invalidationQueueService.pushInvalidationToOne(host, obj.getName());
|
||||
@@ -48,7 +48,7 @@ public class AutoSyncProcessor {
|
||||
|
||||
if (downloadAll)
|
||||
executorService.submit(() -> {
|
||||
for (var obj : jObjectManager.find("")) {
|
||||
for (var obj : jObjectManager.findAll()) {
|
||||
if (!obj.hasLocalCopy())
|
||||
add(obj.getName());
|
||||
}
|
||||
@@ -0,0 +1,176 @@
|
||||
package com.usatiuk.dhfs.objects.repository.persistence;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
|
||||
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
|
||||
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
||||
import io.grpc.Status;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
private final String root;
|
||||
|
||||
private final Path metaPath;
|
||||
private final Path dataPath;
|
||||
|
||||
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
|
||||
this.root = root;
|
||||
this.metaPath = Paths.get(root, "meta");
|
||||
this.dataPath = Paths.get(root, "data");
|
||||
}
|
||||
|
||||
void init(@Observes @Priority(200) StartupEvent event) {
|
||||
if (!metaPath.toFile().exists()) {
|
||||
Log.info("Initializing with root " + root);
|
||||
metaPath.toFile().mkdirs();
|
||||
dataPath.toFile().mkdirs();
|
||||
for (int i = 0; i < 256; i++) {
|
||||
for (int j = 0; j < 256; j++) {
|
||||
metaPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
|
||||
dataPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(400) ShutdownEvent event) {
|
||||
Log.info("Shutdown");
|
||||
}
|
||||
|
||||
private Pair<String, String> getDirPathComponents(@Nonnull String obj) {
|
||||
int h = Objects.hash(obj);
|
||||
int p1 = h & 0b00000000_00000000_11111111_00000000;
|
||||
int p2 = h & 0b00000000_00000000_00000000_11111111;
|
||||
return Pair.ofNonNull(String.valueOf(p1 >> 8), String.valueOf(p2));
|
||||
}
|
||||
|
||||
private Path getMetaPath(@Nonnull String obj) {
|
||||
var components = getDirPathComponents(obj);
|
||||
return metaPath.resolve(components.getLeft()).resolve(components.getRight()).resolve(obj);
|
||||
}
|
||||
|
||||
private Path getDataPath(@Nonnull String obj) {
|
||||
var components = getDirPathComponents(obj);
|
||||
return dataPath.resolve(components.getLeft()).resolve(components.getRight()).resolve(obj);
|
||||
}
|
||||
|
||||
private void findAllObjectsImpl(Collection<String> out, Path path) {
|
||||
var read = path.toFile().listFiles();
|
||||
if (read == null) return;
|
||||
|
||||
for (var s : read) {
|
||||
if (s.isDirectory()) {
|
||||
findAllObjectsImpl(out, s.toPath());
|
||||
} else {
|
||||
out.add(s.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Collection<String> findAllObjects() {
|
||||
ArrayList<String> out = new ArrayList<>();
|
||||
findAllObjectsImpl(out, metaPath);
|
||||
return Collections.unmodifiableCollection(out);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Boolean existsObject(String name) {
|
||||
return getMetaPath(name).toFile().isFile();
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Boolean existsObjectData(String name) {
|
||||
return getDataPath(name).toFile().isFile();
|
||||
}
|
||||
|
||||
private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
|
||||
try (var fsb = new FileInputStream(path.toFile());
|
||||
var fs = new BufferedInputStream(fsb, 1048576)) {
|
||||
return (T) defaultInstance.getParserForType().parseFrom(fs);
|
||||
} catch (FileNotFoundException | NoSuchFileException fx) {
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
} catch (IOException e) {
|
||||
Log.error("Error reading file " + path, e);
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public JObjectDataP readObject(String name) {
|
||||
return readObjectImpl(JObjectDataP.getDefaultInstance(), getDataPath(name));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public ObjectMetadataP readObjectMeta(String name) {
|
||||
return readObjectImpl(ObjectMetadataP.getDefaultInstance(), getMetaPath(name));
|
||||
}
|
||||
|
||||
private void writeObjectImpl(Path path, Message data) {
|
||||
try {
|
||||
try (var fsb = new FileOutputStream(path.toFile(), false);
|
||||
var fs = new BufferedOutputStream(fsb, 1048576)) {
|
||||
data.writeTo(fs);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing file " + path, e);
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObject(String name, JObjectDataP data) {
|
||||
writeObjectImpl(getDataPath(name), data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObjectMeta(String name, ObjectMetadataP data) {
|
||||
writeObjectImpl(getMetaPath(name), data);
|
||||
}
|
||||
|
||||
private void deleteImpl(Path path) {
|
||||
try {
|
||||
Files.delete(path);
|
||||
} catch (NoSuchFileException ignored) {
|
||||
} catch (IOException e) {
|
||||
Log.error("Error deleting file " + path, e);
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteObjectData(String name) {
|
||||
deleteImpl(getDataPath(name));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteObject(String name) {
|
||||
deleteImpl(getDataPath(name));
|
||||
// FIXME: Race?
|
||||
deleteImpl(getMetaPath(name));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.usatiuk.dhfs.objects.repository.persistence;
|
||||
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
|
||||
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
|
||||
public interface ObjectPersistentStore {
|
||||
@Nonnull
|
||||
Collection<String> findAllObjects();
|
||||
|
||||
@Nonnull
|
||||
Boolean existsObject(String name);
|
||||
|
||||
@Nonnull
|
||||
Boolean existsObjectData(String name);
|
||||
|
||||
@Nonnull
|
||||
JObjectDataP readObject(String name);
|
||||
|
||||
@Nonnull
|
||||
ObjectMetadataP readObjectMeta(String name);
|
||||
|
||||
void writeObject(String name, JObjectDataP data);
|
||||
|
||||
void writeObjectMeta(String name, ObjectMetadataP data);
|
||||
|
||||
void deleteObjectData(String name);
|
||||
|
||||
// Deletes object metadata and data
|
||||
void deleteObject(String name);
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface VoidFn {
|
||||
void apply();
|
||||
}
|
||||
|
||||
@@ -67,11 +67,4 @@ message JObjectDataP {
|
||||
PeerDirectoryP peerDirectory = 6;
|
||||
PersistentPeerInfoP persistentPeerInfo = 7;
|
||||
}
|
||||
}
|
||||
|
||||
message BlobP {
|
||||
oneof dtype {
|
||||
ObjectMetadataP metadata = 1;
|
||||
JObjectDataP data = 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,7 +3,6 @@ dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
|
||||
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d
|
||||
dhfs.objects.peerdiscovery.port=42069
|
||||
dhfs.objects.peerdiscovery.interval=5000
|
||||
dhfs.objects.peersync.timeout=5
|
||||
dhfs.objects.sync.timeout=30
|
||||
dhfs.objects.sync.ping.timeout=5
|
||||
dhfs.objects.invalidation.threads=4
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user