20 Commits

Author SHA1 Message Date
d30dc594d3 create all the file buckets on init 2024-07-26 15:39:01 +02:00
1148713871 actually, really fix the bug 2024-07-26 15:16:50 +02:00
bfe19883a4 actually, fix the bug 2024-07-26 14:56:41 +02:00
c1ba6b9de1 actually, add lsdir into connection interruption tests
as somehow root dir gets invalidated for container1 in them
2024-07-26 14:40:45 +02:00
f130dfe4cc Revert "actually, decrease ping timeout"
This reverts commit 4cb7ee264f.
2024-07-26 14:36:03 +02:00
4cb7ee264f actually, decrease ping timeout
to be smaller than delay in Delayed tests
2024-07-26 14:23:56 +02:00
16f817a660 also increase ping timeout 2024-07-26 14:19:50 +02:00
c4afaa1907 test timeouts increase 2024-07-26 14:14:45 +02:00
52882dc687 avoid crazy numbers of files in directories 2024-07-26 14:06:09 +02:00
91c7998291 don't wait for extra Ignoring new address as they don't exist anymore 2024-07-26 11:01:36 +02:00
45e3181b3e remove from seenHostsButNotAdded after adding 2024-07-26 10:49:18 +02:00
5787bdddc1 less log spam with "Ignoring new address" 2024-07-26 10:45:19 +02:00
0c4af16390 better unreachable error 2024-07-26 10:37:11 +02:00
b21ddfb6ec timed jobject locks 2024-07-26 10:31:17 +02:00
6cd6cf7c6d more clean cert updates 2024-07-26 10:25:53 +02:00
4d5c99dbf8 less delays in tests 2024-07-26 10:10:01 +02:00
14049eca17 no need for fuse dev and maven in apt 2024-07-26 10:03:29 +02:00
bbb93ea2e1 fix run-wrapper publish 2024-07-26 10:02:04 +02:00
606afb4f53 less crazy parallel test runs 2024-07-26 10:01:33 +02:00
684eb436d4 move to multimodule pom 2024-07-26 09:51:00 +02:00
120 changed files with 510 additions and 945 deletions

View File

