mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
something?
This commit is contained in:
@@ -67,36 +67,34 @@ public class DhfsFuse extends FuseStubFS {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getattr(String path, FileStat stat) {
|
public int getattr(String path, FileStat stat) {
|
||||||
Optional<FsNode> found;
|
|
||||||
try {
|
try {
|
||||||
found = fileService.getDirEntry(path).await().indefinitely();
|
Optional<FsNode> found = fileService.getDirEntry(path).await().indefinitely();
|
||||||
|
if (found.isEmpty()) {
|
||||||
|
return -ErrorCodes.ENOENT();
|
||||||
|
}
|
||||||
|
if (found.get() instanceof File f) {
|
||||||
|
stat.st_mode.set(S_IFREG | f.getMode());
|
||||||
|
stat.st_nlink.set(1);
|
||||||
|
stat.st_size.set(fileService.size(f).await().indefinitely());
|
||||||
|
} else if (found.get() instanceof Directory d) {
|
||||||
|
stat.st_mode.set(S_IFDIR | d.getMode());
|
||||||
|
stat.st_nlink.set(2);
|
||||||
|
}
|
||||||
|
var foundDent = (FsNode) found.get();
|
||||||
|
|
||||||
|
var ctime = System.currentTimeMillis();
|
||||||
|
stat.st_atim.tv_sec.set(ctime / 1000);
|
||||||
|
stat.st_atim.tv_nsec.set((ctime % 1000) * 1000);
|
||||||
|
|
||||||
|
// FIXME: Race?
|
||||||
|
stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000);
|
||||||
|
stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000);
|
||||||
|
stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000);
|
||||||
|
stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Log.error("When accessing " + path, e);
|
Log.error("When accessing " + path, e);
|
||||||
return -ErrorCodes.ENOENT();
|
return -ErrorCodes.ENOENT();
|
||||||
}
|
}
|
||||||
if (found.isEmpty()) {
|
|
||||||
return -ErrorCodes.ENOENT();
|
|
||||||
}
|
|
||||||
if (found.get() instanceof File f) {
|
|
||||||
stat.st_mode.set(S_IFREG | f.getMode());
|
|
||||||
stat.st_nlink.set(1);
|
|
||||||
stat.st_size.set(fileService.size(f).await().indefinitely());
|
|
||||||
} else if (found.get() instanceof Directory d) {
|
|
||||||
stat.st_mode.set(S_IFDIR | d.getMode());
|
|
||||||
stat.st_nlink.set(2);
|
|
||||||
}
|
|
||||||
var foundDent = (FsNode) found.get();
|
|
||||||
|
|
||||||
var ctime = System.currentTimeMillis();
|
|
||||||
stat.st_atim.tv_sec.set(ctime / 1000);
|
|
||||||
stat.st_atim.tv_nsec.set((ctime % 1000) * 1000);
|
|
||||||
|
|
||||||
// FIXME: Race?
|
|
||||||
stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000);
|
|
||||||
stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000);
|
|
||||||
stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000);
|
|
||||||
stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ public class DhfsObjectGrpcService implements DhfsObjectGrpc {
|
|||||||
@RunOnVirtualThread
|
@RunOnVirtualThread
|
||||||
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
|
public Uni<WriteObjectReply> writeObject(WriteObjectRequest request) {
|
||||||
objectRepository.writeObject(request.getNamespace(),
|
objectRepository.writeObject(request.getNamespace(),
|
||||||
new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()));
|
new Object(new Namespace(request.getNamespace()), request.getName(), request.getData().toByteArray()), false);
|
||||||
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
|
return Uni.createFrom().item(WriteObjectReply.newBuilder().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,4 +7,5 @@ import java.util.Optional;
|
|||||||
public interface JObjectManager {
|
public interface JObjectManager {
|
||||||
<T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz);
|
<T extends JObject> Uni<Optional<T>> get(String namespace, String key, Class<T> clazz);
|
||||||
<T extends JObject> Uni<Void> put(String namespace, T object);
|
<T extends JObject> Uni<Void> put(String namespace, T object);
|
||||||
|
void invalidateJObject(String namespace, String name);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,4 +93,10 @@ public class JObjectManagerImpl implements JObjectManager {
|
|||||||
return Uni.createFrom().voidItem();
|
return Uni.createFrom().voidItem();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void invalidateJObject(String namespace, String name) {
|
||||||
|
synchronized (_map) {
|
||||||
|
_map.remove(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import jakarta.inject.Inject;
|
|||||||
import org.apache.commons.lang3.SerializationUtils;
|
import org.apache.commons.lang3.SerializationUtils;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
@@ -50,6 +49,6 @@ public class JObjectRepositoryImpl implements JObjectRepository {
|
|||||||
new Namespace(namespace),
|
new Namespace(namespace),
|
||||||
object.getName(),
|
object.getName(),
|
||||||
SerializationUtils.serialize(object));
|
SerializationUtils.serialize(object));
|
||||||
objectRepository.writeObject(namespace, obj);
|
objectRepository.writeObject(namespace, obj, object.assumeUnique());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import io.smallrye.mutiny.Multi;
|
|||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
|
|
||||||
public interface ObjectRepository {
|
public interface ObjectRepository {
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@@ -16,7 +15,7 @@ public interface ObjectRepository {
|
|||||||
@Nonnull
|
@Nonnull
|
||||||
Object readObject(String namespace, String name);
|
Object readObject(String namespace, String name);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
void writeObject(String namespace, Object object);
|
void writeObject(String namespace, Object object, Boolean canIgnoreConflict);
|
||||||
@Nonnull
|
@Nonnull
|
||||||
void deleteObject(String namespace, String name);
|
void deleteObject(String namespace, String name);
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.storage.objects.repository;
|
|||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
import com.usatiuk.dhfs.storage.objects.data.Object;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
import io.smallrye.common.annotation.RunOnVirtualThread;
|
|
||||||
import io.smallrye.mutiny.Multi;
|
import io.smallrye.mutiny.Multi;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
@@ -35,7 +34,7 @@ public class SimplePersistentObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void writeObject(String namespace, Object object) {
|
public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) {
|
||||||
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
import com.usatiuk.dhfs.storage.objects.data.Object;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
import com.usatiuk.dhfs.storage.objects.repository.ObjectRepository;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.runtime.ShutdownEvent;
|
import io.quarkus.runtime.ShutdownEvent;
|
||||||
import io.quarkus.runtime.StartupEvent;
|
import io.quarkus.runtime.StartupEvent;
|
||||||
import io.smallrye.mutiny.Multi;
|
import io.smallrye.mutiny.Multi;
|
||||||
@@ -13,12 +15,16 @@ import jakarta.enterprise.context.ApplicationScoped;
|
|||||||
import jakarta.enterprise.event.Observes;
|
import jakarta.enterprise.event.Observes;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class DistributedObjectRepository implements ObjectRepository {
|
public class DistributedObjectRepository implements ObjectRepository {
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||||
|
String selfname;
|
||||||
@Inject
|
@Inject
|
||||||
Vertx vertx;
|
Vertx vertx;
|
||||||
|
|
||||||
@@ -31,7 +37,27 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
@Inject
|
@Inject
|
||||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
SyncHandler syncHandler;
|
||||||
|
|
||||||
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
|
void init(@Observes @Priority(400) StartupEvent event) throws IOException {
|
||||||
|
try {
|
||||||
|
Log.info("Starting sync");
|
||||||
|
var got = remoteObjectServiceClient.getIndex();
|
||||||
|
for (var h : got) {
|
||||||
|
var prevMtime = objectIndexService.exists(h.getNamespace(), h.getName())
|
||||||
|
? objectIndexService.getMeta(h.getNamespace(), h.getName()).get().getMtime()
|
||||||
|
: 0;
|
||||||
|
syncHandler.handleRemoteUpdate(
|
||||||
|
IndexUpdatePush.newBuilder().setSelfname(selfname
|
||||||
|
).setNamespace(h.getNamespace()).setName(h.getName()).setAssumeUnique(h.getAssumeUnique())
|
||||||
|
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build()).await().indefinitely();
|
||||||
|
}
|
||||||
|
Log.info("Sync complete");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.error("Error when fetching remote index:");
|
||||||
|
Log.error(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException {
|
void shutdown(@Observes @Priority(200) ShutdownEvent event) throws IOException {
|
||||||
@@ -60,8 +86,13 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
var info = infoOpt.get();
|
var info = infoOpt.get();
|
||||||
|
|
||||||
if (objectPersistentStore.existsObject(namespace, name).await().indefinitely())
|
Optional<Object> read = info.runReadLocked(() -> {
|
||||||
return objectPersistentStore.readObject(namespace, name).await().indefinitely();
|
if (objectPersistentStore.existsObject(namespace, name).await().indefinitely())
|
||||||
|
return Optional.of(objectPersistentStore.readObject(namespace, name).await().indefinitely());
|
||||||
|
return Optional.empty();
|
||||||
|
});
|
||||||
|
if (read.isPresent()) return read.get();
|
||||||
|
// Race?
|
||||||
|
|
||||||
return info.runWriteLocked(() -> {
|
return info.runWriteLocked(() -> {
|
||||||
return remoteObjectServiceClient.getObject(namespace, name).map(got -> {
|
return remoteObjectServiceClient.getObject(namespace, name).map(got -> {
|
||||||
@@ -73,13 +104,22 @@ public class DistributedObjectRepository implements ObjectRepository {
|
|||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public void writeObject(String namespace, Object object) {
|
public void writeObject(String namespace, Object object, Boolean canIgnoreConflict) {
|
||||||
var info = objectIndexService.getOrCreateMeta(namespace, object.getName());
|
var info = objectIndexService.getOrCreateMeta(namespace, object.getName(), canIgnoreConflict);
|
||||||
|
|
||||||
info.runWriteLocked(() -> {
|
info.runWriteLocked(() -> {
|
||||||
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
objectPersistentStore.writeObject(namespace, object).await().indefinitely();
|
||||||
|
var prevMtime = info.getMtime();
|
||||||
info.setMtime(System.currentTimeMillis());
|
info.setMtime(System.currentTimeMillis());
|
||||||
remoteObjectServiceClient.notifyUpdate(namespace, object.getName()).await().indefinitely();
|
try {
|
||||||
|
Log.warn("Updating object " + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime);
|
||||||
|
remoteObjectServiceClient.notifyUpdate(namespace, object.getName(), prevMtime);
|
||||||
|
Log.warn("Updating object complete" + object.getNamespace() + "/" + object.getName() + " from: " + info.getMtime() + " to: " + prevMtime);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.error("Error when notifying remote update:");
|
||||||
|
Log.error(e);
|
||||||
|
Log.error(e.getCause());
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +0,0 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
public class HostInfo {
|
|
||||||
String name;
|
|
||||||
String ip;
|
|
||||||
}
|
|
||||||
@@ -55,12 +55,15 @@ public class ObjectIndex implements Serializable {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObjectMeta getOrCreate(String namespace, String name) {
|
public ObjectMeta getOrCreate(String namespace, String name, boolean assumeUnique) {
|
||||||
return runWriteLocked(() -> {
|
return runWriteLocked(() -> {
|
||||||
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
|
if (_objectMetaMap.containsKey(new ImmutablePair<>(namespace, name))) {
|
||||||
return _objectMetaMap.get(new ImmutablePair<>(namespace, name));
|
var got = _objectMetaMap.get(new ImmutablePair<>(namespace, name));
|
||||||
|
if (got.getAssumeUnique() != assumeUnique)
|
||||||
|
throw new IllegalArgumentException("assumeUnique mismatch for " + namespace + "/" + name);
|
||||||
|
return got;
|
||||||
} else {
|
} else {
|
||||||
var newObjectMeta = new ObjectMeta(namespace, name);
|
var newObjectMeta = new ObjectMeta(namespace, name, assumeUnique);
|
||||||
_objectMetaMap.put(new ImmutablePair<>(namespace, name), newObjectMeta);
|
_objectMetaMap.put(new ImmutablePair<>(namespace, name), newObjectMeta);
|
||||||
return newObjectMeta;
|
return newObjectMeta;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,13 +1,18 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
|
||||||
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
import com.usatiuk.dhfs.storage.DeserializationHelper;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.runtime.ShutdownEvent;
|
import io.quarkus.runtime.ShutdownEvent;
|
||||||
import io.quarkus.runtime.StartupEvent;
|
import io.quarkus.runtime.StartupEvent;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.annotation.Priority;
|
import jakarta.annotation.Priority;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.enterprise.event.Observes;
|
import jakarta.enterprise.event.Observes;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
import org.apache.commons.lang3.SerializationUtils;
|
import org.apache.commons.lang3.SerializationUtils;
|
||||||
|
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -46,7 +51,22 @@ public class ObjectIndexService {
|
|||||||
return _index.get(namespace, name);
|
return _index.get(namespace, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObjectMeta getOrCreateMeta(String namespace, String name) {
|
public ObjectMeta getOrCreateMeta(String namespace, String name, boolean assumeUnique) {
|
||||||
return _index.getOrCreate(namespace, name);
|
return _index.getOrCreate(namespace, name, assumeUnique);
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ForAllFn {
|
||||||
|
void apply(ImmutablePair<String, String> name, ObjectMeta meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void forAllRead(ForAllFn fn) {
|
||||||
|
_index.runReadLocked(() -> {
|
||||||
|
// FIXME:
|
||||||
|
for (var entry : _index._objectMetaMap.entrySet()) {
|
||||||
|
fn.apply(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.Setter;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@@ -11,9 +10,10 @@ import java.util.concurrent.locks.ReadWriteLock;
|
|||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
public class ObjectMeta implements Serializable {
|
public class ObjectMeta implements Serializable {
|
||||||
public ObjectMeta(String namespace, String name) {
|
public ObjectMeta(String namespace, String name, Boolean assumeUnique) {
|
||||||
this._namespace = namespace;
|
this._namespace = namespace;
|
||||||
this._name = name;
|
this._name = name;
|
||||||
|
this._assumeUnique = assumeUnique;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||||
@@ -23,12 +23,24 @@ public class ObjectMeta implements Serializable {
|
|||||||
@Getter
|
@Getter
|
||||||
final String _name;
|
final String _name;
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
long _mtime;
|
long _mtime;
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
final List<HostInfo> _remoteCopies = new ArrayList<>();
|
final Boolean _assumeUnique;
|
||||||
|
|
||||||
|
//FIXME:
|
||||||
|
final List<String> _remoteCopies = new ArrayList<>();
|
||||||
|
|
||||||
|
public void setMtime(long mtime) {
|
||||||
|
runWriteLocked(() -> {
|
||||||
|
_mtime = mtime;
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMtime() {
|
||||||
|
return runReadLocked(() -> _mtime);
|
||||||
|
}
|
||||||
|
|
||||||
public <R> R runReadLocked(Callable<R> fn) {
|
public <R> R runReadLocked(Callable<R> fn) {
|
||||||
_lock.readLock().lock();
|
_lock.readLock().lock();
|
||||||
|
|||||||
@@ -1,7 +1,62 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
|
||||||
|
import io.grpc.netty.NettyChannelBuilder;
|
||||||
|
import io.quarkus.runtime.ShutdownEvent;
|
||||||
|
import io.quarkus.runtime.StartupEvent;
|
||||||
|
import jakarta.annotation.Priority;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.enterprise.event.Observes;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class RemoteHostManager {
|
public class RemoteHostManager {
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||||
|
String selfname;
|
||||||
|
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.friend")
|
||||||
|
String remoteHostName;
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.friendAddr")
|
||||||
|
String remoteHostAddr;
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.friendPort")
|
||||||
|
String remoteHostPort;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
private static class HostInfo {
|
||||||
|
String _addr;
|
||||||
|
Integer _port;
|
||||||
|
}
|
||||||
|
|
||||||
|
final HashMap<String, HostInfo> _remoteHosts = new HashMap<>();
|
||||||
|
|
||||||
|
void init(@Observes @Priority(350) StartupEvent event) throws IOException {
|
||||||
|
_remoteHosts.put(remoteHostName, new HostInfo(remoteHostAddr, Integer.valueOf(remoteHostPort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ClientFunction<R> {
|
||||||
|
R apply(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> R withClient(ClientFunction<R> fn) {
|
||||||
|
var hostInfo = _remoteHosts.get(remoteHostName);
|
||||||
|
var channel = NettyChannelBuilder.forAddress(hostInfo.getAddr(), hostInfo.getPort())
|
||||||
|
.usePlaintext().build();
|
||||||
|
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||||
|
.withMaxInboundMessageSize(Integer.MAX_VALUE);
|
||||||
|
try {
|
||||||
|
return fn.apply(client);
|
||||||
|
} finally {
|
||||||
|
channel.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +1,71 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.GetIndexRequest;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.data.Namespace;
|
||||||
import com.usatiuk.dhfs.storage.objects.data.Object;
|
import com.usatiuk.dhfs.storage.objects.data.Object;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
import io.smallrye.mutiny.Uni;
|
import io.smallrye.mutiny.Uni;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class RemoteObjectServiceClient {
|
public class RemoteObjectServiceClient {
|
||||||
|
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||||
|
String selfname;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
ObjectIndexService objectIndexService;
|
ObjectIndexService objectIndexService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
RemoteHostManager remoteHostManager;
|
||||||
|
|
||||||
public Uni<Object> getObject(String namespace, String name) {
|
public Uni<Object> getObject(String namespace, String name) {
|
||||||
return Uni.createFrom().item(() -> null);
|
return remoteHostManager.withClient(client -> {
|
||||||
|
var req = GetObjectRequest.newBuilder().setNamespace(namespace).setName(name).build();
|
||||||
|
var reply = client.getObject(req);
|
||||||
|
var metaOpt = objectIndexService.getMeta(namespace, name);
|
||||||
|
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||||
|
var meta = metaOpt.get();
|
||||||
|
if (meta.getMtime() != reply.getObject().getHeader().getMtime()) {
|
||||||
|
if (!meta.getAssumeUnique() && (meta.getAssumeUnique() != reply.getObject().getHeader().getAssumeUnique())) {
|
||||||
|
Log.error("Conflict!");
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Uni.createFrom().item(new Object(
|
||||||
|
new Namespace(reply.getObject().getHeader().getNamespace()),
|
||||||
|
reply.getObject().getHeader().getName(),
|
||||||
|
reply.getObject().getContent().toByteArray()
|
||||||
|
));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Uni<Boolean> notifyUpdate(String namespace, String name) {
|
public List<ObjectHeader> getIndex() {
|
||||||
return Uni.createFrom().item(true);
|
return remoteHostManager.withClient(client -> {
|
||||||
|
var req = GetIndexRequest.newBuilder().build();
|
||||||
|
var reply = client.getIndex(req);
|
||||||
|
return reply.getObjectsList();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean notifyUpdate(String namespace, String name, long prevMtime) {
|
||||||
|
return remoteHostManager.withClient(client -> {
|
||||||
|
var metaOpt = objectIndexService.getMeta(namespace, name);
|
||||||
|
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||||
|
var meta = metaOpt.get();
|
||||||
|
|
||||||
|
var req = IndexUpdatePush.newBuilder().setSelfname(selfname
|
||||||
|
).setNamespace(namespace).setName(name).setAssumeUnique(meta.getAssumeUnique())
|
||||||
|
.setMtime(meta.getMtime()).setPrevMtime(prevMtime).build();
|
||||||
|
client.indexUpdate(req);
|
||||||
|
return true;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,67 @@
|
|||||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.data.Object;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.StatusRuntimeException;
|
||||||
import io.quarkus.grpc.GrpcService;
|
import io.quarkus.grpc.GrpcService;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import io.smallrye.common.annotation.Blocking;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
// Note: RunOnVirtualThread hangs somehow
|
||||||
@GrpcService
|
@GrpcService
|
||||||
public class RemoteObjectServiceServer {
|
public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||||
|
@Inject
|
||||||
|
ObjectPersistentStore objectPersistentStore;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ObjectIndexService objectIndexService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
SyncHandler syncHandler;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Blocking
|
||||||
|
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||||
|
Log.info("<-- getObject: " + request.getName());
|
||||||
|
var metaOpt = objectIndexService.getMeta(request.getNamespace(), request.getName());
|
||||||
|
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
var meta = metaOpt.get();
|
||||||
|
Optional<Pair<Long, Object>> read = meta.runReadLocked(() -> {
|
||||||
|
if (objectPersistentStore.existsObject(request.getNamespace(), request.getName()).await().indefinitely())
|
||||||
|
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getNamespace(), request.getName()).await().indefinitely()));
|
||||||
|
return Optional.empty();
|
||||||
|
});
|
||||||
|
if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||||
|
var obj = read.get().getRight();
|
||||||
|
var header = ObjectHeader.newBuilder().setName(obj.getName()).setNamespace(obj.getNamespace().getName()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||||
|
var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj.getData())).build();
|
||||||
|
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Blocking
|
||||||
|
public Uni<GetIndexReply> getIndex(GetIndexRequest request) {
|
||||||
|
Log.info("<-- getIndex: ");
|
||||||
|
var builder = GetIndexReply.newBuilder();
|
||||||
|
objectIndexService.forAllRead((name, meta) -> {
|
||||||
|
var entry = ObjectHeader.newBuilder().setNamespace(name.getLeft()).setName(name.getRight()).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||||
|
builder.addObjects(entry);
|
||||||
|
});
|
||||||
|
return Uni.createFrom().item(builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Blocking
|
||||||
|
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
||||||
|
Log.info("<-- indexUpdate: " + request.getName() + " from: " + String.valueOf(request.getPrevMtime()) + " to: " + String.valueOf(request.getMtime()));
|
||||||
|
return syncHandler.handleRemoteUpdate(request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||||
|
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import io.smallrye.mutiny.Uni;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
|
||||||
|
@ApplicationScoped
|
||||||
|
public class SyncHandler {
|
||||||
|
@Inject
|
||||||
|
ObjectPersistentStore objectPersistentStore;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ObjectIndexService objectIndexService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
JObjectManager jObjectManager;
|
||||||
|
|
||||||
|
public Uni<IndexUpdateReply> handleRemoteUpdate(IndexUpdatePush request) {
|
||||||
|
var metaOpt = objectIndexService.getOrCreateMeta(request.getNamespace(), request.getName(), request.getAssumeUnique());
|
||||||
|
metaOpt.runWriteLocked(() -> {
|
||||||
|
if (metaOpt.getMtime() == request.getMtime()) {
|
||||||
|
metaOpt._remoteCopies.add(request.getSelfname());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (metaOpt.getMtime() != request.getPrevMtime()) {
|
||||||
|
if (!metaOpt.getAssumeUnique()
|
||||||
|
|| (metaOpt.getAssumeUnique() != request.getAssumeUnique())) {
|
||||||
|
Log.error("Conflict!");
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metaOpt.setMtime(request.getMtime());
|
||||||
|
|
||||||
|
metaOpt._remoteCopies.clear();
|
||||||
|
metaOpt._remoteCopies.add(request.getSelfname());
|
||||||
|
|
||||||
|
try {
|
||||||
|
objectPersistentStore.deleteObject(request.getNamespace(), request.getName()).await().indefinitely();
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
|
|
||||||
|
jObjectManager.invalidateJObject(metaOpt.getNamespace(), metaOpt.getName());
|
||||||
|
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
return Uni.createFrom().item(IndexUpdateReply.newBuilder().build());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -4,7 +4,7 @@ option java_multiple_files = true;
|
|||||||
option java_package = "com.usatiuk.dhfs.storage.files.api";
|
option java_package = "com.usatiuk.dhfs.storage.files.api";
|
||||||
option java_outer_classname = "DhfsFilesApi";
|
option java_outer_classname = "DhfsFilesApi";
|
||||||
|
|
||||||
package hello;
|
package dhfs.files;
|
||||||
|
|
||||||
service DhfsFilesGrpc {
|
service DhfsFilesGrpc {
|
||||||
rpc FindFiles (FindFilesRequest) returns (FindFilesReply) {}
|
rpc FindFiles (FindFilesRequest) returns (FindFilesReply) {}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ option java_multiple_files = true;
|
|||||||
option java_package = "com.usatiuk.dhfs.storage.objects.api";
|
option java_package = "com.usatiuk.dhfs.storage.objects.api";
|
||||||
option java_outer_classname = "DhfsObjectsApi";
|
option java_outer_classname = "DhfsObjectsApi";
|
||||||
|
|
||||||
package hello;
|
package dhfs.objects;
|
||||||
|
|
||||||
service DhfsObjectGrpc {
|
service DhfsObjectGrpc {
|
||||||
rpc FindObjects (FindObjectsRequest) returns (FindObjectsReply) {}
|
rpc FindObjects (FindObjectsRequest) returns (FindObjectsReply) {}
|
||||||
|
|||||||
56
server/src/main/proto/dhfs_objects_sync.proto
Normal file
56
server/src/main/proto/dhfs_objects_sync.proto
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option java_multiple_files = true;
|
||||||
|
option java_package = "com.usatiuk.dhfs.objects.repository.distributed";
|
||||||
|
option java_outer_classname = "DhfsObjectSyncApi";
|
||||||
|
|
||||||
|
package dhfs.objects.sync;
|
||||||
|
|
||||||
|
service DhfsObjectSyncGrpc {
|
||||||
|
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
|
||||||
|
rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {}
|
||||||
|
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
message ObjectHeader {
|
||||||
|
string namespace = 1;
|
||||||
|
string name = 2;
|
||||||
|
bool assumeUnique = 3;
|
||||||
|
uint64 mtime = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ApiObject {
|
||||||
|
ObjectHeader header = 1;
|
||||||
|
bytes content = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetObjectRequest {
|
||||||
|
string namespace = 1;
|
||||||
|
string name = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetObjectReply {
|
||||||
|
ApiObject object = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetIndexRequest {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetIndexReply {
|
||||||
|
repeated ObjectHeader objects = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message IndexUpdatePush {
|
||||||
|
string selfname = 10;
|
||||||
|
|
||||||
|
string namespace = 1;
|
||||||
|
string name = 2;
|
||||||
|
bool assumeUnique = 3;
|
||||||
|
uint64 mtime = 4;
|
||||||
|
uint64 prevMtime = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message IndexUpdateReply {
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,4 +1,13 @@
|
|||||||
quarkus.grpc.server.use-separate-server=false
|
quarkus.grpc.server.use-separate-server=false
|
||||||
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
|
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
|
||||||
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
|
dhfs.objects.distributed.root=${HOME}/dhfs_data/dhfs_root_d
|
||||||
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
|
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
|
||||||
|
|
||||||
|
grpc.client.greeting-service.max-inbound-message-size=9155241000
|
||||||
|
grpc.client.greeting-service.package-max-inbound-message-size=9155241000
|
||||||
|
grpc.client.greeting-service.server.max-inbound-message-size=9155241000
|
||||||
|
|
||||||
|
dhfs.objects.distributed.selfname=thing1
|
||||||
|
dhfs.objects.distributed.friend=localhost
|
||||||
|
dhfs.objects.distributed.friendAddr=asdf
|
||||||
|
dhfs.objects.distributed.friendPort=1234
|
||||||
Reference in New Issue
Block a user