rewrite all objects to be in single store part 2

This commit is contained in:
2024-06-21 14:05:04 +02:00
parent b5e7e142e8
commit 268d475517
10 changed files with 209 additions and 160 deletions

View File

@@ -19,7 +19,7 @@ public class Directory extends FsNode {
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return NotImplementedConflictResolver.class;
return DirectoryConflictResolver.class;
}
public Map<String, UUID> getChildrenMap() {

View File

@@ -3,120 +3,100 @@ package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.DeserializationHelper;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.distributed.*;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedHashMap;
import java.util.Objects;
import java.util.UUID;
//@ApplicationScoped
//public class DirectoryConflictResolver implements ConflictResolver {
// @ConfigProperty(name = "dhfs.objects.distributed.selfname")
// String selfname;
//
// @Inject
// ObjectPersistentStore objectPersistentStore;
//
// @Inject
// RemoteObjectServiceClient remoteObjectServiceClient;
//
// @Inject
// InvalidationQueueService invalidationQueueService;
//
// @Inject
// JObjectManager jObjectManager;
//
// @Override
// public ConflictResolutionResult resolve(String conflictHost,
// ObjectHeader conflictSource,
// String localName) {
//
// var oursData = objectPersistentStore.readObject(localName);
// var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());
//
// var localMeta = objectIndexService.getMeta(localName).orElseThrow(() ->
// new NotImplementedException("Race when conflict resolving"));
//
// var oursHeader = localMeta.runReadLocked(ObjectMetadata::toRpcHeader);
// var theirsHeader = theirsData.getLeft();
//
// var ours = (Directory) DeserializationHelper.deserialize(oursData);
// var theirs = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
// if (!ours.getClass().equals(Directory.class) || !theirs.getClass().equals(Directory.class)) {
// Log.error("Object type mismatch!");
// throw new NotImplementedException();
// }
//
// LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(ours.getChildrenMap());
// for (var entry : theirs.getChildrenMap().entrySet()) {
// if (mergedChildren.containsKey(entry.getKey()) &&
// !Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
// int i = 0;
// do {
// String name = entry.getKey() + ".conflict." + i + "." + conflictHost;
// if (mergedChildren.containsKey(name)) {
// i++;
// continue;
// }
// mergedChildren.put(name, entry.getValue());
// break;
// } while (true);
// } else {
// mergedChildren.put(entry.getKey(), entry.getValue());
// }
// }
//
// var newMetaData = new ObjectMetadata(oursHeader.getName(), oursHeader.getConflictResolver());
//
// for (var entry : oursHeader.getChangelog().getEntriesList()) {
// newMetaData.getChangelog().put(entry.getHost(), entry.getVersion());
// }
// for (var entry : theirsHeader.getChangelog().getEntriesList()) {
// newMetaData.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max);
// }
//
// boolean wasChanged = mergedChildren.size() != ours.getChildrenMap().size();
// if (wasChanged) {
// newMetaData.getChangelog().merge(selfname, 1L, Long::sum);
// }
//
// var newHdr = newMetaData.toRpcHeader();
//
// var newDir = new Directory(ours.getUuid(), ours.getMode());
// for (var entry : mergedChildren.entrySet()) newDir.putKid(entry.getKey(), entry.getValue());
//
// // FIXME:
// newDir.setMtime(System.currentTimeMillis());
// newDir.setCtime(ours.getCtime());
//
// var newBytes = SerializationUtils.serialize(newDir);
//
// objectIndexService.getMeta(localName).orElseThrow(() ->
// new NotImplementedException("Race when conflict resolving")).runWriteLocked(m -> {
//
// if (wasChanged)
// if (m.getBestVersion() >= newMetaData.getOurVersion())
// throw new NotImplementedException("Race when conflict resolving");
//
// if (m.getBestVersion() > newMetaData.getOurVersion())
// throw new NotImplementedException("Race when conflict resolving");
//
// m.getChangelog().clear();
// m.getChangelog().putAll(newMetaData.getChangelog());
//
// objectPersistentStore.writeObject(m.getName(), newBytes);
// return null;
// });
// invalidationQueueService.pushInvalidationToAll(oursHeader.getName());
// jObjectManager.invalidateJObject(oursHeader.getName());
//
// return ConflictResolutionResult.RESOLVED;
// }
//}
@ApplicationScoped
public class DirectoryConflictResolver implements ConflictResolver {
    @ConfigProperty(name = "dhfs.objects.distributed.selfname")
    String selfname;

    @Inject
    RemoteObjectServiceClient remoteObjectServiceClient;

    @Inject
    JObjectManager jObjectManager;

    /**
     * Resolves a conflict between the local Directory and a remote copy by
     * merging the two children maps and the two changelogs in place.
     *
     * Colliding child names that point at different UUIDs keep the local entry
     * and re-add the remote entry under "name.conflict.<i>.<host>".
     *
     * @param conflictHost   name of the peer we conflict with
     * @param conflictSource remote object header that triggered the conflict
     * @param localName      name of the local object
     * @return RESOLVED on success; throws NotImplementedException on races or
     *         type mismatches (TODO: proper error handling)
     */
    @Override
    public ConflictResolutionResult resolve(String conflictHost,
                                            ObjectHeader conflictSource,
                                            String localName) {
        // A conflict implies both sides have the object, so the local copy must exist.
        var oursData = jObjectManager.get(localName, Directory.class).orElseThrow(() -> new NotImplementedException("Oops"));
        var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, conflictSource.getName());

        var oursHeader = oursData.runReadLocked(ObjectMetadata::toRpcHeader);
        var theirsHeader = theirsData.getLeft();

        var theirs = (Directory) DeserializationHelper.deserialize(theirsData.getRight());
        // Exact-class check: a Directory subclass would also indicate a mismatch.
        if (!theirs.getClass().equals(Directory.class)) {
            Log.error("Object type mismatch!");
            throw new NotImplementedException();
        }

        // Union of both children maps; remote entries that collide with a
        // different local target are renamed instead of overwriting.
        LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(oursData.runReadLocked((m, d) -> d.getChildrenMap()));
        for (var entry : theirs.getChildrenMap().entrySet()) {
            if (mergedChildren.containsKey(entry.getKey()) &&
                    !Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
                // Find the first free ".conflict.<i>.<host>" name.
                int i = 0;
                do {
                    String name = entry.getKey() + ".conflict." + i + "." + conflictHost;
                    if (mergedChildren.containsKey(name)) {
                        i++;
                        continue;
                    }
                    mergedChildren.put(name, entry.getValue());
                    break;
                } while (true);
            } else {
                mergedChildren.put(entry.getKey(), entry.getValue());
            }
        }

        // Merged changelog: start from ours, then take the per-host max with theirs.
        var newMetaData = new ObjectMetadata(oursHeader.getName(), oursHeader.getConflictResolver(), oursData.runReadLocked(m -> m.getType()));

        for (var entry : oursHeader.getChangelog().getEntriesList()) {
            newMetaData.getChangelog().put(entry.getHost(), entry.getVersion());
        }
        for (var entry : theirsHeader.getChangelog().getEntriesList()) {
            newMetaData.getChangelog().merge(entry.getHost(), entry.getVersion(), Long::max);
        }

        // If the merge added children we didn't have, count it as a local edit.
        boolean wasChanged = mergedChildren.size() != oursData.runReadLocked((m, d) -> d.getChildrenMap().size());
        if (wasChanged) {
            newMetaData.getChangelog().merge(selfname, 1L, Long::sum);
        }

        oursData.runWriteLocked((m, d, bump) -> {
            // Re-validate under the write lock: if a newer version appeared while
            // we were merging, bail out and let resolution be retried.
            if (wasChanged)
                if (m.getBestVersion() >= newMetaData.getOurVersion())
                    throw new NotImplementedException("Race when conflict resolving");

            if (m.getBestVersion() > newMetaData.getOurVersion())
                throw new NotImplementedException("Race when conflict resolving");

            // FIXME: mtime is bumped to "now"; ctime is intentionally left unchanged
            // (the old code's d.setCtime(d.getCtime()) was a no-op and was removed).
            d.setMtime(System.currentTimeMillis());

            d.getChildren().clear();
            d.getChildren().putAll(mergedChildren);

            m.getChangelog().clear();
            m.getChangelog().putAll(newMetaData.getChangelog());
            return null;
        });

        return ConflictResolutionResult.RESOLVED;
    }
}

