mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
simple read cache
This commit is contained in:
@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.storage.files.objects.Chunk;
|
||||
import com.usatiuk.dhfs.storage.files.objects.DirEntry;
|
||||
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectRepository;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Shutdown;
|
||||
@@ -29,7 +29,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
@Inject
|
||||
JObjectRepository jObjectRepository;
|
||||
JObjectManager jObjectManager;
|
||||
@Inject
|
||||
ObjectRepository objectRepository;
|
||||
|
||||
@@ -39,7 +39,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
Log.info("Initializing file service");
|
||||
if (!objectRepository.existsObject(namespace, new UUID(0, 0).toString()).await().indefinitely()) {
|
||||
objectRepository.createNamespace(namespace).await().indefinitely();
|
||||
jObjectRepository.writeJObject(namespace, new Directory(new UUID(0, 0)).setMode(0755)).await().indefinitely();
|
||||
jObjectManager.put(namespace, new Directory(new UUID(0, 0)).setMode(0755)).await().indefinitely();
|
||||
}
|
||||
getRoot().await().indefinitely();
|
||||
}
|
||||
@@ -61,7 +61,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (found == null)
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
|
||||
var ref = jObjectRepository.readJObjectChecked(namespace, found.toString(), DirEntry.class)
|
||||
var ref = jObjectManager.get(namespace, found.toString(), DirEntry.class)
|
||||
.await().indefinitely();
|
||||
|
||||
if (!ref.isPresent()) {
|
||||
@@ -109,9 +109,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
File f = new File(fuuid);
|
||||
f.setMode(mode);
|
||||
|
||||
jObjectRepository.writeJObject(namespace, f).await().indefinitely();
|
||||
jObjectManager.put(namespace, f).await().indefinitely();
|
||||
dir.getChildren().put(Path.of(name).getFileName().toString(), fuuid);
|
||||
jObjectRepository.writeJObject(namespace, dir).await().indefinitely();
|
||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(Optional.of(f));
|
||||
}
|
||||
@@ -129,9 +129,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
Directory d = new Directory(duuid);
|
||||
d.setMode(mode);
|
||||
|
||||
jObjectRepository.writeJObject(namespace, d).await().indefinitely();
|
||||
jObjectManager.put(namespace, d).await().indefinitely();
|
||||
dir.getChildren().put(Path.of(name).getFileName().toString(), duuid);
|
||||
jObjectRepository.writeJObject(namespace, dir).await().indefinitely();
|
||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(Optional.of(d));
|
||||
}
|
||||
@@ -145,7 +145,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
||||
|
||||
var removed = dir.getChildren().remove(Path.of(name).getFileName().toString());
|
||||
if (removed != null) jObjectRepository.writeJObject(namespace, dir).await().indefinitely();
|
||||
if (removed != null) jObjectManager.put(namespace, dir).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(removed != null);
|
||||
}
|
||||
@@ -173,9 +173,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
if (!(found.get() instanceof Directory dir)) return Uni.createFrom().item(false);
|
||||
|
||||
|
||||
dir.getChildren().put(Path.of(to).getFileName().toString(), dent.get().getUuid());
|
||||
jObjectRepository.writeJObject(namespace, dir).await().indefinitely();
|
||||
jObjectManager.put(namespace, dir).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
@@ -187,7 +186,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
dent.get().setMode(mode);
|
||||
|
||||
jObjectRepository.writeJObject(namespace, dent.get()).await().indefinitely();
|
||||
jObjectManager.put(namespace, dent.get()).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
@@ -204,7 +203,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
@Override
|
||||
public Uni<Optional<byte[]>> read(String fileUuid, long offset, int length) {
|
||||
var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
|
||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(Optional.empty());
|
||||
@@ -227,7 +226,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
long toReadInChunk = (offset + length) - curPos;
|
||||
|
||||
var chunkUuid = chunk.getValue();
|
||||
var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
@@ -242,20 +241,23 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
buf.put(chunkBytes, (int) offInChunk, (int) toReadReally);
|
||||
|
||||
if (readableLen >= toReadInChunk)
|
||||
if (readableLen > toReadInChunk)
|
||||
break;
|
||||
else
|
||||
curPos += readableLen;
|
||||
|
||||
if (!chunks.hasNext()) break;
|
||||
|
||||
chunk = chunks.next();
|
||||
}
|
||||
|
||||
return Uni.createFrom().item(Optional.of(buf.array()));
|
||||
// FIXME:
|
||||
return Uni.createFrom().item(Optional.of(Arrays.copyOf(buf.array(), (int) (curPos - offset))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Long> write(String fileUuid, long offset, byte[] data) {
|
||||
var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
|
||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(-1L);
|
||||
@@ -274,7 +276,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
if (first != null && first.getKey() < offset) {
|
||||
var chunkUuid = first.getValue();
|
||||
var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
@@ -283,20 +285,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
var chunkBytes = chunkRead.get().getBytes();
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
|
||||
jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
|
||||
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
|
||||
|
||||
newChunks.put(first.getKey(), newChunk.getHash());
|
||||
}
|
||||
|
||||
{
|
||||
Chunk newChunk = new Chunk(data);
|
||||
jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
|
||||
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
|
||||
|
||||
newChunks.put(offset, newChunk.getHash());
|
||||
}
|
||||
if (last != null) {
|
||||
var lchunkUuid = last.getValue();
|
||||
var lchunkRead = jObjectRepository.readJObjectChecked(namespace, lchunkUuid, Chunk.class).await().indefinitely();
|
||||
var lchunkRead = jObjectManager.get(namespace, lchunkUuid, Chunk.class).await().indefinitely();
|
||||
|
||||
if (lchunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + lchunkUuid);
|
||||
@@ -308,20 +310,22 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (last.getKey() + lchunkBytes.length > offset + data.length) {
|
||||
int start = (int) ((offset + data.length) - last.getKey());
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(lchunkBytes, start, lchunkBytes.length - start));
|
||||
jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
|
||||
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
|
||||
|
||||
newChunks.put(first.getKey(), newChunk.getHash());
|
||||
}
|
||||
}
|
||||
|
||||
jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(newChunks)).await().indefinitely();
|
||||
file.setChunks(newChunks);
|
||||
|
||||
jObjectManager.put(namespace, file).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item((long) data.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Uni<Boolean> truncate(String fileUuid, long length) {
|
||||
var fileOpt = jObjectRepository.readJObjectChecked(namespace, fileUuid, File.class).await().indefinitely();
|
||||
var fileOpt = jObjectManager.get(namespace, fileUuid, File.class).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) {
|
||||
Log.error("File not found when trying to read: " + fileUuid);
|
||||
return Uni.createFrom().item(false);
|
||||
@@ -329,8 +333,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
var file = fileOpt.get();
|
||||
|
||||
if (length == 0) {
|
||||
jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>()))
|
||||
.await().indefinitely();
|
||||
file.setChunks(new TreeMap<>());
|
||||
jObjectManager.put(namespace, file).await().indefinitely();
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
|
||||
@@ -342,7 +346,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
if (lastChunk != null) {
|
||||
var chunkUuid = lastChunk.getValue();
|
||||
var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
@@ -354,13 +358,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (lastChunk.getKey() + chunkBytes.length > 0) {
|
||||
int start = (int) (length - lastChunk.getKey());
|
||||
Chunk newChunk = new Chunk(Arrays.copyOfRange(chunkBytes, 0, (int) (length - start)));
|
||||
jObjectRepository.writeJObject(namespace, newChunk).await().indefinitely();
|
||||
jObjectManager.tryPut(namespace, newChunk).await().indefinitely();
|
||||
|
||||
newChunks.put(lastChunk.getKey(), newChunk.getHash());
|
||||
}
|
||||
}
|
||||
|
||||
jObjectRepository.writeJObject(namespace, new File(UUID.fromString(fileUuid)).setChunks(new TreeMap<>(newChunks))).await().indefinitely();
|
||||
file.setChunks(new TreeMap<>(newChunks));
|
||||
|
||||
jObjectManager.put(namespace, file).await().indefinitely();
|
||||
|
||||
return Uni.createFrom().item(true);
|
||||
}
|
||||
@@ -371,7 +377,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
//FIXME:
|
||||
for (var chunk : f.getChunks().entrySet()) {
|
||||
var chunkUuid = chunk.getValue();
|
||||
var chunkRead = jObjectRepository.readJObjectChecked(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
var chunkRead = jObjectManager.get(namespace, chunkUuid, Chunk.class).await().indefinitely();
|
||||
|
||||
if (chunkRead.isEmpty()) {
|
||||
Log.error("Chunk requested not found: " + chunkUuid);
|
||||
@@ -386,7 +392,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
@Override
|
||||
public Uni<Directory> getRoot() {
|
||||
var read = jObjectRepository.readJObjectChecked(namespace, new UUID(0, 0).toString(), DirEntry.class).await().indefinitely();
|
||||
var read = jObjectManager.get(namespace, new UUID(0, 0).toString(), DirEntry.class).await().indefinitely();
|
||||
if (read.isEmpty() || !(read.get() instanceof Directory)) {
|
||||
Log.error("Root directory not found");
|
||||
}
|
||||
|
||||
@@ -89,24 +89,34 @@ public class DhfsFuse extends FuseStubFS {
|
||||
|
||||
@Override
|
||||
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely();
|
||||
if (read.isEmpty()) return 0;
|
||||
buf.put(0, read.get(), 0, read.get().length);
|
||||
return read.get().length;
|
||||
try {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
var read = fileService.read(file.getUuid().toString(), offset, (int) size).await().indefinitely();
|
||||
if (read.isEmpty()) return 0;
|
||||
buf.put(0, read.get(), 0, read.get().length);
|
||||
return read.get().length;
|
||||
} catch (Exception e) {
|
||||
Log.error("When reading " + path, e);
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
byte[] buffer = new byte[(int) size];
|
||||
buf.get(0, buffer, 0, (int) size);
|
||||
var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely();
|
||||
return written.intValue();
|
||||
try {
|
||||
var fileOpt = fileService.open(path).await().indefinitely();
|
||||
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
|
||||
var file = fileOpt.get();
|
||||
byte[] buffer = new byte[(int) size];
|
||||
buf.get(0, buffer, 0, (int) size);
|
||||
var written = fileService.write(file.getUuid().toString(), offset, buffer).await().indefinitely();
|
||||
return written.intValue();
|
||||
} catch (Exception e) {
|
||||
Log.error("When writing " + path, e);
|
||||
return -ErrorCodes.ENOENT();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import io.smallrye.mutiny.Uni;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface JObjectManager {
|
||||
public <T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz);
|
||||
public <T extends JObject> Uni<Void> put(String namespace, T object);
|
||||
// Returns the object from store if it existed, nothing otherwise
|
||||
public <T extends JObject> Uni<Optional<T>> tryPut(String namespace, T object);
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package com.usatiuk.dhfs.storage.objects.jrepository;
|
||||
|
||||
import io.quarkus.logging.Log;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManagerImpl implements JObjectManager {
|
||||
@Inject
|
||||
JObjectRepository jObjectRepository;
|
||||
|
||||
private static class NamedSoftReference extends SoftReference<JObject> {
|
||||
public NamedSoftReference(JObject target, ReferenceQueue<? super JObject> q) {
|
||||
super(target, q);
|
||||
this.key = target.getName();
|
||||
}
|
||||
|
||||
@Getter
|
||||
final String key;
|
||||
}
|
||||
|
||||
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
|
||||
private final ReferenceQueue<JObject> _refQueue = new ReferenceQueue<>();
|
||||
|
||||
private void cleanup() {
|
||||
NamedSoftReference cur;
|
||||
while ((cur = (NamedSoftReference) _refQueue.poll()) != null) {
|
||||
synchronized (_map) {
|
||||
if (_map.containsKey(cur.key) && (_map.get(cur.key).get() == null))
|
||||
_map.remove(cur.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends JObject> T getFromMap(String namespace, String key, Class<T> clazz) {
|
||||
synchronized (_map) {
|
||||
if (_map.containsKey(key)) {
|
||||
var ref = _map.get(key).get();
|
||||
if (ref != null) {
|
||||
if (!clazz.isAssignableFrom(ref.getClass())) {
|
||||
Log.error("Cached object type mismatch: " + namespace + "/" + key);
|
||||
_map.remove(key);
|
||||
} else
|
||||
return (T) ref;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz) {
|
||||
cleanup();
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(namespace, key, clazz);
|
||||
if (inMap != null) return Uni.createFrom().item(Optional.of(inMap));
|
||||
}
|
||||
|
||||
return jObjectRepository.readJObjectChecked(namespace, key, clazz).map(read -> {
|
||||
if (read.isEmpty())
|
||||
return Optional.empty();
|
||||
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(namespace, key, clazz);
|
||||
if (inMap != null) return Optional.of(inMap);
|
||||
_map.put(key, new NamedSoftReference(read.get(), _refQueue));
|
||||
}
|
||||
|
||||
return Optional.of(read.get());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JObject> Uni<Void> put(String namespace, T object) {
|
||||
cleanup();
|
||||
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(namespace, object.getName(), object.getClass());
|
||||
if (inMap != null && inMap != object)
|
||||
throw new IllegalArgumentException("Trying to insert different object with same key");
|
||||
else if (inMap == null)
|
||||
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
|
||||
}
|
||||
|
||||
return jObjectRepository.writeJObject(namespace, object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JObject> Uni<Optional<T>> tryPut(String namespace, T object) {
|
||||
cleanup();
|
||||
|
||||
synchronized (_map) {
|
||||
var inMap = getFromMap(namespace, object.getName(), object.getClass());
|
||||
if (inMap != null) return Uni.createFrom().item(Optional.of((T) inMap));
|
||||
else
|
||||
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
|
||||
}
|
||||
|
||||
return jObjectRepository.writeJObject(namespace, object).map(t -> Optional.empty());
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user