mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
11 Commits
18d775cf68
...
10ba128de3
| Author | SHA1 | Date | |
|---|---|---|---|
| 10ba128de3 | |||
| 178308153d | |||
| e1f262c330 | |||
| 8c16f32df6 | |||
| 9c6dd69556 | |||
| 5fbdfb3047 | |||
| 0c399b0156 | |||
| ff69827baf | |||
| 4446aedb17 | |||
| c82ec23739 | |||
| 048e46fc8d |
2
.github/workflows/server.yml
vendored
2
.github/workflows/server.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package verify
|
||||
|
||||
- name: Build with Maven
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.files.conflicts.NoOpConflictResolver;
|
||||
import com.usatiuk.dhfs.objects.jrepository.AssumedUnique;
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
|
||||
import com.usatiuk.dhfs.objects.jrepository.Leaf;
|
||||
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
|
||||
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
|
||||
import lombok.Getter;
|
||||
@@ -18,6 +19,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
@Getter
|
||||
@AssumedUnique
|
||||
@Leaf
|
||||
public class ChunkData extends JObjectData {
|
||||
@Serial
|
||||
private static final long serialVersionUID = 1;
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -48,6 +49,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
|
||||
boolean useHashForChunks;
|
||||
|
||||
@ConfigProperty(name = "dhfs.files.allow_recursive_delete")
|
||||
boolean allowRecursiveDelete;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.ref_verification")
|
||||
boolean refVerification;
|
||||
|
||||
@@ -162,13 +166,17 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (dir.getKid(fname).isPresent())
|
||||
return false;
|
||||
|
||||
bump.apply();
|
||||
|
||||
boolean created = dir.putKid(fname, fuuid);
|
||||
if (!created) return false;
|
||||
|
||||
jObjectManager.put(f, Optional.of(dir.getName()));
|
||||
try {
|
||||
jObjectManager.put(f, Optional.of(dir.getName()));
|
||||
} catch (Exception ex) {
|
||||
Log.error("Failed creating file " + fuuid);
|
||||
dir.removeKid(fname);
|
||||
}
|
||||
|
||||
bump.apply();
|
||||
dir.setMtime(System.currentTimeMillis());
|
||||
return true;
|
||||
}))
|
||||
@@ -194,12 +202,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (dir.getKid(dname).isPresent())
|
||||
return false;
|
||||
|
||||
boolean created = dir.putKid(dname, duuid);
|
||||
if (!created) return false;
|
||||
|
||||
try {
|
||||
jObjectManager.put(ndir, Optional.of(dir.getName()));
|
||||
} catch (Exception ex) {
|
||||
Log.error("Failed creating directory " + dname);
|
||||
dir.removeKid(dname);
|
||||
}
|
||||
|
||||
bump.apply();
|
||||
|
||||
jObjectManager.put(ndir, Optional.of(dir.getName()));
|
||||
|
||||
dir.setMtime(System.currentTimeMillis());
|
||||
return dir.putKid(dname, duuid);
|
||||
return true;
|
||||
}))
|
||||
return Optional.empty();
|
||||
|
||||
@@ -232,21 +248,64 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (dir.getKid(kname).isEmpty())
|
||||
return false;
|
||||
|
||||
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
|
||||
if (d2 == null)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
|
||||
if (d2 instanceof Directory)
|
||||
if (!((Directory) d2).getChildren().isEmpty())
|
||||
throw new DirectoryNotEmptyException();
|
||||
AtomicBoolean hadRef = new AtomicBoolean(false);
|
||||
AtomicBoolean removedRef = new AtomicBoolean(false);
|
||||
|
||||
m2.removeRef(m.getName());
|
||||
return null;
|
||||
});
|
||||
try {
|
||||
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
|
||||
if (m2.checkRef(m.getName()))
|
||||
hadRef.set(true);
|
||||
|
||||
if (d2 == null)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
|
||||
if (!allowRecursiveDelete)
|
||||
if (d2 instanceof Directory)
|
||||
if (!((Directory) d2).getChildren().isEmpty())
|
||||
throw new DirectoryNotEmptyException();
|
||||
|
||||
m2.removeRef(m.getName());
|
||||
removedRef.set(true);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
Log.error("Failed removing dentry " + name, ex);
|
||||
// If we failed something and removed the ref, try re-adding it
|
||||
if (hadRef.get() && removedRef.get()) {
|
||||
AtomicBoolean hadRef2 = new AtomicBoolean(false);
|
||||
AtomicBoolean addedRef = new AtomicBoolean(false);
|
||||
try {
|
||||
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
|
||||
if (m2.checkRef(m.getName()))
|
||||
hadRef2.set(true);
|
||||
|
||||
if (d2 == null)
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
|
||||
if (!allowRecursiveDelete)
|
||||
if (d2 instanceof Directory)
|
||||
if (!((Directory) d2).getChildren().isEmpty())
|
||||
throw new DirectoryNotEmptyException();
|
||||
|
||||
m2.addRef(m.getName());
|
||||
addedRef.set(true);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
Log.error("Failed fixing up failed-removed dentry " + name, ex);
|
||||
// If it is not there and we haven't added it, continue removing
|
||||
if (!(!hadRef2.get() && !addedRef.get())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else { // Otherwise, we didn't remove anything, go back.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
boolean removed = dir.removeKid(kname);
|
||||
|
||||
bump.apply();
|
||||
dir.setMtime(System.currentTimeMillis());
|
||||
if (removed) {
|
||||
bump.apply();
|
||||
dir.setMtime(System.currentTimeMillis());
|
||||
}
|
||||
return removed;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.files.objects.File;
|
||||
import com.usatiuk.dhfs.files.objects.FsNode;
|
||||
import com.usatiuk.dhfs.files.service.DhfsFileService;
|
||||
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
|
||||
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -33,6 +34,9 @@ import static jnr.posix.FileStat.*;
|
||||
|
||||
@ApplicationScoped
|
||||
public class DhfsFuse extends FuseStubFS {
|
||||
@Inject
|
||||
ObjectPersistentStore persistentStore; // FIXME?
|
||||
|
||||
@ConfigProperty(name = "dhfs.fuse.root")
|
||||
String root;
|
||||
|
||||
@@ -67,13 +71,11 @@ public class DhfsFuse extends FuseStubFS {
|
||||
@Override
|
||||
public int statfs(String path, Statvfs stbuf) {
|
||||
try {
|
||||
//FIXME:
|
||||
if ("/".equals(path)) {
|
||||
stbuf.f_blocks.set(1024 * 1024 * 1024); // total data blocks in file system
|
||||
stbuf.f_frsize.set(1024); // fs block size
|
||||
stbuf.f_bfree.set(1024 * 1024 * 1024); // free blocks in fs
|
||||
stbuf.f_bavail.set(1024 * 1024 * 1024); // avail blocks in fs
|
||||
}
|
||||
stbuf.f_frsize.set(4096);
|
||||
stbuf.f_blocks.set(persistentStore.getTotalSpace() / 4096L); // total data blocks in file system
|
||||
stbuf.f_bfree.set(persistentStore.getFreeSpace() / 4096L); // free blocks in fs
|
||||
stbuf.f_bavail.set(persistentStore.getUsableSpace() / 4096L); // avail blocks in fs
|
||||
stbuf.f_namemax.set(2048);
|
||||
return super.statfs(path, stbuf);
|
||||
} catch (Exception e) {
|
||||
Log.error("When statfs " + path, e);
|
||||
|
||||
@@ -20,6 +20,8 @@ public class JObjectLRU {
|
||||
JObjectSizeEstimator jObjectSizeEstimator;
|
||||
@ConfigProperty(name = "dhfs.objects.lru.limit")
|
||||
long sizeLimit;
|
||||
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
||||
boolean printStats;
|
||||
|
||||
private long _curSize = 0;
|
||||
private long _evict = 0;
|
||||
@@ -28,29 +30,32 @@ public class JObjectLRU {
|
||||
private final AtomicLong _lastDrain = new AtomicLong(0);
|
||||
|
||||
private final LinkedHashMap<JObject<?>, Long> _cache = new LinkedHashMap<>();
|
||||
private ExecutorService _statusExecutor;
|
||||
private ExecutorService _statusExecutor = null;
|
||||
|
||||
@Startup
|
||||
void init() {
|
||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||
_statusExecutor.submit(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (_curSize > 0)
|
||||
Log.info("Cache status: size="
|
||||
+ _curSize / 1024 / 1024 + "MB"
|
||||
+ " evicted=" + _evict);
|
||||
_evict = 0;
|
||||
if (printStats) {
|
||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||
_statusExecutor.submit(() -> {
|
||||
try {
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (_curSize > 0)
|
||||
Log.info("Cache status: size="
|
||||
+ _curSize / 1024 / 1024 + "MB"
|
||||
+ " evicted=" + _evict);
|
||||
_evict = 0;
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Shutdown
|
||||
void shutdown() {
|
||||
_statusExecutor.shutdownNow();
|
||||
if (_statusExecutor != null)
|
||||
_statusExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
public void notifyAccess(JObject<?> obj) {
|
||||
|
||||
@@ -146,7 +146,9 @@ public class JObjectRefProcessor {
|
||||
String next = null;
|
||||
|
||||
while (next == null) {
|
||||
next = _canDeleteRetries.tryGet();
|
||||
next = _candidates.tryGet();
|
||||
if (next == null)
|
||||
next = _canDeleteRetries.tryGet();
|
||||
if (next == null)
|
||||
next = _candidates.get(canDeleteRetryDelay);
|
||||
}
|
||||
@@ -163,13 +165,14 @@ public class JObjectRefProcessor {
|
||||
return null;
|
||||
}
|
||||
|
||||
got.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
if (!got.getKnownClass().isAnnotationPresent(Leaf.class))
|
||||
got.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
|
||||
Log.debug("Deleting " + m.getName());
|
||||
m.markDeleted();
|
||||
|
||||
Collection<String> extracted = null;
|
||||
if (got.getData() != null)
|
||||
if (!got.getKnownClass().isAnnotationPresent(Leaf.class) && got.getData() != null)
|
||||
extracted = got.getData().extractRefs();
|
||||
Collection<String> saved = got.getMeta().getSavedRefs();
|
||||
|
||||
|
||||
@@ -142,7 +142,8 @@ public class JObjectResolver {
|
||||
|
||||
private void tryQuickDelete(JObject<?> self) {
|
||||
self.assertRWLock();
|
||||
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
if (!self.getKnownClass().isAnnotationPresent(Leaf.class))
|
||||
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("Quick delete of: " + self.getName());
|
||||
@@ -150,7 +151,7 @@ public class JObjectResolver {
|
||||
self.getMeta().markDeleted();
|
||||
|
||||
Collection<String> extracted = null;
|
||||
if (self.getData() != null)
|
||||
if (!self.getKnownClass().isAnnotationPresent(Leaf.class) && self.getData() != null)
|
||||
extracted = self.getData().extractRefs();
|
||||
Collection<String> saved = self.getMeta().getSavedRefs();
|
||||
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
// Indicates the object never has references
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target(ElementType.TYPE)
|
||||
public @interface Leaf {
|
||||
}
|
||||
@@ -66,9 +66,9 @@ public class RemoteHostManager {
|
||||
.<Callable<Void>>map(host -> () -> {
|
||||
try {
|
||||
if (isReachable(host))
|
||||
Log.debug("Heartbeat: " + host);
|
||||
Log.trace("Heartbeat: " + host);
|
||||
else
|
||||
Log.info("Trying to connect to " + host);
|
||||
Log.debug("Trying to connect to " + host);
|
||||
if (pingCheck(host))
|
||||
handleConnectionSuccess(host);
|
||||
else
|
||||
@@ -154,7 +154,7 @@ public class RemoteHostManager {
|
||||
return true;
|
||||
});
|
||||
} catch (Exception ignored) {
|
||||
Log.info("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
|
||||
Log.debug("Host " + host + " is unreachable: " + ignored.getMessage() + " " + ignored.getCause());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,9 @@ public class AutoSyncProcessor {
|
||||
String name = null;
|
||||
|
||||
while (name == null) {
|
||||
name = _retries.tryGet();
|
||||
name = _pending.tryGet();
|
||||
if (name == null)
|
||||
name = _retries.tryGet();
|
||||
if (name == null)
|
||||
name = _pending.get(1000L); //FIXME:
|
||||
}
|
||||
|
||||
@@ -174,4 +174,20 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
// FIXME: Race?
|
||||
deleteImpl(getMetaPath(name));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTotalSpace() {
|
||||
return dataPath.toFile().getTotalSpace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFreeSpace() {
|
||||
return dataPath.toFile().getFreeSpace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUsableSpace() {
|
||||
return dataPath.toFile().getUsableSpace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -30,4 +30,8 @@ public interface ObjectPersistentStore {
|
||||
|
||||
// Deletes object metadata and data
|
||||
void deleteObject(String name);
|
||||
|
||||
long getTotalSpace();
|
||||
long getFreeSpace();
|
||||
long getUsableSpace();
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public T tryGet() throws InterruptedException {
|
||||
public T tryGet() {
|
||||
synchronized (this) {
|
||||
if (_set.isEmpty()) return null;
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ dhfs.objects.write_log=false
|
||||
dhfs.fuse.root=${HOME}/dhfs_default/fuse
|
||||
dhfs.fuse.debug=false
|
||||
dhfs.fuse.enabled=true
|
||||
dhfs.files.allow_recursive_delete=false
|
||||
dhfs.files.target_chunk_size=524288
|
||||
# Writes strictly smaller than this will try to merge with blocks nearby
|
||||
dhfs.files.write_merge_threshold=0.8
|
||||
@@ -23,6 +24,7 @@ dhfs.files.write_last_chunk_limit=1.5
|
||||
dhfs.objects.writeback.delay=100
|
||||
dhfs.objects.writeback.limit=134217728
|
||||
dhfs.objects.lru.limit=134217728
|
||||
dhfs.objects.lru.print-stats=false
|
||||
dhfs.objects.writeback.watermark-high=0.6
|
||||
dhfs.objects.writeback.watermark-low=0.4
|
||||
dhfs.objects.writeback.threads=4
|
||||
|
||||
@@ -6,6 +6,7 @@ set -o pipefail || true
|
||||
set -x || true
|
||||
|
||||
exec java \
|
||||
-Xmx512M \
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
|
||||
-Ddhfs.objects.persistence.files.root=/dhfs_root/p \
|
||||
-Ddhfs.objects.root=/dhfs_root/d \
|
||||
|
||||
@@ -24,7 +24,7 @@ EXTRAOPTS_PARSED="$(tr '\n\r' ' ' <"$EXTRAOPTS")"
|
||||
echo "Extra options: $EXTRAOPTS_PARSED"
|
||||
|
||||
java \
|
||||
-Xmx384M \
|
||||
-Xmx512M \
|
||||
-Ddhfs.objects.writeback.limit=134217728 \
|
||||
-Ddhfs.objects.lru.limit=134217728 \
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
|
||||
|
||||
@@ -5,6 +5,15 @@ set -o pipefail
|
||||
|
||||
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
|
||||
|
||||
PIDFILE="$SCRIPT_DIR"/.pid
|
||||
|
||||
if [ -f "$PIDFILE" ]; then
|
||||
if kill -0 $(cat "$PIDFILE") >/dev/null; then
|
||||
echo "Already running: "$(cat "$PIDFILE")
|
||||
exit 2
|
||||
fi
|
||||
fi
|
||||
|
||||
# 💀
|
||||
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -n "s/.*\[{\"id\":\([0-9]*\).*/\1/p")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user