@@ -26,7 +26,7 @@ jobs:
if: env.ACT=='true'
- name: Install fuse and maven
run: sudo apt-get update && sudo apt-get install -y fuse libfuse-dev maven
run: sudo apt-get update && sudo apt-get install -y fuse
- name: Set up JDK 21
uses: actions/setup-java@v4
@@ -36,18 +36,18 @@ jobs:
cache: maven
- name: Build and test with Maven
run: cd server && mvn --batch-mode --update-snapshots package verify
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package verify
- uses: actions/upload-artifact@v3
with:
name: DHFS Package
path: server/target/quarkus-app
name: DHFS Server Package
path: dhfs-parent/server/target/quarkus-app
- uses: actions/upload-artifact@v3
if: ${{ always() }}
with:
name: Test logs
path: server/target/*-reports
path: dhfs-parent/**/target/*-reports
build-webui:
runs-on: ubuntu-latest
@@ -90,7 +90,7 @@ jobs:
- uses: actions/download-artifact@v3
with:
name: DHFS Package
name: DHFS Server Package
path: dhfs-package-downloaded
- uses: actions/download-artifact@v3
@@ -181,7 +181,7 @@ jobs:
- uses: actions/download-artifact@v3
with:
name: DHFS Package
name: DHFS Server Package
path: dhfs-package-downloaded
- uses: actions/download-artifact@v3

130
dhfs-parent/pom.xml Normal file
View File

@@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
<name>dhfs-parent</name>
<packaging>pom</packaging>
<!-- Aggregator POM: the Quarkus server is currently the only child module -->
<modules>
<module>server</module>
</modules>
<!-- Versions and encodings shared by all child modules -->
<properties>
<compiler-plugin.version>3.12.1</compiler-plugin.version>
<maven.compiler.release>21</maven.compiler.release>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.11.3</quarkus.platform.version>
<surefire-plugin.version>3.2.5</surefire-plugin.version>
</properties>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
<!-- Import the Quarkus BOM so dependency versions are managed in one place -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>${quarkus.platform.artifact-id}</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${compiler-plugin.version}</version>
<configuration>
<compilerArgs>
<!-- Keep method parameter names in bytecode (needed for reflection-based frameworks) -->
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
<!-- Unit tests -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<configuration>
<systemPropertyVariables>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
<argLine>--add-exports java.base/sun.nio.ch=ALL-UNNAMED</argLine>
<!-- NOTE(review): skip.unit is not defined in this POM; it is expected to be
supplied externally (e.g. -Dskip.unit=true) or by a child module -->
<skipTests>${skip.unit}</skipTests>
</configuration>
</plugin>
<!-- Integration tests (run in the verify phase) -->
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${surefire-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<systemPropertyVariables>
<native.image.path>
${project.build.directory}/${project.build.finalName}-runner
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<buildDirectory>${project.build.directory}</buildDirectory>
<!-- Parallel IT execution disabled at the parent level; children may override -->
<junit.jupiter.execution.parallel.enabled>false</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>
concurrent
</junit.jupiter.execution.parallel.mode.default>
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<!-- Activated with -Dnative: builds a native image and enables integration tests -->
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<skipITs>false</skipITs>
<quarkus.native.enabled>true</quarkus.native.enabled>
</properties>
</profile>
</profiles>
</project>

43
dhfs-parent/server/.gitignore vendored Normal file
View File

@@ -0,0 +1,43 @@
# Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
release.properties
.flattened-pom.xml
# Eclipse
.project
.classpath
.settings/
bin/
# IntelliJ
.idea
*.ipr
*.iml
*.iws
# NetBeans
nb-configuration.xml
# Visual Studio Code
.vscode
.factorypath
# OSX
.DS_Store
# Vim
*.swp
*.swo
# patch
*.orig
*.rej
# Local environment
.env
# Plugin directory
/.quarkus/cli/plugins/

View File

@@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.usatiuk.dhfs.storage</groupId>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>server</artifactId>
<version>1.0.0-SNAPSHOT</version>
@@ -200,10 +200,15 @@
</native.image.path>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<buildDirectory>${project.build.directory}</buildDirectory>
<junit.jupiter.execution.parallel.enabled>false</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.enabled>
true
</junit.jupiter.execution.parallel.enabled>
<junit.jupiter.execution.parallel.mode.default>
concurrent
</junit.jupiter.execution.parallel.mode.default>
<junit.jupiter.execution.parallel.config.dynamic.factor>
0.5
</junit.jupiter.execution.parallel.config.dynamic.factor>
<junit.platform.output.capture.stdout>true</junit.platform.output.capture.stdout>
<junit.platform.output.capture.stderr>true</junit.platform.output.capture.stderr>
<maven.home>${maven.home}</maven.home>

View File

@@ -1,12 +1,16 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.utils.VoidFn;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -16,6 +20,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private final AtomicReference<T> _dataPart = new AtomicReference<>();
private static final int lockTimeoutSecs = 15;
// Create a new object
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
@@ -114,7 +119,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
private void tryRemoteResolve() {
if (_dataPart.get() == null) {
_lock.writeLock().lock();
rwLock();
try {
tryLocalResolve();
if (_dataPart.get() == null) {
@@ -133,7 +138,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
private void tryLocalResolve() {
if (_dataPart.get() == null) {
_lock.readLock().lock();
rLock();
try {
if (_dataPart.get() == null) {
var res = _resolver.resolveDataLocal(this);
@@ -171,10 +176,36 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
verifyRefs();
}
public boolean tryRLock() {
try {
return _lock.readLock().tryLock(lockTimeoutSecs, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public boolean tryRWLock() {
try {
return _lock.writeLock().tryLock(lockTimeoutSecs, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void rwLock() {
if (!tryRWLock())
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire write lock for " + getName()));
}
public void rLock() {
if (!tryRLock())
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire read lock for " + getName()));
}
public <R> R runReadLocked(ResolutionStrategy resolutionStrategy, ObjectFnRead<T, R> fn) {
tryResolve(resolutionStrategy);
_lock.readLock().lock();
rLock();
try {
if (_metaPart.isDeleted())
throw new DeletedObjectAccessException();
@@ -197,7 +228,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
}
public <R> R runWriteLocked(ResolutionStrategy resolutionStrategy, ObjectFnWrite<T, R> fn) {
_lock.writeLock().lock();
rwLock();
try {
tryResolve(resolutionStrategy);
@@ -270,14 +301,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_resolver.bumpVersionSelf(this);
}
public void rwLock() {
_lock.writeLock().lock();
}
public boolean tryRwLock() {
return _lock.writeLock().tryLock();
}
public void rwUnlock() {
_lock.writeLock().unlock();
}
@@ -301,11 +324,6 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
REMOTE
}
@FunctionalInterface
public interface VoidFn {
void apply();
}
@FunctionalInterface
public interface ObjectFnRead<T, R> {
R apply(ObjectMetadata meta, @Nullable T data);

View File

@@ -7,7 +7,7 @@ public interface JObjectManager {
Optional<JObject<?>> get(String name);
Collection<JObject<?>> find(String prefix);
Collection<JObject<?>> findAll();
// Put a new object
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.persistence.BlobP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
@@ -15,10 +15,11 @@ import lombok.Getter;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -79,9 +80,9 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
BlobP readMd;
ObjectMetadataP readMd;
try {
readMd = objectPersistentStore.readObject("meta_" + name);
readMd = objectPersistentStore.readObjectMeta(name);
} catch (StatusRuntimeException ex) {
if (ex.getStatus().getCode().equals(Status.NOT_FOUND.getCode()))
return Optional.empty();
@@ -108,14 +109,9 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public Collection<JObject<?>> find(String prefix) {
var ret = new ArrayList<JObject<?>>();
for (var f : objectPersistentStore.findObjects("meta_")) {
var got = get(f.substring(5));
if (got.isPresent())
ret.add(got.get());
}
return ret;
public Collection<JObject<?>> findAll() {
Stream<JObject<?>> x = objectPersistentStore.findAllObjects().stream().map(f -> get(f).orElse(null));
return x.filter(Objects::nonNull).toList(); // Somehow this is needed otherwise typing breaks
}
@Override

View File

@@ -58,7 +58,7 @@ public class JObjectResolver {
public boolean hasLocalCopy(JObject<?> self) {
if (!self.isDeleted() && refVerification) {
if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObject(self.getName())))
if (self.hasLocalCopyMd() && !(self.getData() != null || objectPersistentStore.existsObjectData(self.getName())))
throw new IllegalStateException("hasLocalCopy mismatch for " + self.getName());
}
// FIXME: Read/write lock assert?
@@ -159,7 +159,7 @@ public class JObjectResolver {
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
// jObject.assertRWLock();
// FIXME: No way to assert read lock?
if (objectPersistentStore.existsObject(jObject.getName()))
if (objectPersistentStore.existsObjectData(jObject.getName()))
return Optional.of(protoSerializerService.deserialize(objectPersistentStore.readObject(jObject.getName())));
return Optional.empty();
}
@@ -177,7 +177,7 @@ public class JObjectResolver {
Log.trace("Invalidating " + name);
jObject.getMeta().getHaveLocalCopy().set(false);
jObjectWriteback.remove(jObject);
objectPersistentStore.deleteObject(name);
objectPersistentStore.deleteObjectData(name);
} catch (StatusRuntimeException sx) {
if (sx.getStatus() != Status.NOT_FOUND)
Log.info("Couldn't delete object from persistent store: ", sx);

View File

@@ -162,13 +162,12 @@ public class JObjectWriteback {
throw new IllegalStateException("Object deleted but not deletable! " + m.getName());
// FIXME: assert Rw lock here?
Log.trace("Deleting from persistent storage " + m.getName());
objectPersistentStore.deleteObject("meta_" + m.getName());
objectPersistentStore.deleteObject(m.getName());
return;
}
objectPersistentStore.writeObject("meta_" + m.getName(), protoSerializerService.serializeToBlobP(m));
objectPersistentStore.writeObjectMeta(m.getName(), protoSerializerService.serialize(m));
if (data != null)
objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToBlobP(data));
objectPersistentStore.writeObject(m.getName(), protoSerializerService.serializeToJObjectDataP(data));
}
public void remove(JObject<?> object) {

View File

@@ -80,7 +80,7 @@ public class ProtoSerializerService {
}
// FIXME: This is annoying
public <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
private <O> Optional<JObjectDataP> serializeToJObjectDataPInternal(O object) {
var ser = serialize(object);
if (ser instanceof FileP) {
return Optional.of(JObjectDataP.newBuilder().setFile((FileP) ser).build());
@@ -106,22 +106,6 @@ public class ProtoSerializerService {
return serializeToJObjectDataPInternal(object).orElseThrow(() -> new IllegalStateException("Unknown JObjectDataP type: " + object.getClass()));
}
public <O> BlobP serializeToBlobP(O object) {
if (object == null) throw new IllegalArgumentException("Object to serialize shouldn't be null");
var jobjd = serializeToJObjectDataPInternal(object);
if (jobjd.isPresent()) {
return BlobP.newBuilder().setData((JObjectDataP) jobjd.get()).build();
}
var ser = serialize(object);
if (ser instanceof ObjectMetadataP) {
return BlobP.newBuilder().setMetadata((ObjectMetadataP) ser).build();
} else {
throw new IllegalStateException("Unknown BlobP type: " + ser.getClass());
}
}
public <M extends Message, O> O deserialize(M message) {
if (!_deserializers.containsKey(message.getClass()))
throw new IllegalStateException("Deserializer not registered: " + message.getClass());

View File

@@ -12,6 +12,7 @@ import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Nullable;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@@ -107,8 +108,13 @@ public class PersistentRemoteHostsService {
return (JObject<PeerDirectory>) got;
}
private void pushPeerUpdates(JObject<?> obj) {
Log.info("Scheduling certificate update after " + obj.getName() + " was updated");
private void pushPeerUpdates() {
pushPeerUpdates(null);
}
private void pushPeerUpdates(@Nullable JObject<?> obj) {
if (obj != null)
Log.info("Scheduling certificate update after " + obj.getName() + " was updated");
executorService.submit(() -> {
updateCerts();
invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
@@ -192,8 +198,6 @@ public class PersistentRemoteHostsService {
}
return addedInner;
});
if (added)
updateCerts();
return added;
}
@@ -210,19 +214,22 @@ public class PersistentRemoteHostsService {
}
return removedInner;
});
if (removed)
updateCerts();
return removed;
}
private void updateCerts() {
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
// Fixme:? I don't think it should be needed with custom trust store
// but it doesn't work?
rpcClientFactory.dropCache();
return null;
});
try {
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
// Fixme:? I don't think it should be needed with custom trust store
// but it doesn't work?
rpcClientFactory.dropCache();
return null;
});
} catch (Exception ex) {
Log.error("Error when refreshing certificates, will retry", ex);
pushPeerUpdates();
}
}
public boolean existsHost(UUID uuid) {

View File

@@ -62,7 +62,7 @@ public class RemoteHostManager {
_heartbeatExecutor.invokeAll(persistentRemoteHostsService.getHostsUuid().stream()
.<Callable<Void>>map(host -> () -> {
try {
if(isReachable(host))
if (isReachable(host))
Log.trace("Heartbeat: " + host);
else
Log.info("Trying to connect to " + host);
@@ -184,10 +184,13 @@ public class RemoteHostManager {
state.setSecurePort(securePort);
if (!persistentRemoteHostsService.existsHost(host)) {
_seenHostsButNotAdded.put(host, state);
var prev = _seenHostsButNotAdded.put(host, state);
// Needed for tests
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
if (prev == null)
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
return;
} else {
_seenHostsButNotAdded.remove(host);
}
_transientPeersState.runWriteLocked(d -> {

View File

@@ -118,7 +118,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
Log.info("<-- getIndex: from " + request.getSelfUuid());
var objs = jObjectManager.find("");
var objs = jObjectManager.findAll();
var reqUuid = UUID.fromString(request.getSelfUuid());

View File

@@ -21,9 +21,6 @@ public class RpcClientFactory {
@ConfigProperty(name = "dhfs.objects.sync.timeout")
long syncTimeout;
@ConfigProperty(name = "dhfs.objects.peersync.timeout")
long peerSyncTimeout;
@Inject
RemoteHostManager remoteHostManager;
@@ -54,9 +51,12 @@ public class RpcClientFactory {
var hostinfo = remoteHostManager.getTransientState(target);
boolean reachable = remoteHostManager.isReachable(target);
if (hostinfo.getAddr() == null || !reachable)
if (hostinfo.getAddr() == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Address for " + target + " not yet known"));
if (!reachable)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
return withObjSyncClient(target.toString(), hostinfo.getAddr(), hostinfo.getSecurePort(), syncTimeout, fn);
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
@@ -42,7 +41,7 @@ public class SyncHandler {
remoteObjectServiceClient.getIndex(host);
// Push our index to the other peer too, as they might not request it if
// they didn't think we were disconnected
var objs = jObjectManager.find("");
var objs = jObjectManager.findAll();
for (var obj : objs)
invalidationQueueService.pushInvalidationToOne(host, obj.getName());

View File

@@ -48,7 +48,7 @@ public class AutoSyncProcessor {
if (downloadAll)
executorService.submit(() -> {
for (var obj : jObjectManager.find("")) {
for (var obj : jObjectManager.findAll()) {
if (!obj.hasLocalCopy())
add(obj.getName());
}

View File

@@ -0,0 +1,176 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.Message;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
@ApplicationScoped
public class FileObjectPersistentStore implements ObjectPersistentStore {
private final String root;
private final Path metaPath;
private final Path dataPath;
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
this.root = root;
this.metaPath = Paths.get(root, "meta");
this.dataPath = Paths.get(root, "data");
}
void init(@Observes @Priority(200) StartupEvent event) {
if (!metaPath.toFile().exists()) {
Log.info("Initializing with root " + root);
metaPath.toFile().mkdirs();
dataPath.toFile().mkdirs();
for (int i = 0; i < 256; i++) {
for (int j = 0; j < 256; j++) {
metaPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
dataPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
}
}
}
}
void shutdown(@Observes @Priority(400) ShutdownEvent event) {
Log.info("Shutdown");
}
private Pair<String, String> getDirPathComponents(@Nonnull String obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
int p2 = h & 0b00000000_00000000_00000000_11111111;
return Pair.ofNonNull(String.valueOf(p1 >> 8), String.valueOf(p2));
}
private Path getMetaPath(@Nonnull String obj) {
var components = getDirPathComponents(obj);
return metaPath.resolve(components.getLeft()).resolve(components.getRight()).resolve(obj);
}
private Path getDataPath(@Nonnull String obj) {
var components = getDirPathComponents(obj);
return dataPath.resolve(components.getLeft()).resolve(components.getRight()).resolve(obj);
}
private void findAllObjectsImpl(Collection<String> out, Path path) {
var read = path.toFile().listFiles();
if (read == null) return;
for (var s : read) {
if (s.isDirectory()) {
findAllObjectsImpl(out, s.toPath());
} else {
out.add(s.getName());
}
}
}
@Nonnull
@Override
public Collection<String> findAllObjects() {
ArrayList<String> out = new ArrayList<>();
findAllObjectsImpl(out, metaPath);
return Collections.unmodifiableCollection(out);
}
@Nonnull
@Override
public Boolean existsObject(String name) {
return getMetaPath(name).toFile().isFile();
}
@Nonnull
@Override
public Boolean existsObjectData(String name) {
return getDataPath(name).toFile().isFile();
}
private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
try (var fsb = new FileInputStream(path.toFile());
var fs = new BufferedInputStream(fsb, 1048576)) {
return (T) defaultInstance.getParserForType().parseFrom(fs);
} catch (FileNotFoundException | NoSuchFileException fx) {
throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
} catch (IOException e) {
Log.error("Error reading file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Nonnull
@Override
public JObjectDataP readObject(String name) {
return readObjectImpl(JObjectDataP.getDefaultInstance(), getDataPath(name));
}
@Nonnull
@Override
public ObjectMetadataP readObjectMeta(String name) {
return readObjectImpl(ObjectMetadataP.getDefaultInstance(), getMetaPath(name));
}
private void writeObjectImpl(Path path, Message data) {
try {
try (var fsb = new FileOutputStream(path.toFile(), false);
var fs = new BufferedOutputStream(fsb, 1048576)) {
data.writeTo(fs);
}
} catch (IOException e) {
Log.error("Error writing file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Override
public void writeObject(String name, JObjectDataP data) {
writeObjectImpl(getDataPath(name), data);
}
@Override
public void writeObjectMeta(String name, ObjectMetadataP data) {
writeObjectImpl(getMetaPath(name), data);
}
private void deleteImpl(Path path) {
try {
Files.delete(path);
} catch (NoSuchFileException ignored) {
} catch (IOException e) {
Log.error("Error deleting file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Override
public void deleteObjectData(String name) {
deleteImpl(getDataPath(name));
}
@Override
public void deleteObject(String name) {
deleteImpl(getDataPath(name));
// FIXME: Race?
deleteImpl(getMetaPath(name));
}
}

View File

@@ -0,0 +1,33 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import javax.annotation.Nonnull;
import java.util.Collection;
/**
 * Persistent storage of objects, each split into a metadata part
 * ({@link ObjectMetadataP}) and an optional data part ({@link JObjectDataP})
 * that are read, written and deleted independently.
 * <p>
 * NOTE(review): the boxed {@code Boolean} return types could be primitive
 * {@code boolean}, but changing them now would break existing implementers.
 */
