13 Commits

Author SHA1 Message Date
88d1e2136f don't remove non-empty dirs
Some checks failed
Server / build-dhfs (push) Failing after 7m6s
Server / build-webui (push) Successful in 2m26s
Server / publish-docker (push) Has been skipped
Server / publish-run-wrapper (push) Has been skipped
2024-07-21 22:05:09 +02:00
eb9e9d9fc1 a little cleaner traverse 2024-07-21 21:54:22 +02:00
4795fe28a5 don't put into lru cache just created objects 2024-07-21 21:33:00 +02:00
701a281a67 remove notfound stacktrace in traverse 2024-07-21 17:09:33 +02:00
5585af2375 cached LRU access queue 2024-07-21 17:02:47 +02:00
0ad86a74a3 use concurrent hash map not the skiplist! 2024-07-21 16:40:39 +02:00
fa95fb24eb lock-free jObjectManager 2024-07-21 16:37:18 +02:00
6a8a46b0d4 less lru notifications 2024-07-21 16:04:55 +02:00
039a275de1 lru stats 2024-07-21 15:59:33 +02:00
5ba62edbbc use buffered streams 2024-07-21 15:24:52 +02:00
458970a384 use mtime as atime 2024-07-21 15:24:43 +02:00
7865e6aa46 simple additional lru cache 2024-07-21 14:03:57 +02:00
c8267ffa9e fake chown 2024-07-21 13:44:51 +02:00
12 changed files with 272 additions and 96 deletions

View File