View File

@@ -108,7 +108,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
jObjectManager.put(f);
if (!dir.runWriteLocked((m, d) -> {
if (!dir.runWriteLocked((m, d, bump) -> {
bump.apply();
return d.putKid(Path.of(name).getFileName().toString(), fuuid);
}))
return Optional.empty();
@@ -131,7 +132,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dir = (JObject<Directory>) found.get();
jObjectManager.put(d);
if (!dir.runWriteLocked((m, dd) -> {
if (!dir.runWriteLocked((m, dd, bump) -> {
bump.apply();
return dd.putKid(Path.of(name).getFileName().toString(), duuid);
}))
return Optional.empty();
@@ -148,7 +150,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (!(found.get().isOf(Directory.class))) return false;
var dir = (JObject<Directory>) found.get();
return dir.runWriteLocked((m, d) -> {
return dir.runWriteLocked((m, d, bump) -> {
bump.apply();
return d.removeKid(Path.of(name).getFileName().toString());
});
}
@@ -178,7 +181,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dir = (JObject<Directory>) found.get();
dir.runWriteLocked((m, d) -> {
dir.runWriteLocked((m, d, bump) -> {
bump.apply();
d.getChildren().put(Path.of(to).getFileName().toString(), UUID.fromString(dent.get().getName()));
return null;
});
@@ -191,7 +195,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var dent = getDirEntry(name);
if (dent.isEmpty()) return false;
dent.get().runWriteLocked((m, d) -> {
dent.get().runWriteLocked((m, d, bump) -> {
bump.apply();
d.setMode(mode);
return null;
});
@@ -261,7 +266,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return Optional.empty();
}
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
var chunkBytes = chunkRead.get().runReadLocked((m, d) -> d.getBytes());
long readableLen = chunkBytes.length - offInChunk;
@@ -324,7 +329,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
var chunkBytes = chunkRead.get().runReadLocked((m, d) -> d.getBytes());
ChunkData newChunkData = new ChunkData(Arrays.copyOfRange(chunkBytes, 0, (int) (offset - first.getKey())));
ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().length);
jObjectManager.put(newChunkData);
@@ -350,7 +355,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
var lchunkBytes = lchunkRead.get().runWriteLocked((m, d) -> d.getBytes());
var lchunkBytes = lchunkRead.get().runReadLocked((m, d) -> d.getBytes());
if (last.getKey() + lchunkBytes.length > offset + data.length) {
var startInFile = offset + data.length;
@@ -365,7 +370,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
try {
file.runWriteLocked((m, fileData) -> {
file.runWriteLocked((m, fileData, bump) -> {
bump.apply();
fileData.getChunks().clear();
fileData.getChunks().putAll(newChunks);
fileData.setMtime(System.currentTimeMillis());
@@ -390,7 +396,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (length == 0) {
try {
file.runWriteLocked((m, fileData) -> {
file.runWriteLocked((m, fileData, bump) -> {
bump.apply();
fileData.getChunks().clear();
fileData.setMtime(System.currentTimeMillis());
return null;
@@ -429,7 +436,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return false;
}
var chunkBytes = chunkRead.get().runWriteLocked((m, d) -> d.getBytes());
var chunkBytes = chunkRead.get().runReadLocked((m, d) -> d.getBytes());
if (lastChunk.getKey() + chunkBytes.length > 0) {
int start = (int) (length - lastChunk.getKey());
@@ -443,7 +450,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
try {
file.runWriteLocked((m, fileData) -> {
file.runWriteLocked((m, fileData, bump) -> {
bump.apply();
fileData.getChunks().clear();
fileData.getChunks().putAll(newChunks);
fileData.setMtime(System.currentTimeMillis());
@@ -467,7 +475,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = fileOpt.get();
try {
file.runWriteLocked((m, fileData) -> {
file.runWriteLocked((m, fileData, bump) -> {
bump.apply();
fileData.setMtime(mtimeMs);
return null;
});

View File

@@ -5,6 +5,8 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import org.apache.commons.lang3.NotImplementedException;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -12,7 +14,9 @@ public class JObject<T extends JObjectData> implements Serializable {
protected JObject(JObjectResolver resolver, String name, String conflictResolver, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, conflictResolver, obj.getClass());
_dataPart = obj;
_dataPart.set(obj);
// FIXME:?
_resolver.bumpVersionSelf(this);
}
protected JObject(JObjectResolver resolver, ObjectMetadata objectMetadata) {
@@ -27,18 +31,27 @@ public class JObject<T extends JObjectData> implements Serializable {
protected final ReadWriteLock _lock = new ReentrantReadWriteLock();
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private T _dataPart;
private final AtomicReference<T> _dataPart = new AtomicReference<>();
public Class<? extends ConflictResolver> getConflictResolver() {
return runReadLocked(m -> {
try {
return (Class<? extends ConflictResolver>) Class.forName(m.getConflictResolver());
return (Class<? extends ConflictResolver>) Class.forName(m.getConflictResolver(), true, JObject.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new NotImplementedException(e);
}
});
}
/**
 * Whether the data part has already been resolved (loaded); the metadata
 * part is always present.
 */
public boolean isResolved() {
return Objects.nonNull(_dataPart.get());
}
/**
 * Zero-argument callback handed into write-locked sections, e.g. the
 * "bump version" and "invalidate data" hooks of the runWriteLocked* methods.
 */
@FunctionalInterface
public interface VoidFn {
void apply();
}
@FunctionalInterface
public interface ObjectMetaFn<R> {
R apply(ObjectMetadata indexData);
@@ -49,6 +62,16 @@ public class JObject<T extends JObjectData> implements Serializable {
R apply(ObjectMetadata meta, T data);
}
/**
 * Write-locked metadata callback: receives the object metadata plus the
 * "bump version" and "invalidate resolved data" hooks supplied by
 * runWriteLockedMeta.
 */
@FunctionalInterface
public interface ObjectMetaFnW<R> {
R apply(ObjectMetadata indexData, VoidFn bump, VoidFn invalidate);
}
/**
 * Write-locked data callback: receives the metadata, the resolved data part
 * and a "bump version" hook supplied by runWriteLocked.
 */
@FunctionalInterface
public interface ObjectDataFnW<T, R> {
R apply(ObjectMetadata meta, T data, VoidFn bump);
}
public <X> boolean isOf(Class<X> klass) {
return runReadLocked((m) -> (klass.isAssignableFrom(m.getType())));
}
@@ -62,11 +85,16 @@ public class JObject<T extends JObjectData> implements Serializable {
}
}
public <R> R runWriteLocked(ObjectMetaFn<R> fn) {
public <R> R runWriteLockedMeta(ObjectMetaFnW<R> fn) {
_lock.writeLock().lock();
try {
var ret = fn.apply(_metaPart);
_resolver.notifyWrite(this);
var ver = _metaPart.getOurVersion();
VoidFn invalidateFn = () -> {
_dataPart.set(null);
};
var ret = fn.apply(_metaPart, () -> _resolver.bumpVersionSelf(this), invalidateFn);
if (!Objects.equals(ver, _metaPart.getOurVersion()))
_resolver.notifyWrite(this);
return ret;
} finally {
_lock.writeLock().unlock();
@@ -74,12 +102,12 @@ public class JObject<T extends JObjectData> implements Serializable {
}
private void resolveDataPart() {
if (_dataPart == null) {
if (_dataPart.get() == null) {
_lock.readLock().lock();
try {
if (_dataPart == null) {
_dataPart = _resolver.resolveData(this);
if (!_metaPart.getType().isAssignableFrom(_dataPart.getClass()))
if (_dataPart.get() == null) {
_dataPart.set(_resolver.resolveData(this));
if (!_metaPart.getType().isAssignableFrom(_dataPart.get().getClass()))
throw new NotImplementedException("Type mismatch for " + getName());
}
} finally {
@@ -92,18 +120,20 @@ public class JObject<T extends JObjectData> implements Serializable {
resolveDataPart();
_lock.readLock().lock();
try {
return fn.apply(_metaPart, _dataPart);
return fn.apply(_metaPart, _dataPart.get());
} finally {
_lock.readLock().unlock();
}
}
public <R> R runWriteLocked(ObjectDataFn<T, R> fn) {
public <R> R runWriteLocked(ObjectDataFnW<T, R> fn) {
resolveDataPart();
_lock.writeLock().lock();
try {
var ret = fn.apply(_metaPart, _dataPart);
_resolver.notifyWrite(this);
var ver = _metaPart.getOurVersion();
var ret = fn.apply(_metaPart, _dataPart.get(), () -> _resolver.bumpVersionSelf(this));
if (!Objects.equals(ver, _metaPart.getOurVersion()))
_resolver.notifyWrite(this);
return ret;
} finally {
_lock.writeLock().unlock();

View File

@@ -9,13 +9,11 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -98,7 +96,13 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public Collection<JObject<?>> find(String prefix) {
throw new NotImplementedException();
var ret = new ArrayList<JObject<?>>();
for (var f : objectPersistentStore.findObjects("meta_")) {
var got = get(f.substring(5));
if (got.isPresent())
ret.add(got.get());
}
return ret;
}
@Override

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentS
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class JObjectResolver {
@@ -19,6 +20,9 @@ public class JObjectResolver {
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
public <T extends JObjectData> T resolveData(JObject<T> jObject) {
if (objectPersistentStore.existsObject(jObject.getName()))
return DeserializationHelper.deserialize(objectPersistentStore.readObject(jObject.getName()));
@@ -30,7 +34,17 @@ public class JObjectResolver {
public void notifyWrite(JObject<?> self) {
objectPersistentStore.writeObject("meta_" + self.getName(), self.runReadLocked((m) -> SerializationUtils.serialize(m)));
objectPersistentStore.writeObject(self.getName(), self.runReadLocked((m, d) -> SerializationUtils.serialize(d)));
invalidationQueueService.pushInvalidationToAll(self.getName());
if (self.isResolved()) {
objectPersistentStore.writeObject(self.getName(), self.runReadLocked((m, d) -> SerializationUtils.serialize(d)));
// FIXME:?
invalidationQueueService.pushInvalidationToAll(self.getName());
}
}
/**
 * Increments this host's changelog entry for the given object, taking the
 * object's metadata write lock via runWriteLockedMeta.
 */
public void bumpVersionSelf(JObject<?> self) {
self.runWriteLockedMeta((meta, bumpFn, invalidateFn) -> {
meta.bumpVersion(selfname);
return null;
});
}
}

View File

@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
import lombok.Getter;
import org.eclipse.microprofile.config.ConfigProvider;
import java.io.Serializable;
import java.util.LinkedHashMap;
@@ -41,6 +42,10 @@ public class ObjectMetadata implements Serializable {
return Math.max(getOurVersion(), _remoteCopies.values().stream().max(Long::compareTo).get());
}
/**
 * Records one local modification: increments this host's changelog counter,
 * treating a missing entry as 0.
 */
public void bumpVersion(String selfname) {
_changelog.put(selfname, _changelog.getOrDefault(selfname, 0L) + 1L);
}
public ObjectChangelog toRpcChangelog() {
var changelogBuilder = ObjectChangelog.newBuilder();
for (var m : getChangelog().entrySet()) {
@@ -53,6 +58,7 @@ public class ObjectMetadata implements Serializable {
var headerBuilder = ObjectHeader.newBuilder().setName(getName());
headerBuilder.setConflictResolver(getConflictResolver());
headerBuilder.setChangelog(toRpcChangelog());
headerBuilder.setType(_type.getName());
return headerBuilder.build();
}
}

View File

@@ -39,7 +39,7 @@ public class RemoteHostManager {
void shutdown(@Observes @Priority(250) ShutdownEvent event) throws IOException {
}
@Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {

View File

@@ -62,22 +62,23 @@ public class SyncHandler {
handleRemoteUpdate(IndexUpdatePush.newBuilder()
.setSelfname(got.getSelfname()).setHeader(h).build());
}
// // Push our index to the other peer too, as they might not request it if
// // they didn't think we were disconnected
// List<String> toPush = new ArrayList<>();
// objectIndexService.forAllRead((name, meta) -> {
// toPush.add(name);
// });
// for (String name : toPush) {
// invalidationQueueService.pushInvalidationToOne(host, name);
// }
// Push our index to the other peer too, as they might not request it if
// they didn't think we were disconnected
var objs = jObjectManager.find("");
for (var obj : objs) {
obj.runReadLocked((meta) -> {
invalidationQueueService.pushInvalidationToOne(host, obj.getName());
return null;
});
}
}
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
JObject<?> found;
try {
found = jObjectManager.getOrPut(request.getHeader().getName(), new ObjectMetadata(
request.getHeader().getName(), request.getHeader().getConflictResolver(), (Class<? extends JObjectData>) Class.forName(request.getHeader().getType())
request.getHeader().getName(), request.getHeader().getConflictResolver(), (Class<? extends JObjectData>) Class.forName(request.getHeader().getType(), true, JObject.class.getClassLoader())
));
} catch (ClassNotFoundException ex) {
throw new NotImplementedException(ex);
@@ -90,14 +91,14 @@ public class SyncHandler {
var receivedTotalVer = request.getHeader().getChangelog().getEntriesList()
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
boolean conflict = found.runWriteLocked((md) -> {
boolean conflict = found.runWriteLockedMeta((md, bump, invalidate) -> {
if (md.getRemoteCopies().getOrDefault(request.getSelfname(), 0L) > receivedTotalVer) {
Log.error("Received older index update than was known for host: "
+ request.getSelfname() + " " + request.getHeader().getName());
return false;
}
if (md.getChangelog().get(selfname) > receivedSelfVer) return true;
if (md.getChangelog().getOrDefault(selfname, 0L) > receivedSelfVer) return true;
md.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
@@ -117,6 +118,7 @@ public class SyncHandler {
// md.getBestVersion() > md.getTotalVersion() should also work
if (receivedTotalVer > md.getOurVersion()) {
invalidate.apply();
try {
Log.info("Deleting " + request.getHeader().getName() + " as per invalidation from " + request.getSelfname());
objectPersistentStore.deleteObject(request.getHeader().getName());

View File

@@ -5,7 +5,6 @@ import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import jakarta.annotation.Priority;
@@ -17,6 +16,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@ApplicationScoped
@@ -44,10 +44,14 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
if (!nsRoot.toFile().isDirectory())
throw new StatusRuntimeException(Status.NOT_FOUND);
return vertx.fileSystem().readDir(nsRoot.toString()).onItem()
.transformToMulti(v -> Multi.createFrom().iterable(v))
.select().where(n -> n.startsWith(prefix))
.map(f -> nsRoot.relativize(Paths.get(f)).toString()).collect().asList().await().indefinitely();
var read = vertx.fileSystem().readDir(nsRoot.toString()).await().indefinitely();
ArrayList<String> out = new ArrayList<>();
for (var s : read) {
var rel = nsRoot.relativize(Paths.get(s)).toString();
if (rel.startsWith(prefix))
out.add(rel);
}
return out;
}
@Nonnull