mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
13 Commits
25e5acaabb
...
88d1e2136f
| Author | SHA1 | Date | |
|---|---|---|---|
| 88d1e2136f | |||
| eb9e9d9fc1 | |||
| 4795fe28a5 | |||
| 701a281a67 | |||
| 5585af2375 | |||
| 0ad86a74a3 | |||
| fa95fb24eb | |||
| 6a8a46b0d4 | |||
| 039a275de1 | |||
| 5ba62edbbc | |||
| 458970a384 | |||
| 7865e6aa46 | |||
| c8267ffa9e |
@@ -16,7 +16,7 @@ public class Directory extends FsNode {
|
||||
private static final long serialVersionUID = 1;
|
||||
@Getter
|
||||
@Setter
|
||||
private Map<String, UUID> _children = new TreeMap<>();
|
||||
private Map<String, UUID> _children = new HashMap<>();
|
||||
|
||||
public Directory(UUID uuid) {
|
||||
super(uuid);
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.files.objects.*;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -22,6 +23,7 @@ import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@@ -46,6 +48,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
|
||||
boolean useHashForChunks;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.ref_verification")
|
||||
boolean refVerification;
|
||||
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@@ -64,45 +69,49 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
getRoot();
|
||||
}
|
||||
|
||||
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path) {
|
||||
if (path.getNameCount() == 0) return from;
|
||||
|
||||
var pathFirstPart = path.getName(0).toString();
|
||||
|
||||
var notFound = new StatusRuntimeException(Status.NOT_FOUND.withDescription("Not found: " + from.getName() + "/" + path));
|
||||
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path, int curPos) {
|
||||
Supplier<StatusRuntimeExceptionNoStacktrace> notFound
|
||||
= () -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND.withDescription("Not found: " + from.getName() + "/" + path));
|
||||
|
||||
var found = from.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
|
||||
if (d instanceof Directory dir)
|
||||
return dir.getKid(pathFirstPart);
|
||||
return dir.getKid(path.getName(curPos).toString());
|
||||
return Optional.empty();
|
||||
}).orElseThrow(() -> notFound);
|
||||
}).orElseThrow(notFound);
|
||||
|
||||
Optional<JObject<?>> ref = jObjectManager.get(found.toString());
|
||||
|
||||
if (ref.isEmpty()) {
|
||||
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
|
||||
throw notFound;
|
||||
throw notFound.get();
|
||||
}
|
||||
|
||||
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
|
||||
if (!(d instanceof FsNode))
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
|
||||
return null;
|
||||
});
|
||||
|
||||
if (path.getNameCount() == 1) {
|
||||
if (refVerification)
|
||||
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
|
||||
if (d instanceof File f) {
|
||||
if (!Objects.equals(f.getParent().toString(), from.getName())) {
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
|
||||
}
|
||||
}
|
||||
if (!(d instanceof FsNode))
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
|
||||
return null;
|
||||
});
|
||||
|
||||
if (path.getNameCount() - 1 == curPos) {
|
||||
if (refVerification)
|
||||
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
|
||||
if (d instanceof File f) {
|
||||
if (!Objects.equals(f.getParent().toString(), from.getName())) {
|
||||
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
return (JObject<? extends FsNode>) ref.get();
|
||||
}
|
||||
|
||||
return traverse((JObject<? extends FsNode>) ref.get(), path.subpath(1, path.getNameCount()));
|
||||
return traverse((JObject<? extends FsNode>) ref.get(), path, curPos + 1);
|
||||
}
|
||||
|
||||
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path) {
|
||||
if (path.getNameCount() == 0) return from;
|
||||
return traverse(from, path, 0);
|
||||
}
|
||||
|
||||
private JObject<? extends FsNode> getDirEntry(String name) {
|
||||
@@ -220,7 +229,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (dir.getKid(kname).isEmpty())
|
||||
return false;
|
||||
|
||||
kid.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m2, d2, bump2, i2) -> {
|
||||
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
|
||||
if (d2 == null)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
|
||||
if (d2 instanceof Directory)
|
||||
if (!((Directory) d2).getChildren().isEmpty())
|
||||
throw new DirectoryNotEmptyException();
|
||||
|
||||
m2.removeRef(m.getName());
|
||||
return null;
|
||||
});
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.dhfs.files.service;
|
||||
|
||||
public class DirectoryNotEmptyException extends RuntimeException {
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.files.objects.File;
|
||||
import com.usatiuk.dhfs.files.objects.FsNode;
|
||||
import com.usatiuk.dhfs.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -103,15 +104,13 @@ public class DhfsFuse extends FuseStubFS {
|
||||
}
|
||||
var foundDent = (FsNode) found.get();
|
||||
|
||||
var atime = System.currentTimeMillis();
|
||||
stat.st_atim.tv_sec.set(atime / 1000);
|
||||
stat.st_atim.tv_nsec.set((atime % 1000) * 1000);
|
||||
|
||||
// FIXME: Race?
|
||||
stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000);
|
||||
stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000);
|
||||
stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000);
|
||||
stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
|
||||
stat.st_atim.tv_sec.set(foundDent.getMtime() / 1000);
|
||||
stat.st_atim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
|
||||
} catch (Exception e) {
|
||||
Log.error("When getattr " + path, e);
|
||||
return -ErrorCodes.EIO();
|
||||
@@ -211,6 +210,8 @@ public class DhfsFuse extends FuseStubFS {
|
||||
var ret = fileService.rmdir(path);
|
||||
if (!ret) return -ErrorCodes.ENOENT();
|
||||
else return 0;
|
||||
} catch (DirectoryNotEmptyException ex) {
|
||||
return -ErrorCodes.ENOTEMPTY();
|
||||
} catch (Exception e) {
|
||||
Log.error("When removing dir " + path, e);
|
||||
return -ErrorCodes.EIO();
|
||||
@@ -316,6 +317,18 @@ public class DhfsFuse extends FuseStubFS {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int chown(String path, long uid, long gid) {
|
||||
try {
|
||||
var fileOpt = fileService.open(path);
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
return 0;
|
||||
} catch (Exception e) {
|
||||
Log.error("When chown " + path, e);
|
||||
return -ErrorCodes.EIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int symlink(String oldpath, String newpath) {
|
||||
try {
|
||||
|
||||
@@ -69,7 +69,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
}
|
||||
|
||||
public T getData() {
|
||||
assertRWLock();
|
||||
// assertRWLock(); FIXME:
|
||||
return _dataPart.get();
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Shutdown;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectLRU {
|
||||
@Inject
|
||||
JObjectSizeEstimator jObjectSizeEstimator;
|
||||
@ConfigProperty(name = "dhfs.objects.lru.limit")
|
||||
long sizeLimit;
|
||||
|
||||
private long _curSize = 0;
|
||||
private long _evict = 0;
|
||||
|
||||
private final AtomicReference<ConcurrentHashMap<JObject<?>, Long>> _accessQueue = new AtomicReference<>(new ConcurrentHashMap<>());
|
||||
private final AtomicLong _lastDrain = new AtomicLong(0);
|
||||
|
||||
private final LinkedHashMap<JObject<?>, Long> _cache = new LinkedHashMap<>();
|
||||
private ExecutorService _statusExecutor;
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||
_statusExecutor.submit(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (_curSize > 0)
|
||||
Log.info("Cache status: size="
|
||||
+ _curSize / 1024 / 1024 + "MB"
|
||||
+ " evicted=" + _evict);
|
||||
_evict = 0;
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
void shutdown() {
|
||||
_statusExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
public void notifyAccess(JObject<?> obj) {
|
||||
_accessQueue.get().put(obj, jObjectSizeEstimator.estimateObjectSize(obj.getData()));
|
||||
// TODO: no hardcoding
|
||||
if (_accessQueue.get().size() > 500 || System.currentTimeMillis() - _lastDrain.get() > 100) {
|
||||
synchronized (_cache) {
|
||||
_lastDrain.set(System.currentTimeMillis());
|
||||
var newQueue = new ConcurrentHashMap<JObject<?>, Long>();
|
||||
var oldQueue = _accessQueue.getAndSet(newQueue);
|
||||
|
||||
for (var x : oldQueue.entrySet()) {
|
||||
long oldSize = _cache.getOrDefault(x.getKey(), 0L);
|
||||
long newSize = x.getValue();
|
||||
_curSize -= oldSize;
|
||||
_curSize += newSize;
|
||||
_cache.putLast(x.getKey(), newSize);
|
||||
|
||||
while (_curSize >= sizeLimit) {
|
||||
var del = _cache.pollFirstEntry();
|
||||
_curSize -= del.getValue();
|
||||
_evict++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -17,12 +17,12 @@ import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManagerImpl implements JObjectManager {
|
||||
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
|
||||
private final ConcurrentHashMap<String, NamedSoftReference> _map = new ConcurrentHashMap<>();
|
||||
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
@@ -32,6 +32,9 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
@Inject
|
||||
ProtoSerializerService protoSerializerService;
|
||||
@Inject
|
||||
JObjectLRU jObjectLRU;
|
||||
|
||||
private Thread _refCleanupThread;
|
||||
|
||||
@Startup
|
||||
@@ -51,10 +54,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
NamedSoftReference cur = (NamedSoftReference) _refQueue.remove();
|
||||
synchronized (this) {
|
||||
if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null))
|
||||
_map.remove(cur._key);
|
||||
}
|
||||
_map.remove(cur._key, cur);
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
@@ -62,22 +62,21 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
private JObject<?> getFromMap(String key) {
|
||||
synchronized (this) {
|
||||
if (_map.containsKey(key)) {
|
||||
var ref = _map.get(key).get();
|
||||
if (ref != null) {
|
||||
return ref;
|
||||
}
|
||||
}
|
||||
var ret = _map.get(key);
|
||||
if (ret != null && ret.get() != null) {
|
||||
return ret.get();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<JObject<?>> get(String name) {
|
||||
synchronized (this) {
|
||||
{
|
||||
var inMap = getFromMap(name);
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
if (inMap != null) {
|
||||
jObjectLRU.notifyAccess(inMap);
|
||||
return Optional.of(inMap);
|
||||
}
|
||||
}
|
||||
|
||||
BlobP readMd;
|
||||
@@ -97,13 +96,15 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(name);
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
JObject<?> newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
|
||||
_map.put(name, new NamedSoftReference(newObj, _refQueue));
|
||||
return Optional.of(newObj);
|
||||
JObject<?> ret = null;
|
||||
var newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
|
||||
while (ret == null) {
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(newObj, _refQueue));
|
||||
if (ref.get() == null) _map.remove(name, ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
jObjectLRU.notifyAccess(ret);
|
||||
return Optional.of(ret);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -122,35 +123,47 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
while (true) {
|
||||
JObject<?> ret;
|
||||
boolean created = false;
|
||||
synchronized (this) {
|
||||
JObject<?> newObj = null;
|
||||
try {
|
||||
ret = getFromMap(object.getName());
|
||||
if (ret != null) {
|
||||
if (!object.assumeUnique())
|
||||
throw new IllegalArgumentException("Trying to insert different object with same key");
|
||||
} else {
|
||||
ret = new JObject<D>(jObjectResolver, object.getName(), persistentRemoteHostsService.getSelfUuid(), object);
|
||||
_map.put(object.getName(), new NamedSoftReference(ret, _refQueue));
|
||||
newObj = new JObject<D>(jObjectResolver, object.getName(), persistentRemoteHostsService.getSelfUuid(), object);
|
||||
newObj.rwLock();
|
||||
while (ret == null) {
|
||||
JObject<?> finalNewObj = newObj;
|
||||
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedSoftReference(finalNewObj, _refQueue));
|
||||
if (ref.get() == null) _map.remove(object.getName(), ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
if (ret != newObj) continue;
|
||||
created = true;
|
||||
}
|
||||
JObject<D> finalRet = (JObject<D>) ret;
|
||||
boolean finalCreated = created;
|
||||
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
if (object.getClass().isAnnotationPresent(PushResolution.class) && object.assumeUnique() && finalRet.getData() == null) {
|
||||
finalRet.externalResolution(object);
|
||||
}
|
||||
|
||||
if (parent.isPresent()) {
|
||||
m.addRef(parent.get());
|
||||
if (m.isLocked())
|
||||
m.unlock();
|
||||
} else {
|
||||
m.lock();
|
||||
}
|
||||
|
||||
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
if (newObj != null) newObj.rwUnlock();
|
||||
}
|
||||
JObject<D> finalRet = (JObject<D>) ret;
|
||||
boolean finalCreated = created;
|
||||
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
if (object.getClass().isAnnotationPresent(PushResolution.class) && object.assumeUnique() && finalRet.getData() == null) {
|
||||
finalRet.externalResolution(object);
|
||||
}
|
||||
|
||||
if (parent.isPresent()) {
|
||||
m.addRef(parent.get());
|
||||
if (m.isLocked())
|
||||
m.unlock();
|
||||
} else {
|
||||
m.lock();
|
||||
}
|
||||
|
||||
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
|
||||
return null;
|
||||
});
|
||||
if (!created)
|
||||
jObjectLRU.notifyAccess(ret);
|
||||
return (JObject<D>) ret;
|
||||
}
|
||||
}
|
||||
@@ -169,28 +182,30 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
m.addRef(s);
|
||||
return true;
|
||||
}));
|
||||
jObjectLRU.notifyAccess(got.get());
|
||||
return got.get();
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
var inMap = getFromMap(name);
|
||||
if (inMap != null) {
|
||||
continue;
|
||||
} else {
|
||||
// FIXME:
|
||||
if (objectPersistentStore.existsObject("meta_" + name))
|
||||
continue;
|
||||
|
||||
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
|
||||
_map.put(name, new NamedSoftReference(created, _refQueue));
|
||||
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
parent.ifPresent(m::addRef);
|
||||
m.markSeen();
|
||||
return null;
|
||||
});
|
||||
return created;
|
||||
JObject<?> ret = null;
|
||||
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
|
||||
created.rwLock();
|
||||
try {
|
||||
while (ret == null) {
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(created, _refQueue));
|
||||
if (ref.get() == null) _map.remove(name, ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
if (ret != created) continue;
|
||||
|
||||
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
parent.ifPresent(m::addRef);
|
||||
m.markSeen();
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
created.rwUnlock();
|
||||
}
|
||||
return created;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -43,6 +43,8 @@ public class JObjectResolver {
|
||||
ProtoSerializerService protoSerializerService;
|
||||
@Inject
|
||||
JObjectRefProcessor jObjectRefProcessor;
|
||||
@Inject
|
||||
JObjectLRU jObjectLRU;
|
||||
@ConfigProperty(name = "dhfs.objects.ref_verification")
|
||||
boolean refVerification;
|
||||
|
||||
|
||||
@@ -12,10 +12,7 @@ import jakarta.enterprise.event.Observes;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
@@ -69,7 +66,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
public BlobP readObject(String name) {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
try (var fs = new FileInputStream(file.toFile())) {
|
||||
try (var fsb = new FileInputStream(file.toFile());
|
||||
var fs = new BufferedInputStream(fsb, 1048576)) {
|
||||
return BlobP.parseFrom(fs);
|
||||
} catch (FileNotFoundException | NoSuchFileException fx) {
|
||||
throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
@@ -84,8 +82,9 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
var file = Path.of(root, name);
|
||||
|
||||
try {
|
||||
try (var fc = new FileOutputStream(file.toFile(), false)) {
|
||||
data.writeTo(fc);
|
||||
try (var fsb = new FileOutputStream(file.toFile(), false);
|
||||
var fs = new BufferedOutputStream(fsb, 1048576)) {
|
||||
data.writeTo(fs);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing file " + file, e);
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class StatusRuntimeExceptionNoStacktrace extends StatusRuntimeException {
|
||||
public StatusRuntimeExceptionNoStacktrace(Status status) {
|
||||
super(status);
|
||||
}
|
||||
|
||||
public StatusRuntimeExceptionNoStacktrace(Status status, @Nullable Metadata trailers) {
|
||||
super(status, trailers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Throwable fillInStackTrace() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -21,7 +21,8 @@ dhfs.files.write_merge_limit=1.2
|
||||
dhfs.files.write_merge_max_chunk_to_take=1
|
||||
dhfs.files.write_last_chunk_limit=1.5
|
||||
dhfs.objects.writeback.delay=100
|
||||
dhfs.objects.writeback.limit=1073741824
|
||||
dhfs.objects.writeback.limit=134217728
|
||||
dhfs.objects.lru.limit=134217728
|
||||
dhfs.objects.writeback.watermark-high=0.6
|
||||
dhfs.objects.writeback.watermark-low=0.4
|
||||
dhfs.objects.writeback.threads=4
|
||||
|
||||
@@ -50,4 +50,28 @@ public class DhfsFuseTest {
|
||||
Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testSymlink));
|
||||
Assertions.assertArrayEquals(Files.readAllBytes(testSymlink), testString);
|
||||
}
|
||||
|
||||
@Test
|
||||
void dontRemoveEmptyDirTest() throws IOException {
|
||||
byte[] testString = "dontRemoveEmptyDirTestStr".getBytes();
|
||||
Path testDir = Path.of(root).resolve("dontRemoveEmptyDirTestDir");
|
||||
Path testFile = testDir.resolve("dontRemoveEmptyDirTestFile");
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> Files.createDirectory(testDir));
|
||||
Assertions.assertDoesNotThrow(() -> Files.createFile(testFile));
|
||||
Assertions.assertDoesNotThrow(() -> Files.write(testFile, testString));
|
||||
Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testFile));
|
||||
Assertions.assertArrayEquals(Files.readAllBytes(testFile), testString);
|
||||
|
||||
Assertions.assertThrows(Exception.class, () -> Files.delete(testDir));
|
||||
Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testFile));
|
||||
Assertions.assertArrayEquals(Files.readAllBytes(testFile), testString);
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> Files.delete(testFile));
|
||||
Assertions.assertDoesNotThrow(() -> Files.delete(testDir));
|
||||
Assertions.assertFalse(Files.exists(testDir));
|
||||
Assertions.assertFalse(Files.exists(testFile));
|
||||
Assertions.assertThrows(Exception.class, () -> Files.readAllBytes(testFile));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user