@@ -16,7 +16,7 @@ public class Directory extends FsNode {
private static final long serialVersionUID = 1;
@Getter
@Setter
private Map<String, UUID> _children = new TreeMap<>();
private Map<String, UUID> _children = new HashMap<>();
public Directory(UUID uuid) {
super(uuid);

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.files.objects.*;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -22,6 +23,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ApplicationScoped
public class DhfsFileServiceImpl implements DhfsFileService {
@@ -46,6 +48,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@@ -64,45 +69,49 @@ public class DhfsFileServiceImpl implements DhfsFileService {
getRoot();
}
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path) {
if (path.getNameCount() == 0) return from;
var pathFirstPart = path.getName(0).toString();
var notFound = new StatusRuntimeException(Status.NOT_FOUND.withDescription("Not found: " + from.getName() + "/" + path));
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path, int curPos) {
Supplier<StatusRuntimeExceptionNoStacktrace> notFound
= () -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND.withDescription("Not found: " + from.getName() + "/" + path));
var found = from.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d instanceof Directory dir)
return dir.getKid(pathFirstPart);
return dir.getKid(path.getName(curPos).toString());
return Optional.empty();
}).orElseThrow(() -> notFound);
}).orElseThrow(notFound);
Optional<JObject<?>> ref = jObjectManager.get(found.toString());
if (ref.isEmpty()) {
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
throw notFound;
throw notFound.get();
}
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof FsNode))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
return null;
});
if (path.getNameCount() == 1) {
if (refVerification)
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d instanceof File f) {
if (!Objects.equals(f.getParent().toString(), from.getName())) {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
}
}
if (!(d instanceof FsNode))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
return null;
});
if (path.getNameCount() - 1 == curPos) {
if (refVerification)
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d instanceof File f) {
if (!Objects.equals(f.getParent().toString(), from.getName())) {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
}
}
return null;
});
return (JObject<? extends FsNode>) ref.get();
}
return traverse((JObject<? extends FsNode>) ref.get(), path.subpath(1, path.getNameCount()));
return traverse((JObject<? extends FsNode>) ref.get(), path, curPos + 1);
}
// Public entry point for path resolution: walks 'path' component-by-component
// starting from 'from' (delegates to the indexed overload with curPos = 0).
// An empty path resolves to 'from' itself.
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path) {
    if (path.getNameCount() == 0) return from;
    return traverse(from, path, 0);
}
private JObject<? extends FsNode> getDirEntry(String name) {
@@ -220,7 +229,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (dir.getKid(kname).isEmpty())
return false;
kid.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m2, d2, bump2, i2) -> {
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
if (d2 == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
if (d2 instanceof Directory)
if (!((Directory) d2).getChildren().isEmpty())
throw new DirectoryNotEmptyException();
m2.removeRef(m.getName());
return null;
});

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.files.service;
// Thrown by the file service's rmdir path when the target directory still has
// children; the FUSE layer (DhfsFuse.rmdir) translates it to -ENOTEMPTY.
public class DirectoryNotEmptyException extends RuntimeException {
}

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.files.objects.Directory;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.objects.FsNode;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -103,15 +104,13 @@ public class DhfsFuse extends FuseStubFS {
}
var foundDent = (FsNode) found.get();
var atime = System.currentTimeMillis();
stat.st_atim.tv_sec.set(atime / 1000);
stat.st_atim.tv_nsec.set((atime % 1000) * 1000);
// FIXME: Race?
stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000);
stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000);
stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000);
stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
stat.st_atim.tv_sec.set(foundDent.getMtime() / 1000);
stat.st_atim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
} catch (Exception e) {
Log.error("When getattr " + path, e);
return -ErrorCodes.EIO();
@@ -211,6 +210,8 @@ public class DhfsFuse extends FuseStubFS {
var ret = fileService.rmdir(path);
if (!ret) return -ErrorCodes.ENOENT();
else return 0;
} catch (DirectoryNotEmptyException ex) {
return -ErrorCodes.ENOTEMPTY();
} catch (Exception e) {
Log.error("When removing dir " + path, e);
return -ErrorCodes.EIO();
@@ -316,6 +317,18 @@ public class DhfsFuse extends FuseStubFS {
}
}
// "Fake" chown (see commit "fake chown"): only verifies that the path exists
// and then reports success. The uid/gid arguments are intentionally ignored —
// ownership is not persisted anywhere in this code path.
@Override
public int chown(String path, long uid, long gid) {
    try {
        var fileOpt = fileService.open(path);
        if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
        // Ownership change is accepted but not applied.
        return 0;
    } catch (Exception e) {
        Log.error("When chown " + path, e);
        return -ErrorCodes.EIO();
    }
}
@Override
public int symlink(String oldpath, String newpath) {
try {

View File

@@ -69,7 +69,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
}
public T getData() {
assertRWLock();
// assertRWLock(); FIXME:
return _dataPart.get();
}

View File

@@ -0,0 +1,81 @@
package com.usatiuk.dhfs.objects.jrepository;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Size-bounded LRU bookkeeping for JObjects.
 *
 * Accesses are recorded lock-free into a swap-able ConcurrentHashMap
 * (_accessQueue) and drained in batches into the insertion-ordered _cache
 * (a LinkedHashMap used as the LRU queue), so the hot notifyAccess() path
 * rarely contends on the _cache monitor. A background thread logs cache
 * statistics once per second.
 */
@ApplicationScoped
public class JObjectLRU {
    @Inject
    JObjectSizeEstimator jObjectSizeEstimator;
    @ConfigProperty(name = "dhfs.objects.lru.limit")
    long sizeLimit;

    // Guarded by synchronized (_cache); read unlocked (approximately) by the
    // status thread, which is acceptable for logging.
    private long _curSize = 0;
    private long _evict = 0;

    // Swap-able access buffer: notifyAccess() writes here without locking,
    // drain swaps in a fresh map and folds the old one into _cache.
    private final AtomicReference<ConcurrentHashMap<JObject<?>, Long>> _accessQueue = new AtomicReference<>(new ConcurrentHashMap<>());
    private final AtomicLong _lastDrain = new AtomicLong(0);
    // Insertion-ordered LRU queue: oldest entries are evicted first.
    private final LinkedHashMap<JObject<?>, Long> _cache = new LinkedHashMap<>();
    private ExecutorService _statusExecutor;

    @Startup
    void init() {
        _statusExecutor = Executors.newSingleThreadExecutor();
        _statusExecutor.submit(() -> {
            try {
                while (true) {
                    Thread.sleep(1000);
                    if (_curSize > 0)
                        Log.info("Cache status: size="
                                + _curSize / 1024 / 1024 + "MB"
                                + " evicted=" + _evict);
                    _evict = 0;
                }
            } catch (InterruptedException ignored) {
            }
        });
    }

    @Shutdown
    void shutdown() {
        _statusExecutor.shutdownNow();
    }

    /**
     * Records an access to {@code obj} and, when the pending-access buffer is
     * large or stale enough, drains it into the LRU queue, evicting the
     * least-recently-used entries while the accounted size is over the limit.
     */
    public void notifyAccess(JObject<?> obj) {
        _accessQueue.get().put(obj, jObjectSizeEstimator.estimateObjectSize(obj.getData()));
        // TODO: no hardcoding
        if (_accessQueue.get().size() > 500 || System.currentTimeMillis() - _lastDrain.get() > 100) {
            synchronized (_cache) {
                _lastDrain.set(System.currentTimeMillis());
                var newQueue = new ConcurrentHashMap<JObject<?>, Long>();
                var oldQueue = _accessQueue.getAndSet(newQueue);

                for (var x : oldQueue.entrySet()) {
                    // Replace the entry's previously accounted size (0 if new).
                    long oldSize = _cache.getOrDefault(x.getKey(), 0L);
                    long newSize = x.getValue();
                    _curSize -= oldSize;
                    _curSize += newSize;
                    // Move to the most-recently-used position.
                    _cache.putLast(x.getKey(), newSize);

                    while (_curSize >= sizeLimit) {
                        var del = _cache.pollFirstEntry();
                        if (del == null) {
                            // Defensive fix: the original unconditionally
                            // dereferenced pollFirstEntry(), which NPEs if the
                            // cache drains empty while _curSize is still over
                            // the limit (e.g. sizeLimit configured <= 0, or
                            // size-accounting drift). Resync and stop evicting.
                            _curSize = 0;
                            break;
                        }
                        _curSize -= del.getValue();
                        _evict++;
                    }
                }
            }
        }
    }
}

View File

@@ -17,12 +17,12 @@ import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
private final ConcurrentHashMap<String, NamedSoftReference> _map = new ConcurrentHashMap<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
@Inject
ObjectPersistentStore objectPersistentStore;
@@ -32,6 +32,9 @@ public class JObjectManagerImpl implements JObjectManager {
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
ProtoSerializerService protoSerializerService;
@Inject
JObjectLRU jObjectLRU;
private Thread _refCleanupThread;
@Startup
@@ -51,10 +54,7 @@ public class JObjectManagerImpl implements JObjectManager {
try {
while (!Thread.interrupted()) {
NamedSoftReference cur = (NamedSoftReference) _refQueue.remove();
synchronized (this) {
if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null))
_map.remove(cur._key);
}
_map.remove(cur._key, cur);
}
} catch (InterruptedException ignored) {
}
@@ -62,22 +62,21 @@ public class JObjectManagerImpl implements JObjectManager {
}
private JObject<?> getFromMap(String key) {
synchronized (this) {
if (_map.containsKey(key)) {
var ref = _map.get(key).get();
if (ref != null) {
return ref;
}
}
var ret = _map.get(key);
if (ret != null && ret.get() != null) {
return ret.get();
}
return null;
}
@Override
public Optional<JObject<?>> get(String name) {
synchronized (this) {
{
var inMap = getFromMap(name);
if (inMap != null) return Optional.of(inMap);
if (inMap != null) {
jObjectLRU.notifyAccess(inMap);
return Optional.of(inMap);
}
}
BlobP readMd;
@@ -97,13 +96,15 @@ public class JObjectManagerImpl implements JObjectManager {
return Optional.empty();
}
synchronized (this) {
var inMap = getFromMap(name);
if (inMap != null) return Optional.of(inMap);
JObject<?> newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
_map.put(name, new NamedSoftReference(newObj, _refQueue));
return Optional.of(newObj);
JObject<?> ret = null;
var newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(newObj, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
jObjectLRU.notifyAccess(ret);
return Optional.of(ret);
}
@Override
@@ -122,35 +123,47 @@ public class JObjectManagerImpl implements JObjectManager {
while (true) {
JObject<?> ret;
boolean created = false;
synchronized (this) {
JObject<?> newObj = null;
try {
ret = getFromMap(object.getName());
if (ret != null) {
if (!object.assumeUnique())
throw new IllegalArgumentException("Trying to insert different object with same key");
} else {
ret = new JObject<D>(jObjectResolver, object.getName(), persistentRemoteHostsService.getSelfUuid(), object);
_map.put(object.getName(), new NamedSoftReference(ret, _refQueue));
newObj = new JObject<D>(jObjectResolver, object.getName(), persistentRemoteHostsService.getSelfUuid(), object);
newObj.rwLock();
while (ret == null) {
JObject<?> finalNewObj = newObj;
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedSoftReference(finalNewObj, _refQueue));
if (ref.get() == null) _map.remove(object.getName(), ref);
else ret = ref.get();
}
if (ret != newObj) continue;
created = true;
}
JObject<D> finalRet = (JObject<D>) ret;
boolean finalCreated = created;
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
if (object.getClass().isAnnotationPresent(PushResolution.class) && object.assumeUnique() && finalRet.getData() == null) {
finalRet.externalResolution(object);
}
if (parent.isPresent()) {
m.addRef(parent.get());
if (m.isLocked())
m.unlock();
} else {
m.lock();
}
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
return null;
});
} finally {
if (newObj != null) newObj.rwUnlock();
}
JObject<D> finalRet = (JObject<D>) ret;
boolean finalCreated = created;
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
if (object.getClass().isAnnotationPresent(PushResolution.class) && object.assumeUnique() && finalRet.getData() == null) {
finalRet.externalResolution(object);
}
if (parent.isPresent()) {
m.addRef(parent.get());
if (m.isLocked())
m.unlock();
} else {
m.lock();
}
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
return null;
});
if (!created)
jObjectLRU.notifyAccess(ret);
return (JObject<D>) ret;
}
}
@@ -169,28 +182,30 @@ public class JObjectManagerImpl implements JObjectManager {
m.addRef(s);
return true;
}));
jObjectLRU.notifyAccess(got.get());
return got.get();
}
synchronized (this) {
var inMap = getFromMap(name);
if (inMap != null) {
continue;
} else {
// FIXME:
if (objectPersistentStore.existsObject("meta_" + name))
continue;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
_map.put(name, new NamedSoftReference(created, _refQueue));
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
return created;
JObject<?> ret = null;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
created.rwLock();
try {
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
} finally {
created.rwUnlock();
}
return created;
}
}

