mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
writeback fixes
This commit is contained in:
@@ -87,6 +87,10 @@ public class JObjectWriteback {
|
||||
|
||||
public void markDirty(String name, JObject<?> object) {
|
||||
synchronized (this) {
|
||||
if (_objects.containsKey(name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// FIXME: better logic
|
||||
if (_objects.size() < 5000) {
|
||||
_objects.put(name, object);
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.scheduler.Scheduled;
|
||||
import io.smallrye.common.annotation.Blocking;
|
||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -18,7 +17,7 @@ public class InvalidationQueueService {
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||
@Blocking
|
||||
@RunOnVirtualThread
|
||||
public void trySend() {
|
||||
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
|
||||
for (var forHost : data.entrySet()) {
|
||||
|
||||
@@ -5,17 +5,17 @@ import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import io.vertx.mutiny.core.Vertx;
|
||||
import io.vertx.mutiny.core.buffer.Buffer;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@@ -24,9 +24,6 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
|
||||
String root;
|
||||
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
void init(@Observes @Priority(200) StartupEvent event) {
|
||||
Paths.get(root).toFile().mkdirs();
|
||||
Log.info("Initializing with root " + root);
|
||||
@@ -44,10 +41,11 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (!nsRoot.toFile().isDirectory())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
var read = vertx.fileSystem().readDir(nsRoot.toString()).await().indefinitely();
|
||||
var read = nsRoot.toFile().listFiles();
|
||||
if (read == null) return List.of();
|
||||
ArrayList<String> out = new ArrayList<>();
|
||||
for (var s : read) {
|
||||
var rel = nsRoot.relativize(Paths.get(s)).toString();
|
||||
var rel = nsRoot.relativize(s.toPath()).toString();
|
||||
if (rel.startsWith(prefix))
|
||||
out.add(rel);
|
||||
}
|
||||
@@ -70,29 +68,44 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
if (!file.toFile().exists())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
return vertx.fileSystem().readFile(file.toString()).map(Buffer::getBytes).await().indefinitely();
|
||||
try {
|
||||
return Files.readAllBytes(file);
|
||||
} catch (IOException e) {
|
||||
Log.error("Error reading file " + file, e);
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Void writeObject(String name, byte[] data) {
|
||||
public void writeObject(String name, byte[] data) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!Paths.get(root).toFile().isDirectory()
|
||||
&& !Paths.get(root).toFile().mkdirs())
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
|
||||
return vertx.fileSystem().writeFile(file.toString(), Buffer.buffer(data)).await().indefinitely();
|
||||
try {
|
||||
Files.write(file, data, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing file " + file, e);
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
public Void deleteObject(String name) {
|
||||
public void deleteObject(String name) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
if (!file.toFile().exists())
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
|
||||
return vertx.fileSystem().delete(file.toString()).await().indefinitely();
|
||||
try {
|
||||
Files.delete(file);
|
||||
} catch (IOException e) {
|
||||
Log.error("Error deleting file " + file, e);
|
||||
throw new StatusRuntimeException(Status.INTERNAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ public interface ObjectPersistentStore {
|
||||
@Nonnull
|
||||
byte[] readObject(String name);
|
||||
@Nonnull
|
||||
Void writeObject(String name, byte[] data);
|
||||
void writeObject(String name, byte[] data);
|
||||
@Nonnull
|
||||
Void deleteObject(String name);
|
||||
void deleteObject(String name);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user