public interface ObjectPersistentStore {
/** Lists the names of all stored objects. */
@Nonnull
Collection<String> findAllObjects();
/** Returns whether an object with the given name exists (metadata present). */
@Nonnull
Boolean existsObject(String name);
/** Returns whether the data part of the given object is present locally. */
@Nonnull
Boolean existsObjectData(String name);
/** Reads the data part of an object. */
@Nonnull
JObjectDataP readObject(String name);
/** Reads the metadata part of an object. */
@Nonnull
ObjectMetadataP readObjectMeta(String name);
/** Writes (creates or replaces) the data part of an object. */
void writeObject(String name, JObjectDataP data);
/** Writes (creates or replaces) the metadata part of an object. */
void writeObjectMeta(String name, ObjectMetadataP data);
/** Deletes only the data part, leaving the metadata in place. */
void deleteObjectData(String name);
// Deletes object metadata and data
void deleteObject(String name);
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.utils;
/**
 * A no-argument, no-result callback, analogous to {@link Runnable} but kept as a
 * distinct project type.
 */
@FunctionalInterface
public interface VoidFn {
/** Invokes the callback. */
void apply();
}

View File

@@ -67,11 +67,4 @@ message JObjectDataP {
PeerDirectoryP peerDirectory = 6;
PersistentPeerInfoP persistentPeerInfo = 7;
}
}
message BlobP {
oneof dtype {
ObjectMetadataP metadata = 1;
JObjectDataP data = 2;
}
}
}

View File

@@ -3,7 +3,6 @@ dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=5000
dhfs.objects.peersync.timeout=5
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=4

Some files were not shown because too many files have changed in this diff Show More