View File

@@ -43,6 +43,8 @@ public class JObjectResolver {
ProtoSerializerService protoSerializerService;
@Inject
JObjectRefProcessor jObjectRefProcessor;
@Inject
JObjectLRU jObjectLRU;
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;

View File

@@ -12,10 +12,7 @@ import jakarta.enterprise.event.Observes;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
@@ -69,7 +66,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
public BlobP readObject(String name) {
var file = Path.of(root, name);
try (var fs = new FileInputStream(file.toFile())) {
try (var fsb = new FileInputStream(file.toFile());
var fs = new BufferedInputStream(fsb, 1048576)) {
return BlobP.parseFrom(fs);
} catch (FileNotFoundException | NoSuchFileException fx) {
throw new StatusRuntimeException(Status.NOT_FOUND);
@@ -84,8 +82,9 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
var file = Path.of(root, name);
try {
try (var fc = new FileOutputStream(file.toFile(), false)) {
data.writeTo(fc);
try (var fsb = new FileOutputStream(file.toFile(), false);
var fs = new BufferedOutputStream(fsb, 1048576)) {
data.writeTo(fs);
}
} catch (IOException e) {
Log.error("Error writing file " + file, e);

View File

@@ -0,0 +1,22 @@
package com.usatiuk.utils;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import javax.annotation.Nullable;
// A StatusRuntimeException whose stack trace is never captured: throwing it is
// cheap, which matters when it is used for expected control flow (e.g. the
// NOT_FOUND result in DhfsFileServiceImpl.traverse).
public class StatusRuntimeExceptionNoStacktrace extends StatusRuntimeException {
    public StatusRuntimeExceptionNoStacktrace(Status status) {
        super(status);
    }
    public StatusRuntimeExceptionNoStacktrace(Status status, @Nullable Metadata trailers) {
        super(status, trailers);
    }
    // Skipping the stack walk here avoids the dominant cost of constructing a
    // Throwable; 'synchronized' matches the Throwable.fillInStackTrace signature.
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }
}

View File

@@ -21,7 +21,8 @@ dhfs.files.write_merge_limit=1.2
dhfs.files.write_merge_max_chunk_to_take=1
dhfs.files.write_last_chunk_limit=1.5
dhfs.objects.writeback.delay=100
dhfs.objects.writeback.limit=1073741824
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.writeback.watermark-high=0.6
dhfs.objects.writeback.watermark-low=0.4
dhfs.objects.writeback.threads=4

View File

@@ -50,4 +50,28 @@ public class DhfsFuseTest {
Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testSymlink));
Assertions.assertArrayEquals(Files.readAllBytes(testSymlink), testString);
}
@Test
// Regression test for commit "don't remove non-empty dirs": deleting a
// directory that still contains a file must throw and leave the contents
// readable; after the file is removed, deleting the directory must succeed.
// NOTE(review): the name says "EmptyDir" but the scenario is a NON-empty
// directory — consider renaming to dontRemoveNonEmptyDirTest.
void dontRemoveEmptyDirTest() throws IOException {
    byte[] testString = "dontRemoveEmptyDirTestStr".getBytes();
    Path testDir = Path.of(root).resolve("dontRemoveEmptyDirTestDir");
    Path testFile = testDir.resolve("dontRemoveEmptyDirTestFile");
    Assertions.assertDoesNotThrow(() -> Files.createDirectory(testDir));
    Assertions.assertDoesNotThrow(() -> Files.createFile(testFile));
    Assertions.assertDoesNotThrow(() -> Files.write(testFile, testString));
    Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testFile));
    Assertions.assertArrayEquals(Files.readAllBytes(testFile), testString);
    // Non-empty directory: delete must fail and must not damage the file.
    Assertions.assertThrows(Exception.class, () -> Files.delete(testDir));
    Assertions.assertDoesNotThrow(() -> Files.readAllBytes(testFile));
    Assertions.assertArrayEquals(Files.readAllBytes(testFile), testString);
    // Once emptied, the directory is removable.
    Assertions.assertDoesNotThrow(() -> Files.delete(testFile));
    Assertions.assertDoesNotThrow(() -> Files.delete(testDir));
    Assertions.assertFalse(Files.exists(testDir));
    Assertions.assertFalse(Files.exists(testFile));
    Assertions.assertThrows(Exception.class, () -> Files.readAllBytes(testFile));
}
}