rewrite all objects to be in single store part 1

This commit is contained in:
2024-06-21 12:49:40 +02:00
parent 7477023aab
commit b5e7e142e8
37 changed files with 623 additions and 1144 deletions

View File

@@ -1,32 +0,0 @@
package com.usatiuk.dhfs.storage.files.api;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
@GrpcService
public class DhfsFileGrpcService implements DhfsFilesGrpc {
@Override
@Blocking
public Uni<FindFilesReply> findFiles(FindFilesRequest request) {
return null;
}
@Override
@Blocking
public Uni<ReadFileReply> readFile(ReadFileRequest request) {
return null;
}
@Override
@Blocking
public Uni<WriteFileReply> writeFile(WriteFileRequest request) {
return null;
}
@Override
@Blocking
public Uni<DeleteFileReply> deleteFile(DeleteFileRequest request) {
return null;
}
}

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import org.apache.commons.codec.digest.DigestUtils;
@@ -9,15 +9,24 @@ import java.util.Arrays;
import java.util.Objects;
@Getter
public class ChunkData extends JObject {
public class ChunkData extends JObjectData {
final String _hash;
final byte[] _bytes;
public ChunkData(byte[] bytes) {
super();
this._bytes = Arrays.copyOf(bytes, bytes.length);
this._hash = DigestUtils.sha512Hex(bytes);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkData chunkData = (ChunkData) o;
return Objects.equals(_hash, chunkData._hash);
}
@Override
public String getName() {
return getNameFromHash(_hash);
@@ -29,19 +38,6 @@ public class ChunkData extends JObject {
}
public static String getNameFromHash(String hash) {
return hash + "_data";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkData chunkData = (ChunkData) o;
return Objects.equals(_hash, chunkData._hash);
}
@Override
public int hashCode() {
return Objects.hashCode(_hash);
return "data_" + hash;
}
}

View File

@@ -1,35 +1,22 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import java.util.Objects;
@Getter
public class ChunkInfo extends JObject {
public class ChunkInfo extends JObjectData {
final String _hash;
final Integer _size;
public ChunkInfo(String hash, Integer size) {
super();
this._hash = hash;
this._size = size;
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NoOpConflictResolver.class;
}
@Override
public String getName() {
return getNameFromHash(_hash);
}
public static String getNameFromHash(String hash) {
return hash + "_info";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -42,4 +29,18 @@ public class ChunkInfo extends JObject {
public int hashCode() {
return Objects.hash(_hash, _size);
}
@Override
public String getName() {
return getNameFromHash(_hash);
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NoOpConflictResolver.class;
}
public static String getNameFromHash(String hash) {
return "info_" + hash;
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import java.util.*;
@@ -13,62 +14,36 @@ public class Directory extends FsNode {
super(uuid, mode);
}
final DirectoryData _directoryData = new DirectoryData();
@Getter
private final Map<String, UUID> _children = new TreeMap<>();
@FunctionalInterface
public interface DirectoryFunction<R> {
R apply(FsNodeData fsNodeData, DirectoryData DirectoryData);
}
public <R> R runReadLocked(DirectoryFunction<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_fsNodeData, _directoryData);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(DirectoryFunction<R> fn) {
_lock.writeLock().lock();
try {
return fn.apply(_fsNodeData, _directoryData);
} finally {
_lock.writeLock().unlock();
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NotImplementedConflictResolver.class;
}
public Map<String, UUID> getChildrenMap() {
return runReadLocked(((fsNodeData, directoryData) -> new TreeMap<>(directoryData.getChildren())));
return new TreeMap<>(_children);
}
public Optional<UUID> getKid(String name) {
return runReadLocked(((fsNodeData, directoryData) -> {
if (directoryData.getChildren().containsKey(name))
return Optional.of(directoryData.getChildren().get(name));
else return Optional.empty();
}));
if (_children.containsKey(name))
return Optional.of(_children.get(name));
else return Optional.empty();
}
public boolean removeKid(String name) {
return runWriteLocked((fsNodeData, directoryData) -> directoryData.getChildren().remove(name) != null);
return _children.remove(name) != null;
}
public boolean putKid(String name, UUID uuid) {
return runWriteLocked((fsNodeData, directoryData) -> {
if (directoryData.getChildren().containsKey(name)) return false;
if (_children.containsKey(name)) return false;
directoryData.getChildren().put(name, uuid);
return true;
});
_children.put(name, uuid);
return true;
}
public List<String> getChildrenList() {
return runReadLocked((fsNodeData, directoryData) -> directoryData.getChildren().keySet().stream().toList());
}
@Override
public Class<DirectoryConflictResolver> getConflictResolver() {
return DirectoryConflictResolver.class;
return _children.keySet().stream().toList();
}
}

View File

@@ -16,110 +16,107 @@ import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.UUID;
@ApplicationScoped
public class DirectoryConflictResolver implements ConflictResolver {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
ObjectIndexService objectIndexService;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JObjectManager jObjectManager;
@Override
public ConflictResolutionResult resolve(String conflictHost,
ObjectHeader conflictSource,
String localName) {
var oursData = objectPersistentStore.readObject(localName);
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());
var localMeta = objectIndexService.getMeta(localName).orElseThrow(() ->
new NotImplementedException("Race when conflict resolving"));
var oursHeader = localMeta.runReadLocked(ObjectMetaData::toRpcHeader);
var theirsHeader = theirsData.getLeft();
var ours = (Directory) DeserializationHelper.deserialize(oursData);
var theirs = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
if (!ours.getClass().equals(Directory.class) || !theirs.getClass().equals(Directory.class)) {
Log.error("Object type mismatch!");
throw new NotImplementedException();
}
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(ours.getChildrenMap());
for (var entry : theirs.getChildrenMap().entrySet()) {
if (mergedChildren.containsKey(entry.getKey()) &&
!Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
int i = 0;
do {
String name = entry.getKey() + ".conflict." + i + "." + conflictHost;
if (mergedChildren.containsKey(name)) {
i++;
continue;
}
mergedChildren.put(name, entry.getValue());
break;
} while (true);
} else {
mergedChildren.put(entry.getKey(), entry.getValue());
}
}
var newMetaData = new ObjectMetaData(oursHeader.getName(), oursHeader.getConflictResolver());
for (var entry : oursHeader.getChangelog().getEntriesList()) {
newMetaData.getChangelog().put(entry.getHost(), entry.getVersion());
}
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
newMetaData.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max);
}
boolean wasChanged = mergedChildren.size() != ours.getChildrenMap().size();
if (wasChanged) {
newMetaData.getChangelog().merge(selfname, 1L, Long::sum);
}
var newHdr = newMetaData.toRpcHeader();
var newDir = new Directory(ours.getUuid(), ours.getMode());
for (var entry : mergedChildren.entrySet()) newDir.putKid(entry.getKey(), entry.getValue());
// FIXME:
newDir.setMtime(System.currentTimeMillis());
newDir.setCtime(ours.getCtime());
var newBytes = SerializationUtils.serialize(newDir);
objectIndexService.getMeta(localName).orElseThrow(() ->
new NotImplementedException("Race when conflict resolving")).runWriteLocked(m -> {
if (wasChanged)
if (m.getBestVersion() >= newMetaData.getOurVersion())
throw new NotImplementedException("Race when conflict resolving");
if (m.getBestVersion() > newMetaData.getOurVersion())
throw new NotImplementedException("Race when conflict resolving");
m.getChangelog().clear();
m.getChangelog().putAll(newMetaData.getChangelog());
objectPersistentStore.writeObject(m.getName(), newBytes);
return null;
});
invalidationQueueService.pushInvalidationToAll(oursHeader.getName());
jObjectManager.invalidateJObject(oursHeader.getName());
return ConflictResolutionResult.RESOLVED;
}
}
//@ApplicationScoped
//public class DirectoryConflictResolver implements ConflictResolver {
// @ConfigProperty(name = "dhfs.objects.distributed.selfname")
// String selfname;
//
// @Inject
// ObjectPersistentStore objectPersistentStore;
//
// @Inject
// RemoteObjectServiceClient remoteObjectServiceClient;
//
// @Inject
// InvalidationQueueService invalidationQueueService;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Override
// public ConflictResolutionResult resolve(String conflictHost,
// ObjectHeader conflictSource,
// String localName) {
//
// var oursData = objectPersistentStore.readObject(localName);
// var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());
//
// var localMeta = objectIndexService.getMeta(localName).orElseThrow(() ->
// new NotImplementedException("Race when conflict resolving"));
//
// var oursHeader = localMeta.runReadLocked(ObjectMetadata::toRpcHeader);
// var theirsHeader = theirsData.getLeft();
//
// var ours = (Directory) DeserializationHelper.deserialize(oursData);
// var theirs = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
// if (!ours.getClass().equals(Directory.class) || !theirs.getClass().equals(Directory.class)) {
// Log.error("Object type mismatch!");
// throw new NotImplementedException();
// }
//
// LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(ours.getChildrenMap());
// for (var entry : theirs.getChildrenMap().entrySet()) {
// if (mergedChildren.containsKey(entry.getKey()) &&
// !Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
// int i = 0;
// do {
// String name = entry.getKey() + ".conflict." + i + "." + conflictHost;
// if (mergedChildren.containsKey(name)) {
// i++;
// continue;
// }
// mergedChildren.put(name, entry.getValue());
// break;
// } while (true);
// } else {
// mergedChildren.put(entry.getKey(), entry.getValue());
// }
// }
//
// var newMetaData = new ObjectMetadata(oursHeader.getName(), oursHeader.getConflictResolver());
//
// for (var entry : oursHeader.getChangelog().getEntriesList()) {
// newMetaData.getChangelog().put(entry.getHost(), entry.getVersion());
// }
// for (var entry : theirsHeader.getChangelog().getEntriesList()) {
// newMetaData.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max);
// }
//
// boolean wasChanged = mergedChildren.size() != ours.getChildrenMap().size();
// if (wasChanged) {
// newMetaData.getChangelog().merge(selfname, 1L, Long::sum);
// }
//
// var newHdr = newMetaData.toRpcHeader();
//
// var newDir = new Directory(ours.getUuid(), ours.getMode());
// for (var entry : mergedChildren.entrySet()) newDir.putKid(entry.getKey(), entry.getValue());
//
// // FIXME:
// newDir.setMtime(System.currentTimeMillis());
// newDir.setCtime(ours.getCtime());
//
// var newBytes = SerializationUtils.serialize(newDir);
//
// objectIndexService.getMeta(localName).orElseThrow(() ->
// new NotImplementedException("Race when conflict resolving")).runWriteLocked(m -> {
//
// if (wasChanged)
// if (m.getBestVersion() >= newMetaData.getOurVersion())
// throw new NotImplementedException("Race when conflict resolving");
//
// if (m.getBestVersion() > newMetaData.getOurVersion())
// throw new NotImplementedException("Race when conflict resolving");
//
// m.getChangelog().clear();
// m.getChangelog().putAll(newMetaData.getChangelog());
//
// objectPersistentStore.writeObject(m.getName(), newBytes);
// return null;
// });
// invalidationQueueService.pushInvalidationToAll(oursHeader.getName());
// jObjectManager.invalidateJObject(oursHeader.getName());
//
// return ConflictResolutionResult.RESOLVED;
// }
//}

View File

@@ -1,13 +0,0 @@
package com.usatiuk.dhfs.storage.files.objects;
import lombok.Getter;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@Getter
public class DirectoryData implements Serializable {
private final Map<String, UUID> _children = new TreeMap<>();
}

View File

@@ -1,5 +1,9 @@
package com.usatiuk.dhfs.storage.files.objects;
import lombok.Getter;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
public class File extends FsNode {
@@ -11,28 +15,6 @@ public class File extends FsNode {
super(uuid, mode);
}
final FileData _fileData = new FileData();
@FunctionalInterface
public interface FileFunction<R> {
R apply(FsNodeData fsNodeData, FileData fileData);
}
public <R> R runReadLocked(FileFunction<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_fsNodeData, _fileData);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(FileFunction<R> fn) {
_lock.writeLock().lock();
try {
return fn.apply(_fsNodeData, _fileData);
} finally {
_lock.writeLock().unlock();
}
}
@Getter
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
}

View File

@@ -1,12 +0,0 @@
package com.usatiuk.dhfs.storage.files.objects;
import lombok.Getter;
import java.io.Serializable;
import java.util.NavigableMap;
import java.util.TreeMap;
@Getter
public class FileData implements Serializable {
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
}

View File

@@ -1,89 +1,49 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import lombok.Getter;
import lombok.Setter;
import java.util.UUID;
import java.util.function.Function;
public abstract class FsNode extends JObject {
public abstract class FsNode extends JObjectData {
@Getter
final UUID _uuid;
protected FsNode(UUID uuid) {
this._uuid = uuid;
this._fsNodeData.setCtime(System.currentTimeMillis());
this._fsNodeData.setMtime(this._fsNodeData.getCtime());
this._ctime = System.currentTimeMillis();
this._mtime = this._ctime;
}
protected FsNode(UUID uuid, long mode) {
this._uuid = uuid;
this._fsNodeData.setMode(mode);
this._fsNodeData.setCtime(System.currentTimeMillis());
this._fsNodeData.setMtime(this._fsNodeData.getCtime());
this._mode = mode;
this._ctime = System.currentTimeMillis();
this._mtime = this._ctime;
}
@Override
public String getName() {
return _uuid.toString();
}
final FsNodeData _fsNodeData = new FsNodeData();
@FunctionalInterface
public interface FsNodeFunction<R> {
R apply(FsNodeData fsNodeData);
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NotImplementedConflictResolver.class;
}
public <R> R runReadLocked(FsNodeFunction<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_fsNodeData);
} finally {
_lock.readLock().unlock();
}
}
@Getter
@Setter
private long _mode;
public <R> R runWriteLocked(Function<FsNodeData, R> fn) {
_lock.writeLock().lock();
try {
return fn.apply(_fsNodeData);
} finally {
_lock.writeLock().unlock();
}
}
public void setMode(long mode) {
runWriteLocked((fsNodeData) -> {
fsNodeData.setMode(mode);
return null;
});
}
public long getMode() {
return runReadLocked(FsNodeData::getMode);
}
public void setCtime(long ctime) {
runWriteLocked((fsNodeData) -> {
fsNodeData.setCtime(ctime);
return null;
});
}
public long getCtime() {
return runReadLocked(FsNodeData::getCtime);
}
public void setMtime(long mtime) {
runWriteLocked((fsNodeData) -> {
fsNodeData.setMtime(mtime);
return null;
});
}
public long getMtime() {
return runReadLocked(FsNodeData::getMtime);
}
@Getter
@Setter
private long _ctime;
@Getter
@Setter
private long _mtime;
}

View File

@@ -1,14 +0,0 @@
package com.usatiuk.dhfs.storage.files.objects;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
@Getter
@Setter
public class FsNodeData implements Serializable {
private long _mode;
private long _ctime;
private long _mtime;
}

View File

@@ -0,0 +1,12 @@
package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import org.apache.commons.lang3.NotImplementedException;
public class NotImplementedConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(String conflictHost, ObjectHeader conflictSource, String localName) {
throw new NotImplementedException();
}
}

View File

@@ -1,16 +1,14 @@
package com.usatiuk.dhfs.storage.files.service;
import com.usatiuk.dhfs.storage.files.objects.Directory;
import com.usatiuk.dhfs.storage.files.objects.File;
import com.usatiuk.dhfs.storage.files.objects.FsNode;
import java.util.Optional;
public interface DhfsFileService {
Optional<FsNode> getDirEntry(String name);
Optional<File> open(String name);
Optional<File> create(String name, long mode);
Optional<Directory> mkdir(String name, long mode);
Optional<String> open(String name);
Optional<String> create(String name, long mode);
Optional<String> mkdir(String name, long mode);
Optional<FsNode> getattr(String name);
Boolean chmod(String name, long mode);
Boolean rmdir(String name);
Boolean unlink(String name);
@@ -18,11 +16,9 @@ public interface DhfsFileService {
Boolean setTimes(String fileUuid, long atimeMs, long mtimeMs);
Iterable<String> readDir(String name);
Long size(File f);
Long size(String f);
Optional<byte[]> read(String fileUuid, long offset, int length);
Long write(String fileUuid, long offset, byte[] data);
Boolean truncate(String fileUuid, long length);
Directory getRoot();
}

View File

@@ -1,8 +1,10 @@
package com.usatiuk.dhfs.storage.files.service;
import com.usatiuk.dhfs.storage.files.objects.*;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -24,16 +26,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
Vertx vertx;
@Inject
JObjectManager jObjectManager;
@Inject
ObjectRepository objectRepository;
final static String namespace = "dhfs_files";
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
if (!objectRepository.existsObject(new UUID(0, 0).toString())) {
jObjectManager.put(new Directory(new UUID(0, 0), 0755));
}
jObjectManager.getOrPut(new UUID(0, 0).toString(), new Directory(new UUID(0, 0), 0755));
getRoot();
}
@@ -41,39 +39,49 @@ public class DhfsFileServiceImpl implements DhfsFileService {
Log.info("Shutdown");
}
private Optional<FsNode> traverse(FsNode from, Path path) {
private Optional<JObject<? extends FsNode>> traverse(JObject<? extends FsNode> from, Path path) {
if (path.getNameCount() == 0) return Optional.of(from);
if (!(from instanceof Directory dir))
if (!(from.isOf(Directory.class)))
return Optional.empty();
var pathFirstPart = path.getName(0).toString();
var found = dir.getKid(pathFirstPart);
if (found.isEmpty())
return Optional.empty();
return ((JObject<Directory>) from).runReadLocked((m, d) -> {
var found = d.getKid(pathFirstPart);
if (found.isEmpty())
return Optional.empty();
Optional<JObject<? extends FsNode>> ref = jObjectManager.get(found.get().toString(), FsNode.class);
var ref = jObjectManager.get(found.get().toString(), FsNode.class);
if (ref.isEmpty()) {
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
return Optional.empty();
}
if (ref.isEmpty()) {
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
return Optional.empty();
}
if (path.getNameCount() == 1) return ref;
if (path.getNameCount() == 1) return ref;
return traverse(ref.get(), path.subpath(1, path.getNameCount()));
return traverse(ref.get(), path.subpath(1, path.getNameCount()));
});
}
@Override
public Optional<FsNode> getDirEntry(String name) {
private Optional<JObject<? extends FsNode>> getDirEntry(String name) {
var root = getRoot();
var found = traverse(root, Path.of(name));
return found;
}
@Override
public Optional<File> open(String name) {
public Optional<FsNode> getattr(String uuid) {
Optional<JObject<? extends FsNode>> ref = jObjectManager.get(uuid, FsNode.class);
if (ref.isEmpty()) return Optional.empty();
return ref.get().runReadLocked((m, d) -> {
//FIXME:
return Optional.of(d);
});
}
@Override
public Optional<String> open(String name) {
// FIXME:
var root = getRoot();
var found = traverse(root, Path.of(name));
@@ -81,54 +89,54 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (found.isEmpty())
return Optional.empty();
if (!(found.get() instanceof File))
return Optional.empty();
return Optional.of((File) found.get());
return Optional.of(found.get().getName());
}
@Override
public Optional<File> create(String name, long mode) {
public Optional<String> create(String name, long mode) {
// FIXME:
var root = getRoot();
var found = traverse(root, Path.of(name).getParent());
if (found.isEmpty()) return Optional.empty();
if (!(found.get() instanceof Directory dir)) return Optional.empty();
if (!(found.get().isOf(Directory.class))) return Optional.empty();
var dir = (JObject<Directory>) found.get();
var fuuid = UUID.randomUUID();
File f = new File(fuuid);
f.setMode(mode);
jObjectManager.put(f);
if (!dir.putKid(Path.of(name).getFileName().toString(), fuuid))
if (!dir.runWriteLocked((m, d) -> {
return d.putKid(Path.of(name).getFileName().toString(), fuuid);
}))
return Optional.empty();
jObjectManager.put(dir);
return Optional.of(f);
return Optional.of(f.getName());
}
@Override
public Optional<Directory> mkdir(String name, long mode) {
public Optional<String> mkdir(String name, long mode) {
// FIXME:
var root = getRoot();
var found = traverse(root, Path.of(name).getParent());
if (found.isEmpty()) return Optional.empty();
if (!(found.get() instanceof Directory dir)) return Optional.empty();
if (!(found.get().isOf(Directory.class))) return Optional.empty();
var duuid = UUID.randomUUID();
Directory d = new Directory(duuid);
d.setMode(mode);
var dir = (JObject<Directory>) found.get();
jObjectManager.put(d);
if (!dir.putKid(Path.of(name).getFileName().toString(), duuid))
if (!dir.runWriteLocked((m, dd) -> {
return dd.putKid(Path.of(name).getFileName().toString(), duuid);
}))
return Optional.empty();
jObjectManager.put(dir);
return Optional.of(d);
return Optional.of(d.getName());
}
private Boolean rmdent(String name) {
@@ -137,12 +145,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var found = traverse(root, Path.of(name).getParent());
if (found.isEmpty()) return false;
if (!(found.get() instanceof Directory dir)) return false;
if (!(found.get().isOf(Directory.class))) return false;
var removed = dir.removeKid(Path.of(name).getFileName().toString());
if (removed) jObjectManager.put(dir);
return removed;
var dir = (JObject<Directory>) found.get();
return dir.runWriteLocked((m, d) -> {
return d.removeKid(Path.of(name).getFileName().toString());
});
}
@Override
@@ -166,13 +174,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var found = traverse(root, Path.of(to).getParent());
if (found.isEmpty()) return false;
if (!(found.get() instanceof Directory dir)) return false;
if (!(found.get().isOf(Directory.class))) return false;
dir.runWriteLocked((n, d) -> {
d.getChildren().put(Path.of(to).getFileName().toString(), dent.get().getUuid());
var dir = (JObject<Directory>) found.get();
dir.runWriteLocked((m, d) -> {
d.getChildren().put(Path.of(to).getFileName().toString(), UUID.fromString(dent.get().getName()));
return null;
});
jObjectManager.put(dir);
return true;
}
@@ -182,9 +191,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dent = getDirEntry(name);
if (dent.isEmpty()) return false;
dent.get().setMode(mode);
jObjectManager.put(dent.get());
dent.get().runWriteLocked((m, d) -> {
d.setMode(mode);
return null;
});
return true;
}
@@ -194,9 +204,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var root = getRoot();
var found = traverse(root, Path.of(name));
if (found.isEmpty()) throw new IllegalArgumentException();
if (!(found.get() instanceof Directory foundDir)) throw new IllegalArgumentException();
if (!(found.get().isOf(Directory.class))) throw new IllegalArgumentException();
var dir = (JObject<Directory>) found.get();
return foundDir.getChildrenList();
return dir.runReadLocked((m, d) -> d.getChildrenList());
}
@Override
@@ -211,7 +222,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
AtomicReference<List<Map.Entry<Long, String>>> chunksList = new AtomicReference<>();
try {
file.runReadLocked((fsNodeData, fileData) -> {
file.runReadLocked((md, fileData) -> {
var chunksAll = fileData.getChunks();
if (chunksAll.isEmpty()) {
chunksList.set(new ArrayList<>());
@@ -250,7 +261,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return Optional.empty();
}
var chunkBytes = chunkRead.get().getBytes();
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
long readableLen = chunkBytes.length - offInChunk;
@@ -313,7 +324,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
var chunkBytes = chunkRead.get().getBytes();
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length);
jObjectManager.put(newChunkData);
@@ -339,7 +350,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
var lchunkBytes = lchunkRead.get().getBytes();
var lchunkBytes = lchunkRead.get().runWriteLocked((m, d) -> d.getBytes());
if (last.getKey() + lchunkBytes.length > offset + data.length) {
var startInFile = offset + data.length;
@@ -354,10 +365,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
try {
file.runWriteLocked((fsNodeData, fileData) -> {
file.runWriteLocked((m, fileData) -> {
fileData.getChunks().clear();
fileData.getChunks().putAll(newChunks);
fsNodeData.setMtime(System.currentTimeMillis());
fileData.setMtime(System.currentTimeMillis());
return null;
});
} catch (Exception e) {
@@ -365,8 +376,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
jObjectManager.put(file);
return (long) data.length;
}
@@ -381,16 +390,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (length == 0) {
try {
file.runWriteLocked((fsNodeData, fileData) -> {
file.runWriteLocked((m, fileData) -> {
fileData.getChunks().clear();
fsNodeData.setMtime(System.currentTimeMillis());
fileData.setMtime(System.currentTimeMillis());
return null;
});
} catch (Exception e) {
Log.error("Error writing file chunks: " + fileUuid, e);
return false;
}
jObjectManager.put(file);
return true;
}
@@ -421,7 +429,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return false;
}
var chunkBytes = chunkRead.get().getBytes();
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
if (lastChunk.getKey() + chunkBytes.length > 0) {
int start = (int) (length - lastChunk.getKey());
@@ -435,10 +443,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
try {
file.runWriteLocked((fsNodeData, fileData) -> {
file.runWriteLocked((m, fileData) -> {
fileData.getChunks().clear();
fileData.getChunks().putAll(newChunks);
fsNodeData.setMtime(System.currentTimeMillis());
fileData.setMtime(System.currentTimeMillis());
return null;
});
} catch (Exception e) {
@@ -446,8 +454,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return false;
}
jObjectManager.put(file);
return true;
}
@@ -461,8 +467,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = fileOpt.get();
try {
file.runWriteLocked((fsNodeData, fileData) -> {
fsNodeData.setMtime(mtimeMs);
file.runWriteLocked((m, fileData) -> {
fileData.setMtime(mtimeMs);
return null;
});
} catch (Exception e) {
@@ -470,24 +476,24 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return false;
}
jObjectManager.put(file);
return true;
}
@Override
public Long size(File f) {
public Long size(String uuid) {
int size = 0;
//FIXME:
AtomicReference<TreeMap<Long, String>> chunksAllRef = new AtomicReference<>();
var read = jObjectManager.get(uuid, File.class)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
try {
f.runReadLocked((fsNodeData, fileData) -> {
read.runReadLocked((fsNodeData, fileData) -> {
chunksAllRef.set(new TreeMap<>(fileData.getChunks()));
return null;
});
} catch (Exception e) {
Log.error("Error reading file: " + f.getUuid(), e);
Log.error("Error reading file: " + uuid, e);
return -1L;
}
@@ -502,17 +508,16 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
size += chunkRead.get().getSize();
size += chunkRead.get().runReadLocked((m, d) -> d.getSize());
}
return (long) size;
}
@Override
public Directory getRoot() {
var read = jObjectManager.get(new UUID(0, 0).toString(), FsNode.class);
if (read.isEmpty() || !(read.get() instanceof Directory)) {
private JObject<Directory> getRoot() {
var read = jObjectManager.get(new UUID(0, 0).toString(), Directory.class);
if (read.isEmpty()) {
Log.error("Root directory not found");
}
return (Directory) read.get();
return (JObject<Directory>) read.get();
}
}

View File

@@ -68,14 +68,17 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int getattr(String path, FileStat stat) {
try {
Optional<FsNode> found = fileService.getDirEntry(path);
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var uuid = fileOpt.get();
Optional<FsNode> found = fileService.getattr(uuid);
if (found.isEmpty()) {
return -ErrorCodes.ENOENT();
}
if (found.get() instanceof File f) {
stat.st_mode.set(S_IFREG | f.getMode());
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(f));
stat.st_size.set(fileService.size(uuid));
} else if (found.get() instanceof Directory d) {
stat.st_mode.set(S_IFDIR | d.getMode());
stat.st_nlink.set(2);
@@ -104,7 +107,7 @@ public class DhfsFuse extends FuseStubFS {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var res = fileService.setTimes(file.getUuid().toString(),
var res = fileService.setTimes(file,
timespec[0].tv_sec.get() * 1000,
timespec[1].tv_sec.get() * 1000);
if (!res) return -ErrorCodes.EINVAL();
@@ -127,7 +130,7 @@ public class DhfsFuse extends FuseStubFS {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var read = fileService.read(file.getUuid().toString(), offset, (int) size);
var read = fileService.read(fileOpt.get(), offset, (int) size);
if (read.isEmpty()) return 0;
buf.put(0, read.get(), 0, read.get().length);
return read.get().length;
@@ -142,10 +145,9 @@ public class DhfsFuse extends FuseStubFS {
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
byte[] buffer = new byte[(int) size];
buf.get(0, buffer, 0, (int) size);
var written = fileService.write(file.getUuid().toString(), offset, buffer);
var written = fileService.write(fileOpt.get(), offset, buffer);
return written.intValue();
} catch (Exception e) {
Log.error("When writing " + path, e);
@@ -193,7 +195,7 @@ public class DhfsFuse extends FuseStubFS {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var ok = fileService.truncate(file.getUuid().toString(), size);
var ok = fileService.truncate(file, size);
if (ok)
return 0;
else

View File

@@ -1,46 +0,0 @@
package com.usatiuk.dhfs.storage.objects.api;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.quarkus.grpc.GrpcService;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
@GrpcService
public class DhfsObjectGrpcService implements DhfsObjectGrpc {
    @Inject
    ObjectRepository objectRepository;

    /** Lists the names of stored objects matching the requested prefix. */
    @Override
    @Blocking
    public Uni<FindObjectsReply> findObjects(FindObjectsRequest request) {
        var reply = FindObjectsReply.newBuilder();
        objectRepository.findObjects(request.getPrefix())
                .forEach(name -> reply.addFound(
                        FindObjectsReply.FindObjectsEntry.newBuilder().setName(name).build()));
        return Uni.createFrom().item(reply.build());
    }

    /** Reads the raw bytes of a single object and returns them as a reply. */
    @Override
    @Blocking
    public Uni<ReadObjectReply> readObject(ReadObjectRequest request) {
        var data = objectRepository.readObject(request.getName());
        var reply = ReadObjectReply.newBuilder().setData(ByteString.copyFrom(data)).build();
        return Uni.createFrom().item(reply);
    }

    /** Writes the request payload under the given name with the "none" conflict resolver. */
    @Override
    @Blocking
    public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
        objectRepository.writeObject(request.getName(), request.getData().toByteArray(), "none");
        return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
    }

    /** Deletes the named object. */
    @Override
    @Blocking
    public Uni<DeleteObjectReply> deleteObject(DeleteObjectRequest request) {
        objectRepository.deleteObject(request.getName());
        return Uni.createFrom().item(DeleteObjectReply.newBuilder().build());
    }
}

View File

@@ -1,18 +1,112 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import org.apache.commons.lang3.NotImplementedException;
import java.io.Serializable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class JObject implements Serializable {
public abstract String getName();
public class JObject<T extends JObjectData> implements Serializable {
protected JObject(JObjectResolver resolver, String name, String conflictResolver, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, conflictResolver, obj.getClass());
_dataPart = obj;
}
protected JObject(JObjectResolver resolver, ObjectMetadata objectMetadata) {
_resolver = resolver;
_metaPart = objectMetadata;
}
public String getName() {
return runReadLocked(ObjectMetadata::getName);
}
protected final ReadWriteLock _lock = new ReentrantReadWriteLock();
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private T _dataPart;
public Class<? extends ConflictResolver> getConflictResolver() {
throw new NotImplementedException();
return runReadLocked(m -> {
try {
return (Class<? extends ConflictResolver>) Class.forName(m.getConflictResolver());
} catch (ClassNotFoundException e) {
throw new NotImplementedException(e);
}
});
}
@FunctionalInterface
public interface ObjectMetaFn<R> {
R apply(ObjectMetadata indexData);
}
@FunctionalInterface
public interface ObjectDataFn<T, R> {
R apply(ObjectMetadata meta, T data);
}
public <X> boolean isOf(Class<X> klass) {
return runReadLocked((m) -> (klass.isAssignableFrom(m.getType())));
}
public <R> R runReadLocked(ObjectMetaFn<R> fn) {
_lock.readLock().lock();
try {
return fn.apply(_metaPart);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(ObjectMetaFn<R> fn) {
_lock.writeLock().lock();
try {
var ret = fn.apply(_metaPart);
_resolver.notifyWrite(this);
return ret;
} finally {
_lock.writeLock().unlock();
}
}
private void resolveDataPart() {
if (_dataPart == null) {
_lock.readLock().lock();
try {
if (_dataPart == null) {
_dataPart = _resolver.resolveData(this);
if (!_metaPart.getType().isAssignableFrom(_dataPart.getClass()))
throw new NotImplementedException("Type mismatch for " + getName());
}
} finally {
_lock.readLock().unlock();
}
}
}
public <R> R runReadLocked(ObjectDataFn<T, R> fn) {
resolveDataPart();
_lock.readLock().lock();
try {
return fn.apply(_metaPart, _dataPart);
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(ObjectDataFn<T, R> fn) {
resolveDataPart();
_lock.writeLock().lock();
try {
var ret = fn.apply(_metaPart, _dataPart);
_resolver.notifyWrite(this);
return ret;
} finally {
_lock.writeLock().unlock();
}
}
}

View File

@@ -1,28 +0,0 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
@ApplicationScoped
@Named("JObjectConflictResolution")
public class JObjectConflictResolution implements ConflictResolver {
    // All registered ConflictResolver beans; the concrete one is selected per-object below.
    @Inject
    Instance<ConflictResolver> conflictResolvers;
    @Inject
    ObjectPersistentStore objectPersistentStore;

    // Dispatching resolver: deserializes the locally stored copy of the object
    // and delegates to the resolver class that the object itself declares via
    // JObject.getConflictResolver().
    // NOTE(review): assumes the bytes under localName deserialize to a JObject —
    // the raw cast will throw ClassCastException otherwise; confirm callers
    // only register this resolver for JObject-typed entries.
    @Override
    public ConflictResolutionResult
    resolve(String conflictHost, ObjectHeader conflictSource, String localName) {
        var oursData = objectPersistentStore.readObject(localName);
        var ours = (JObject) DeserializationHelper.deserialize(oursData);
        return conflictResolvers.select(ours.getConflictResolver()).get().resolve(conflictHost, conflictSource, localName);
    }
}

View File

@@ -0,0 +1,10 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import java.io.Serializable;
// Base class for the serializable "data part" of a stored object; the
// metadata part is kept separately (see ObjectMetadata).
public abstract class JObjectData implements Serializable {
    // Unique storage key of this object.
    public abstract String getName();

    // Resolver bean class used to merge conflicting remote versions of this object.
    public abstract Class<? extends ConflictResolver> getConflictResolver();
}

View File

@@ -1,11 +1,18 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import io.smallrye.mutiny.Uni;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import java.util.Collection;
import java.util.Optional;
public interface JObjectManager {
<T extends JObject> Optional<T> get(String name, Class<T> clazz);
<T extends JObject> void put(T object);
void invalidateJObject(String name);
<D extends JObjectData> Optional<JObject<? extends D>> get(String name, Class<D> klass);
Optional<JObject<?>> get(String name);
Collection<JObject<?>> find(String prefix);
<T extends JObjectData> JObject<T> put(T object);
JObject<?> getOrPut(String name, ObjectMetadata md);
<T extends JObjectData> JObject<T> getOrPut(String name, T object);
}

View File

@@ -1,12 +1,18 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import io.quarkus.logging.Log;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.NotImplementedException;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
@@ -14,10 +20,13 @@ import java.util.Optional;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@Inject
JObjectRepository jObjectRepository;
ObjectPersistentStore objectPersistentStore;
private static class NamedSoftReference extends SoftReference<JObject> {
public NamedSoftReference(JObject target, ReferenceQueue<? super JObject> q) {
@Inject
JObjectResolver jObjectResolver;
private static class NamedSoftReference extends SoftReference<JObject<?>> {
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
super(target, q);
this._key = target.getName();
}
@@ -27,7 +36,7 @@ public class JObjectManagerImpl implements JObjectManager {
}
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
private final ReferenceQueue<JObject> _refQueue = new ReferenceQueue<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
private void cleanup() {
NamedSoftReference cur;
@@ -39,16 +48,12 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
private <T extends JObject> T getFromMap(String key, Class<T> clazz) {
private JObject<?> getFromMap(String key) {
synchronized (_map) {
if (_map.containsKey(key)) {
var ref = _map.get(key).get();
if (ref != null) {
if (!clazz.isAssignableFrom(ref.getClass())) {
Log.error("Cached object type mismatch: " + key);
_map.remove(key);
} else
return (T) ref;
return ref;
}
}
}
@@ -56,46 +61,122 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public <T extends JObject> Optional<T> get(String name, Class<T> clazz) {
public Optional<JObject<?>> get(String name) {
cleanup();
synchronized (_map) {
var inMap = getFromMap(name, clazz);
var inMap = getFromMap(name);
if (inMap != null) return Optional.of(inMap);
}
var read = jObjectRepository.readJObjectChecked(name, clazz);
if (read.isEmpty())
byte[] readMd;
try {
readMd = objectPersistentStore.readObject("meta_" + name);
} catch (StatusRuntimeException ex) {
if (!ex.getStatus().equals(Status.NOT_FOUND)) throw ex;
return Optional.empty();
}
var meta = DeserializationHelper.deserialize(readMd);
if (!(meta instanceof ObjectMetadata))
throw new NotImplementedException("Unexpected metadata type for " + name);
synchronized (_map) {
var inMap = getFromMap(name, clazz);
var inMap = getFromMap(name);
if (inMap != null) return Optional.of(inMap);
_map.put(name, new NamedSoftReference(read.get(), _refQueue));
JObject<?> newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
_map.put(name, new NamedSoftReference(newObj, _refQueue));
return Optional.of(newObj);
}
return Optional.of(read.get());
}
@Override
public <T extends JObject> void put(T object) {
public <D extends JObjectData> Optional<JObject<? extends D>> get(String name, Class<D> klass) {
var got = get(name);
if (got.isEmpty()) return Optional.of((JObject<? extends D>) got.get());
if (!got.get().isOf(klass)) throw new NotImplementedException("Class mismatch for " + name);
return Optional.of((JObject<? extends D>) got.get());
}
@Override
public Collection<JObject<?>> find(String prefix) {
throw new NotImplementedException();
}
@Override
public <D extends JObjectData> JObject<D> put(D object) {
cleanup();
synchronized (_map) {
var inMap = getFromMap(object.getName(), object.getClass());
if (inMap != null && inMap != object && !Objects.equals(inMap, object)) {
throw new IllegalArgumentException("Trying to insert different object with same key");
} else if (inMap == null)
_map.put(object.getName(), new NamedSoftReference(object, _refQueue));
var inMap = getFromMap(object.getName());
if (inMap != null) {
inMap.runReadLocked((m, d) -> {
if (!Objects.equals(d, object))
throw new IllegalArgumentException("Trying to insert different object with same key");
return null;
});
return (JObject<D>) inMap;
} else {
var created = new JObject<D>(jObjectResolver, object.getName(), object.getConflictResolver().getName(), object);
_map.put(object.getName(), new NamedSoftReference(created, _refQueue));
jObjectResolver.notifyWrite(created);
return created;
}
}
jObjectRepository.writeJObject(object);
}
@Override
public void invalidateJObject(String name) {
public JObject<?> getOrPut(String name, ObjectMetadata md) {
cleanup();
var got = get(name);
if (got.isPresent()) {
if (!got.get().isOf(md.getType())) {
throw new NotImplementedException("Type mismatch for " + name);
}
return got.get();
}
synchronized (_map) {
_map.remove(name);
var inMap = getFromMap(md.getName());
if (inMap != null) {
return inMap;
} else {
var created = new JObject<>(jObjectResolver, md);
_map.put(md.getName(), new NamedSoftReference(created, _refQueue));
jObjectResolver.notifyWrite(created);
return created;
}
}
}
@Override
public <D extends JObjectData> JObject<D> getOrPut(String name, D object) {
cleanup();
var got = get(name);
if (got.isPresent()) {
if (!got.get().isOf(object.getClass())) {
throw new NotImplementedException("Type mismatch for " + name);
}
return (JObject<D>) got.get();
}
synchronized (_map) {
var inMap = getFromMap(object.getName());
if (inMap != null) {
var ok = inMap.runReadLocked((m) -> {
return object.getClass().isAssignableFrom(m.getType());
});
if (ok)
return (JObject<D>) inMap;
else
throw new NotImplementedException("Type mismatch for " + name);
} else {
var created = new JObject<D>(jObjectResolver, object.getName(), object.getConflictResolver().getName(), object);
_map.put(object.getName(), new NamedSoftReference(created, _refQueue));
jObjectResolver.notifyWrite(created);
return created;
}
}
}
}

View File

@@ -1,13 +0,0 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import javax.annotation.Nonnull;
import java.util.Optional;
// Typed read/write facade over the raw ObjectRepository for JObject instances.
public interface JObjectRepository {
    /**
     * Reads and deserializes the object stored under {@code name}.
     *
     * @param name storage key of the object
     * @return the object, or empty if it is missing or not a {@code JObject}
     */
    @Nonnull
    Optional<JObject> readJObject(String name);

    /**
     * Reads the object and verifies that it is assignable to {@code clazz}.
     *
     * @return the object, or empty if missing or of an unexpected type
     */
    @Nonnull
    <T extends JObject> Optional<T> readJObjectChecked(String name, Class<T> clazz);

    /**
     * Serializes and persists the given object.
     * (No {@code @Nonnull}: the annotation is meaningless on a {@code void} return.)
     */
    void writeJObject(JObject object);
}

View File

@@ -1,49 +0,0 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import javax.annotation.Nonnull;
import java.util.Optional;
@ApplicationScoped
public class JObjectRepositoryImpl implements JObjectRepository {
    @Inject
    ObjectRepository objectRepository;

    /**
     * Reads the raw bytes under {@code name} and deserializes them.
     *
     * @return the object, or empty (with an error log) if the stored value is not a JObject
     */
    @Nonnull
    @Override
    public Optional<JObject> readJObject(String name) {
        var read = objectRepository.readObject(name);
        Object obj = DeserializationHelper.deserialize(read);
        // Pattern instanceof binds the narrowed reference in one step.
        if (!(obj instanceof JObject jObject)) {
            Log.error("Read object is not a JObject: " + name);
            return Optional.empty();
        }
        return Optional.of(jObject);
    }

    /**
     * Reads the object and checks it is assignable to {@code clazz}.
     *
     * @return the object, or empty (with an error log) on a type mismatch
     */
    @Nonnull
    @Override
    public <T extends JObject> Optional<T> readJObjectChecked(String name, Class<T> clazz) {
        var read = readJObject(name);
        if (read.isEmpty()) return Optional.empty();
        if (!clazz.isAssignableFrom(read.get().getClass())) {
            Log.error("Read object type mismatch: " + name);
            return Optional.empty();
        }
        // Safe: assignability was verified just above.
        @SuppressWarnings("unchecked")
        T checked = (T) read.get();
        return Optional.of(checked);
    }

    /**
     * Serializes and persists {@code object} under its own name, tagging it
     * with the dispatching "JObjectConflictResolution" resolver.
     * (Dropped the original {@code @Nonnull}: it is meaningless on a void return.)
     */
    @Override
    public void writeJObject(JObject object) {
        objectRepository.writeObject(object.getName(), SerializationUtils.serialize(object),
                "JObjectConflictResolution");
    }
}

View File

@@ -0,0 +1,36 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.repository.distributed.InvalidationQueueService;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
@ApplicationScoped
public class JObjectResolver {
    @Inject
    ObjectPersistentStore objectPersistentStore;
    @Inject
    RemoteObjectServiceClient remoteObjectServiceClient;
    @Inject
    InvalidationQueueService invalidationQueueService;

    // Resolves the data part of a JObject: served from the local persistent
    // store when present, otherwise fetched from a remote peer, cached locally,
    // and then deserialized.
    // NOTE(review): exists/read is two store calls — presumably the caller
    // (JObject.resolveDataPart) holds the object's lock so the object cannot
    // be deleted in between; confirm.
    public <T extends JObjectData> T resolveData(JObject<T> jObject) {
        if (objectPersistentStore.existsObject(jObject.getName()))
            return DeserializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName()));
        var obj = remoteObjectServiceClient.getObject(jObject);
        objectPersistentStore.writeObject(jObject.getName(), obj);
        return DeserializationHelper.deserialize(obj);
    }

    // Persists both parts of the object — metadata under "meta_" + name and
    // data under the plain name — then queues an invalidation push to all peers.
    // NOTE(review): the two writes are separate, each under its own read lock;
    // assumes callers serialize concurrent writers externally — confirm.
    public void notifyWrite(JObject<?> self) {
        objectPersistentStore.writeObject("meta_" + self.getName(), self.runReadLocked((m) -> SerializationUtils.serialize(m)));
        objectPersistentStore.writeObject(self.getName(), self.runReadLocked((m, d) -> SerializationUtils.serialize(d)));
        invalidationQueueService.pushInvalidationToAll(self.getName());
    }
}

View File

@@ -1,21 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import javax.annotation.Nonnull;
import java.util.List;
// Raw byte-level object store API.
public interface ObjectRepository {
    /** Lists names of stored objects whose name starts with {@code prefix}. */
    @Nonnull
    List<String> findObjects(String prefix);

    /** Returns whether an object with the given name exists. */
    @Nonnull
    Boolean existsObject(String name);

    /** Reads the raw serialized bytes of the named object. */
    @Nonnull
    byte[] readObject(String name);

    /**
     * Writes raw bytes under {@code name}; {@code conflictResolver} names the
     * resolver bean to use for this object.
     * (No {@code @Nonnull}: the annotation is meaningless on a {@code void} return.)
     */
    void writeObject(String name, byte[] data, String conflictResolver);

    /** Deletes the named object. */
    void deleteObject(String name);
}

View File

@@ -1,45 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.List;
//@ApplicationScoped
// Trivial ObjectRepository that delegates straight to the local persistent
// store, with no replication or conflict handling.
public class SimplePersistentObjectRepository implements ObjectRepository {
    @Inject
    ObjectPersistentStore objectPersistentStore;

    @Nonnull
    @Override
    public List<String> findObjects(String prefix) {
        return objectPersistentStore.findObjects(prefix);
    }

    @Nonnull
    @Override
    public Boolean existsObject(String name) {
        return objectPersistentStore.existsObject(name);
    }

    @Nonnull
    @Override
    public byte[] readObject(String name) {
        return objectPersistentStore.readObject(name);
    }

    // Dropped the original @Nonnull: it is meaningless on a void return.
    @Override
    public void writeObject(String name, byte[] data, String conflictResolver) {
        // conflictResolver is intentionally ignored: this repository is local-only.
        objectPersistentStore.writeObject(name, data);
    }

    @Override
    public void deleteObject(String name) {
        objectPersistentStore.deleteObject(name);
    }
}

View File

@@ -1,114 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@ApplicationScoped
public class DistributedObjectRepository implements ObjectRepository {
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
Vertx vertx;
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectIndexService objectIndexService;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
SyncHandler syncHandler;
@Inject
InvalidationQueueService invalidationQueueService;
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
}
void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException {
}
@Nonnull
@Override
public List<String> findObjects(String prefix) {
throw new NotImplementedException();
}
@Nonnull
@Override
public Boolean existsObject(String name) {
return objectIndexService.exists(name);
}
@Nonnull
@Override
public byte[] readObject(String name) {
var infoOpt = objectIndexService.getMeta(name);
if (infoOpt.isEmpty())
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Object " + name + " doesn't exist"));
var info = infoOpt.get();
Optional<byte[]> read = info.runReadLocked((data) -> {
if (objectPersistentStore.existsObject(name))
return Optional.of(objectPersistentStore.readObject(name));
return Optional.empty();
});
if (read.isPresent()) return read.get();
// Possible race if it got deleted?
return info.runWriteLocked((data) -> {
var obj = remoteObjectServiceClient.getObject(name);
objectPersistentStore.writeObject(name, obj);
return obj;
});
}
@Nonnull
@Override
public void writeObject(String name, byte[] data, String conflictResolver) {
var info = objectIndexService.getOrCreateMeta(name, conflictResolver);
info.runWriteLocked((metaData) -> {
objectPersistentStore.writeObject(name, data);
metaData.getChangelog().merge(selfname, 1L, Long::sum);
return null;
});
// FIXME: Race?
try {
invalidationQueueService.pushInvalidationToAll(name);
} catch (Exception e) {
Log.error("Error when notifying remote update:");
Log.error(e);
Log.error(e.getCause());
}
}
@Nonnull
@Override
public void deleteObject(String name) {
throw new NotImplementedException();
}
}

View File

@@ -1,68 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// Serializable name -> metadata index guarded by a single read/write lock.
public class ObjectIndex implements Serializable {
    private final ObjectIndexData _data = new ObjectIndexData();
    private final ReadWriteLock _lock = new ReentrantReadWriteLock();

    @FunctionalInterface
    public interface ObjectIndexFn<R> {
        R apply(ObjectIndexData indexData);
    }

    /** Runs {@code fn} on the index data while holding the shared (read) lock. */
    public <R> R runReadLocked(ObjectIndexFn<R> fn) {
        var readLock = _lock.readLock();
        readLock.lock();
        try {
            return fn.apply(_data);
        } finally {
            readLock.unlock();
        }
    }

    /** Runs {@code fn} on the index data while holding the exclusive (write) lock. */
    public <R> R runWriteLocked(ObjectIndexFn<R> fn) {
        var writeLock = _lock.writeLock();
        writeLock.lock();
        try {
            return fn.apply(_data);
        } finally {
            writeLock.unlock();
        }
    }

    /** Returns whether an entry for {@code name} is present. */
    public boolean exists(String name) {
        return runReadLocked(data -> data.getObjectMetaMap().containsKey(name));
    }

    /** Looks up the metadata for {@code name}, if any. */
    public Optional<ObjectMeta> get(String name) {
        return runReadLocked(data -> Optional.ofNullable(data.getObjectMetaMap().get(name)));
    }

    /**
     * Returns the existing metadata for {@code name}, or atomically creates a
     * fresh entry under the write lock.
     *
     * @throws IllegalArgumentException if an entry already exists with a
     *                                  different conflict resolver
     */
    public ObjectMeta getOrCreate(String name, String conflictResolver) {
        return runWriteLocked(data -> {
            var existing = data.getObjectMetaMap().get(name);
            if (existing != null) {
                if (!Objects.equals(existing.getConflictResolver(), conflictResolver))
                    throw new IllegalArgumentException("conflictResolver mismatch for " + name);
                return existing;
            }
            var fresh = new ObjectMeta(name, conflictResolver);
            data.getObjectMetaMap().put(name, fresh);
            return fresh;
        });
    }
}

View File

@@ -1,12 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
// Serializable payload of ObjectIndex: maps object name -> its metadata.
// Not synchronized itself; intended to be accessed via ObjectIndex's
// runReadLocked/runWriteLocked wrappers.
public class ObjectIndexData implements Serializable {
    @Getter
    private final Map<String, ObjectMeta> _objectMetaMap = new HashMap<>();
}

View File

@@ -1,74 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
@ApplicationScoped
public class ObjectIndexService {
    @ConfigProperty(name = "dhfs.objects.distributed.selfname")
    String selfname;

    // In-memory index; replaced wholesale at startup if a saved copy exists.
    ObjectIndex _index = new ObjectIndex();

    @ConfigProperty(name = "dhfs.objects.distributed.root")
    String metaRoot;
    // File name (under metaRoot) where the serialized index is persisted.
    final String metaFileName = "meta";

    // Loads the previously persisted index from <metaRoot>/meta, if present.
    void init(@Observes @Priority(300) StartupEvent event) throws IOException {
        Paths.get(metaRoot).toFile().mkdirs();
        Log.info("Initializing with root " + metaRoot);
        if (Paths.get(metaRoot).resolve(metaFileName).toFile().exists()) {
            Log.info("Reading index");
            _index = DeserializationHelper.deserialize(Files.readAllBytes(Paths.get(metaRoot).resolve(metaFileName)));
        }
    }

    // Persists the current index on shutdown so it survives restarts.
    void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
        Log.info("Saving index");
        Files.write(Paths.get(metaRoot).resolve(metaFileName), SerializationUtils.serialize(_index));
        Log.info("Shutdown");
    }

    public boolean exists(String name) {
        return _index.exists(name);
    }

    public Optional<ObjectMeta> getMeta(String name) {
        return _index.get(name);
    }

    // Returns existing metadata or creates it, ensuring this host has a
    // changelog entry (initialized to version 0 if absent).
    public ObjectMeta getOrCreateMeta(String name, String conflictResolver) {
        var ret = _index.getOrCreate(name, conflictResolver);
        ret.runWriteLocked(md -> {
            md.getChangelog().putIfAbsent(selfname, 0L);
            return null;
        });
        return ret;
    }

    @FunctionalInterface
    public interface ForAllFn {
        void apply(String name, ObjectMeta meta);
    }

    // Invokes fn for every index entry while holding the index read lock.
    public void forAllRead(ForAllFn fn) {
        _index.runReadLocked((data) -> {
            for (var entry : data.getObjectMetaMap().entrySet()) {
                fn.apply(entry.getKey(), entry.getValue());
            }
            return null;
        });
    }
}

View File

@@ -1,41 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import java.io.Serializable;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// Lock-guarded wrapper around a single object's serializable metadata.
public class ObjectMeta implements Serializable {
    private final ObjectMetaData _data;
    private final ReadWriteLock _lock = new ReentrantReadWriteLock();

    public ObjectMeta(String name, String conflictResolver) {
        _data = new ObjectMetaData(name, conflictResolver);
    }

    @FunctionalInterface
    public interface ObjectMetaFn<R> {
        R apply(ObjectMetaData indexData);
    }

    /** Name of the conflict-resolver bean configured for this object. */
    public String getConflictResolver() {
        return runReadLocked(ObjectMetaData::getConflictResolver);
    }

    /** Runs {@code fn} on the metadata under the shared (read) lock. */
    public <R> R runReadLocked(ObjectMetaFn<R> fn) {
        var lock = _lock.readLock();
        lock.lock();
        try {
            return fn.apply(_data);
        } finally {
            lock.unlock();
        }
    }

    /** Runs {@code fn} on the metadata under the exclusive (write) lock. */
    public <R> R runWriteLocked(ObjectMetaFn<R> fn) {
        var lock = _lock.writeLock();
        lock.lock();
        try {
            return fn.apply(_data);
        } finally {
            lock.unlock();
        }
    }
}

View File

@@ -3,16 +3,18 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelog;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import lombok.Getter;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
public class ObjectMetaData implements Serializable {
public ObjectMetaData(String name, String conflictResolver) {
public class ObjectMetadata implements Serializable {
public ObjectMetadata(String name, String conflictResolver, Class<? extends JObjectData> type) {
_name = name;
_conflictResolver = conflictResolver;
_type = type;
}
@Getter
@@ -21,6 +23,9 @@ public class ObjectMetaData implements Serializable {
@Getter
private final String _conflictResolver;
@Getter
private final Class<? extends JObjectData> _type;
@Getter
private final Map<String, Long> _remoteCopies = new LinkedHashMap<>();

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -16,10 +18,10 @@ public class RemoteObjectServiceClient {
String selfname;
@Inject
ObjectIndexService objectIndexService;
RemoteHostManager remoteHostManager;
@Inject
RemoteHostManager remoteHostManager;
JObjectManager jObjectManager;
public Pair<ObjectHeader, byte[]> getSpecificObject(String host, String name) {
return remoteHostManager.withClient(host, client -> {
@@ -28,19 +30,15 @@ public class RemoteObjectServiceClient {
});
}
public byte[] getObject(String name) {
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to fetch");
return new NotImplementedException();
});
var targets = meta.runReadLocked(d -> {
public byte[] getObject(JObject<?> jObject) {
// Assert lock?
var targets = jObject.runReadLocked(d -> {
var bestVer = d.getBestVersion();
return d.getRemoteCopies().entrySet().stream().filter(entry -> entry.getValue().equals(bestVer)).map(Map.Entry::getKey).toList();
});
return remoteHostManager.withClientAny(targets, client -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(name).build());
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfname(selfname).setName(jObject.getName()).build());
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
@@ -49,7 +47,7 @@ public class RemoteObjectServiceClient {
var receivedTotalVer = reply.getObject().getHeader().getChangelog().getEntriesList()
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
return meta.runReadLocked(md -> {
return jObject.runReadLocked(md -> {
var outdated =
(md.getOurVersion() > receivedTotalVer)
|| (md.getChangelog().get(selfname) > receivedSelfVer);
@@ -72,16 +70,13 @@ public class RemoteObjectServiceClient {
}
public void notifyUpdate(String host, String name) {
remoteHostManager.withClient(host, client -> {
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
Log.error("Race when trying to notify update");
return new NotImplementedException();
});
var obj = jObjectManager.get(name).orElseThrow(() -> new NotImplementedException("Race when invalidating"));
remoteHostManager.withClient(host, client -> {
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
client.indexUpdate(builder.setHeader(
meta.runReadLocked(ObjectMetaData::toRpcHeader)
obj.runReadLocked(ObjectMetadata::toRpcHeader)
).build());
return null;
});

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -10,11 +11,10 @@ import io.quarkus.logging.Log;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Optional;
// Note: RunOnVirtualThread hangs somehow
@GrpcService
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@@ -25,10 +25,10 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectIndexService objectIndexService;
SyncHandler syncHandler;
@Inject
SyncHandler syncHandler;
JObjectManager jObjectManager;
@Inject
RemoteHostManager remoteHostManager;
@@ -37,22 +37,17 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname());
Log.info("<-- getObject: " + request.getName());
var meta = objectIndexService.getMeta(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
Optional<Pair<ObjectHeader, byte[]>> readOpt = meta.runReadLocked((data) -> {
if (objectPersistentStore.existsObject(request.getName())) {
ObjectHeader header = data.toRpcHeader();
byte[] bytes = objectPersistentStore.readObject(request.getName());
return Optional.of(Pair.of(header, bytes));
} else {
return Optional.empty();
}
Pair<ObjectHeader, byte[]> read = obj.runReadLocked((meta, data) -> {
byte[] bytes = objectPersistentStore.readObject(request.getName());
return Pair.of(meta.toRpcHeader(), SerializationUtils.serialize(data));
});
var read = readOpt.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build();
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
}
@@ -65,9 +60,15 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
Log.info("<-- getIndex: ");
var builder = GetIndexReply.newBuilder().setSelfname(selfname);
objectIndexService.forAllRead((name, meta) -> {
builder.addObjects(meta.runReadLocked(ObjectMetaData::toRpcHeader));
});
var objs = jObjectManager.find("");
for (var obj : objs) {
obj.runReadLocked((meta) -> {
builder.addObjects(meta.toRpcHeader());
return null;
});
}
return Uni.createFrom().item(builder.build());
}

View File

@@ -3,6 +3,8 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
@@ -14,14 +16,11 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.literal.NamedLiteral;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ApplicationScoped
@@ -32,9 +31,6 @@ public class SyncHandler {
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectIndexService objectIndexService;
@Inject
JObjectManager jObjectManager;
@@ -66,19 +62,26 @@ public class SyncHandler {
handleRemoteUpdate(IndexUpdatePush.newBuilder()
.setSelfname(got.getSelfname()).setHeader(h).build());
}
// Push our index to the other peer too, as they might not request it if
// they didn't think we were disconnected
List<String> toPush = new ArrayList<>();
objectIndexService.forAllRead((name, meta) -> {
toPush.add(name);
});
for (String name : toPush) {
invalidationQueueService.pushInvalidationToOne(host, name);
}
// // Push our index to the other peer too, as they might not request it if
// // they didn't think we were disconnected
// List<String> toPush = new ArrayList<>();
// objectIndexService.forAllRead((name, meta) -> {
// toPush.add(name);
// });
// for (String name : toPush) {
// invalidationQueueService.pushInvalidationToOne(host, name);
// }
}
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
var meta = objectIndexService.getOrCreateMeta(request.getHeader().getName(), request.getHeader().getConflictResolver());
JObject<?> found;
try {
found = jObjectManager.getOrPut(request.getHeader().getName(), new ObjectMetadata(
request.getHeader().getName(), request.getHeader().getConflictResolver(), (Class<? extends JObjectData>) Class.forName(request.getHeader().getType())
));
} catch (ClassNotFoundException ex) {
throw new NotImplementedException(ex);
}
var receivedSelfVer = request.getHeader().getChangelog()
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
@@ -87,33 +90,33 @@ public class SyncHandler {
var receivedTotalVer = request.getHeader().getChangelog().getEntriesList()
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
boolean conflict = meta.runWriteLocked((data) -> {
if (data.getRemoteCopies().getOrDefault(request.getSelfname(), 0L) > receivedTotalVer) {
boolean conflict = found.runWriteLocked((md) -> {
if (md.getRemoteCopies().getOrDefault(request.getSelfname(), 0L) > receivedTotalVer) {
Log.error("Received older index update than was known for host: "
+ request.getSelfname() + " " + request.getHeader().getName());
return false;
}
if (data.getChangelog().get(selfname) > receivedSelfVer) return true;
if (md.getChangelog().get(selfname) > receivedSelfVer) return true;
data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
md.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
if (Objects.equals(data.getOurVersion(), receivedTotalVer)) {
if (Objects.equals(md.getOurVersion(), receivedTotalVer)) {
for (var e : request.getHeader().getChangelog().getEntriesList()) {
if (!Objects.equals(data.getChangelog().getOrDefault(e.getHost(), 0L),
if (!Objects.equals(md.getChangelog().getOrDefault(e.getHost(), 0L),
e.getVersion())) return true;
}
}
// TODO: recheck this
if (data.getOurVersion() > receivedTotalVer) {
if (md.getOurVersion() > receivedTotalVer) {
Log.info("Received older index update than known: "
+ request.getSelfname() + " " + request.getHeader().getName());
return false;
}
// data.getBestVersion() > data.getTotalVersion() should also work
if (receivedTotalVer > data.getOurVersion()) {
// md.getBestVersion() > md.getTotalVersion() should also work
if (receivedTotalVer > md.getOurVersion()) {
try {
Log.info("Deleting " + request.getHeader().getName() + " as per invalidation from " + request.getSelfname());
objectPersistentStore.deleteObject(request.getHeader().getName());
@@ -123,21 +126,19 @@ public class SyncHandler {
} catch (Exception e) {
Log.info("Couldn't delete object from persistent store: ", e);
}
jObjectManager.invalidateJObject(data.getName());
}
data.getChangelog().clear();
md.getChangelog().clear();
for (var entry : request.getHeader().getChangelog().getEntriesList()) {
data.getChangelog().put(entry.getHost(), entry.getVersion());
md.getChangelog().put(entry.getHost(), entry.getVersion());
}
data.getChangelog().putIfAbsent(selfname, 0L);
md.getChangelog().putIfAbsent(selfname, 0L);
return false;
});
if (conflict) {
var resolver = conflictResolvers.select(NamedLiteral.of(meta.getConflictResolver()));
var resolver = conflictResolvers.select(found.getConflictResolver());
var result = resolver.get().resolve(request.getSelfname(), request.getHeader(), request.getHeader().getName());
if (result.equals(ConflictResolver.ConflictResolutionResult.RESOLVED)) {
Log.info("Resolved conflict for " + request.getSelfname() + " " + request.getHeader().getName());

View File

@@ -1,48 +0,0 @@
// File-level gRPC API for DHFS files (deleted in this commit).
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.storage.files.api";
option java_outer_classname = "DhfsFilesApi";
package dhfs.files;
// Simple CRUD-style file service: list by prefix, read, write, delete.
service DhfsFilesGrpc {
// Returns the names of files whose name starts with the request prefix.
rpc FindFiles (FindFilesRequest) returns (FindFilesReply) {}
// Reads the full contents of the named file.
rpc ReadFile (ReadFileRequest) returns (ReadFileReply) {}
// Writes (replaces) the full contents of the named file.
rpc WriteFile (WriteFileRequest) returns (WriteFileReply) {}
// Deletes the named file.
rpc DeleteFile (DeleteFileRequest) returns (DeleteFileReply) {}
}
message FindFilesRequest {
// NOTE(review): field numbers here skip values (prefix = 2, data = 10);
// presumably gaps were left for removed/future fields — confirm before reuse.
string prefix = 2;
}
message FindFilesReply {
// One entry per matched file; currently carries only the name.
message FindFilesEntry {
string name = 1;
}
repeated FindFilesEntry found = 1;
}
message ReadFileRequest {
string name = 2;
}
message ReadFileReply {
// Raw file bytes.
bytes data = 10;
}
message WriteFileRequest {
string name = 2;
// Raw bytes to store under `name`.
bytes data = 10;
}
// Empty reply: success is signaled by the RPC completing without error.
message WriteFileReply {
}
message DeleteFileRequest {
string name = 2;
}
// Empty reply: success is signaled by the RPC completing without error.
message DeleteFileReply {
}

View File

@@ -1,48 +0,0 @@
// Object-store gRPC API for DHFS (deleted in this commit).
// Mirrors the files API shape one level down: raw named byte objects.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.storage.objects.api";
option java_outer_classname = "DhfsObjectsApi";
package dhfs.objects;
// CRUD-style object service: list by prefix, read, write, delete.
service DhfsObjectGrpc {
// Returns the names of objects whose name starts with the request prefix.
rpc FindObjects (FindObjectsRequest) returns (FindObjectsReply) {}
// Reads the full contents of the named object.
rpc ReadObject (ReadObjectRequest) returns (ReadObjectReply) {}
// Writes (replaces) the full contents of the named object.
rpc WriteObject (WriteObjectRequest) returns (WriteObjectReply) {}
// Deletes the named object.
rpc DeleteObject (DeleteObjectRequest) returns (DeleteObjectReply) {}
}
message FindObjectsRequest {
// NOTE(review): field numbers skip values (prefix = 2, data = 10);
// presumably gaps were left for removed/future fields — confirm before reuse.
string prefix = 2;
}
message FindObjectsReply {
// One entry per matched object; currently carries only the name.
message FindObjectsEntry {
string name = 1;
}
repeated FindObjectsEntry found = 1;
}
message ReadObjectRequest {
string name = 2;
}
message ReadObjectReply {
// Raw object bytes.
bytes data = 10;
}
message WriteObjectRequest {
string name = 2;
// Raw bytes to store under `name`.
bytes data = 10;
}
// Empty reply: success is signaled by the RPC completing without error.
message WriteObjectReply {
}
message DeleteObjectRequest {
string name = 2;
}
// Empty reply: success is signaled by the RPC completing without error.
message DeleteObjectReply {
}

View File

@@ -34,7 +34,8 @@ message ObjectChangelog {
message ObjectHeader {
string name = 2;
string conflictResolver = 3;
ObjectChangelog changelog = 4;
string type = 4;
ObjectChangelog changelog = 5;
}
message ApiObject {