2 Commits

Author SHA1 Message Date
69aa61b98a Revert "push moving prototype"
This reverts commit fb4fd7e712.
2024-08-02 10:27:08 +02:00
3333151677 Revert "handle deleted objects in getObject pushedMoves"
This reverts commit 81de8084a7.
2024-08-02 10:27:07 +02:00
19 changed files with 77 additions and 475 deletions

View File

@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -30,9 +29,6 @@ public class FileConflictResolver implements ConflictResolver {
@Inject
JObjectManager jObjectManager;
@Inject
MoveDummyRegistry moveDummyRegistry;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@@ -116,7 +112,6 @@ public class FileConflictResolver implements ConflictResolver {
oursFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.of(oursFile.getName()));
moveDummyRegistry.registerMovedRef(ours, ChunkInfo.getNameFromHash(e.getValue()));
}
HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());
@@ -127,32 +122,30 @@ public class FileConflictResolver implements ConflictResolver {
newFile.setMtime(second.getMtime());
newFile.setCtime(second.getCtime());
var newFileJ = jObjectManager.putLocked(newFile, Optional.of(_oursDir.getName()));
try {
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
moveDummyRegistry.registerMovedRef(newFileJ, ChunkInfo.getNameFromHash(e.getValue())); // TODO: This probably could be optimized
}
var theName = oursDir.getChildren().entrySet().stream()
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));
int i = 0;
do {
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
if (oursDir.getChildren().containsKey(name)) {
i++;
continue;
}
oursDir.getChildren().put(name, newFile.getUuid());
break;
} while (true);
} finally {
newFileJ.rwUnlock();
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
}
var theName = oursDir.getChildren().entrySet().stream()
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));
jObjectManager.put(newFile, Optional.of(_oursDir.getName()));
int i = 0;
do {
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
if (oursDir.getChildren().containsKey(name)) {
i++;
continue;
}
oursDir.getChildren().put(name, newFile.getUuid());
break;
} while (true);
bumpDir.apply();
for (var cuuid : oursBackup) {

View File

@@ -6,7 +6,6 @@ import com.usatiuk.dhfs.files.objects.*;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -32,9 +31,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
JObjectManager jObjectManager;
@Inject
MoveDummyRegistry moveDummyRegistry;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@@ -356,28 +352,24 @@ public class DhfsFileServiceImpl implements DhfsFileService {
newDent = d;
theFile.getMeta().removeRef(dentFrom.getName());
theFile.getMeta().addRef(dentTo.getName());
moveDummyRegistry.registerMovedRef(dentTo, newDent.getName());
} else if (theFile.getData() instanceof File f) {
var newFile = new File(UUID.randomUUID(), f.getMode(), UUID.fromString(dentTo.getName()), f.isSymlink());
newFile.setMtime(f.getMtime());
newFile.setCtime(f.getCtime());
newFile.getChunks().putAll(f.getChunks());
var newFileJ = jObjectManager.putLocked(newFile, Optional.of(dentTo.getName()));
try {
for (var c : newFile.getChunks().values()) {
var o = jObjectManager.get(ChunkInfo.getNameFromHash(c))
.orElseThrow(() -> new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find chunk " + c + " when moving " + from)));
o.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.addRef(newFile.getName());
moveDummyRegistry.registerMovedRef(newFileJ, m.getName());
return null;
});
}
theFile.getMeta().removeRef(dentFrom.getName());
newDent = newFile;
} finally {
newFileJ.rwUnlock();
for (var c : newFile.getChunks().values()) {
var o = jObjectManager.get(ChunkInfo.getNameFromHash(c))
.orElseThrow(() -> new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find chunk " + c + " when moving " + from)));
o.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.addRef(newFile.getName());
return null;
});
}
theFile.getMeta().removeRef(dentFrom.getName());
jObjectManager.put(newFile, Optional.of(dentTo.getName()));
newDent = newFile;
} else {
throw new StatusRuntimeException(Status.ABORTED.withDescription(theFile.getName() + " is of unknown type"));
}

View File

@@ -8,10 +8,7 @@ import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -29,7 +26,6 @@ public class JObject<T extends JObjectData> {
final int metaHash;
final int externalHash;
final boolean data;
boolean forceInvalidate = false;
final HashSet<String> oldRefs;
TransactionState() {
@@ -49,7 +45,7 @@ public class JObject<T extends JObjectData> {
}
}
void commit() {
void commit(boolean forceInvalidate) {
_resolver.updateDeletionState(JObject.this);
var newDataHash = _metaPart.dataHash();
@@ -70,10 +66,6 @@ public class JObject<T extends JObjectData> {
verifyRefs(oldRefs);
}
void forceInvalidate() {
forceInvalidate = true;
}
}
private TransactionState _transactionState = null;
@@ -249,15 +241,14 @@ public class JObject<T extends JObjectData> {
_lock.readLock().unlock();
}
protected void forceInvalidate() {
assertRWLock();
_transactionState.forceInvalidate();
public void rwUnlock() {
rwUnlock(false);
}
public void rwUnlock() {
public void rwUnlock(boolean forceInvalidate) {
try {
if (_lock.writeLock().getHoldCount() == 1) {
_transactionState.commit();
_transactionState.commit(forceInvalidate);
_transactionState = null;
}
} catch (Exception ex) {
@@ -267,12 +258,8 @@ public class JObject<T extends JObjectData> {
}
}
public boolean haveRwLock() {
return _lock.isWriteLockedByCurrentThread();
}
public void assertRWLock() {
if (!haveRwLock())
if (!_lock.isWriteLockedByCurrentThread())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}
@@ -341,7 +328,7 @@ public class JObject<T extends JObjectData> {
throw new IllegalStateException("Expected to be deleted when discarding data");
_dataPart.set(null);
_metaPart.setHaveLocalCopy(false);
_metaPart.setSavedRefs(new HashSet<>());
_metaPart.setSavedRefs(Collections.emptySet());
}
public enum ResolutionStrategy {

View File

@@ -11,10 +11,8 @@ public interface JObjectManager {
// Put a new object
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
<T extends JObjectData> JObject<T> putLocked(T object, Optional<String> parent);
// Get an object with a name if it exists, otherwise create new one based on metadata
// Should be used when working with objects referenced from the outside
JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent);
JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent);
}

View File

@@ -121,7 +121,7 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public <D extends JObjectData> JObject<D> putLocked(D object, Optional<String> parent) {
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
while (true) {
JObject<?> ret;
JObject<?> newObj = null;
@@ -161,30 +161,20 @@ public class JObjectManagerImpl implements JObjectManager {
});
} finally {
if (newObj != null)
newObj.forceInvalidate();
newObj.rwUnlock(true);
}
if (newObj == null) {
if (newObj == null)
jObjectLRU.notifyAccess(ret);
ret.rwLock();
}
return (JObject<D>) ret;
}
}
@Override
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
var ret = putLocked(object, parent);
ret.rwUnlock();
return ret;
}
@Override
public JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
while (true) {
var got = get(name);
if (got.isPresent()) {
got.get().rwLock();
got.get().narrowClass(klass);
got.get().markSeen();
parent.ifPresent(s -> got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
@@ -200,29 +190,26 @@ public class JObjectManagerImpl implements JObjectManager {
JObject<?> ret = null;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
created.rwLock();
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
try {
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
} finally {
created.rwUnlock();
}
return created;
}
}
@Override
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
var obj = getOrPutLocked(name, klass, parent);
obj.rwUnlock();
return obj;
}
private static class NamedWeakReference extends WeakReference<JObject<?>> {
@Getter
final String _key;

View File

@@ -173,7 +173,7 @@ public class JObjectRefProcessor {
if (m.isLocked()) return null;
if (m.isDeleted()) return null;
if (!m.isDeletionCandidate()) return null;
if (m.isSeen() && m.carefulDeletion()) {
if (m.isSeen() && m.getKnownClass().isAnnotationPresent(Movable.class)) {
if (!processMovable(got))
return null;
}

View File

@@ -121,7 +121,7 @@ public class JObjectResolver {
refs.forEach(r -> {
Log.trace("Hydrating ref after undelete " + r + " for " + self.getName());
jObjectManager.getOrPut(r, self.getData() != null ? self.getData().getRefType() : JObjectData.class, Optional.of(self.getName()));
jObjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
});
}

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.Getter;
import lombok.Setter;
@@ -15,6 +16,8 @@ import java.io.ObjectInputStream;
import java.io.Serial;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class ObjectMetadata implements Serializable {
@@ -37,14 +40,11 @@ public class ObjectMetadata implements Serializable {
private Map<UUID, Long> _changelog = new LinkedHashMap<>(4);
@Getter
@Setter
private Set<String> _savedRefs = new HashSet<>(4);
private Set<String> _savedRefs = Collections.emptySet();
@Getter
private boolean _locked = false;
@Getter
@Setter
private volatile boolean _isMoveDummy = false;
@Getter
@Setter
private volatile boolean _haveLocalCopy = false;
@Getter
private transient volatile boolean _written = true;
@@ -118,7 +118,6 @@ public class ObjectMetadata implements Serializable {
public void addRef(String from) {
_confirmedDeletes.clear();
_referrers.add(from);
_isMoveDummy = false;
if (Log.isTraceEnabled())
Log.trace("Adding ref " + from + " to " + getName());
}
@@ -185,10 +184,6 @@ public class ObjectMetadata implements Serializable {
return headerBuilder.build();
}
public boolean carefulDeletion() {
return _isMoveDummy;
}
public int metaHash() {
int res = Objects.hashCode(_name);
res = 31 * res + Objects.hashCode(isSeen());
@@ -201,7 +196,6 @@ public class ObjectMetadata implements Serializable {
res = 31 * res + Objects.hashCode(_remoteCopies);
res = 31 * res + Objects.hashCode(_savedRefs);
res = 31 * res + Objects.hashCode(_haveLocalCopy);
res = 31 * res + Objects.hashCode(_isMoveDummy);
return res;
}

View File

@@ -29,7 +29,6 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
.addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
.setLocked(object.isLocked())
.setHaveLocalCopy(object.isHaveLocalCopy())
.setIsMoveDummy(object.isMoveDummy())
.build();
}
@@ -53,8 +52,6 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
obj.lock();
if (message.getHaveLocalCopy())
obj.setHaveLocalCopy(true);
if (message.getIsMoveDummy())
obj.setMoveDummy(true);
return obj;
} catch (ClassNotFoundException cx) {

View File

@@ -6,8 +6,6 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyEntry;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyProcessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -38,41 +36,14 @@ public class RemoteObjectServiceClient {
InvalidationQueueService invalidationQueueService;
@Inject
ProtoSerializerService protoSerializerService;
@Inject
MoveDummyProcessor moveDummyProcessor;
private GetObjectReply getObjectImpl(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client, String name) {
GetObjectReply reply;
do {
reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
if (!reply.hasObject() && !reply.hasPushedMoves())
throw new IllegalStateException("Reply has neither pushed moves nor object!");
if (reply.hasPushedMoves())
moveDummyProcessor.processPushedMoves(UUID.fromString(reply.getSelfUuid()), reply.getPushedMoves());
} while (!reply.hasObject());
return reply;
}
public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name, boolean forConflict) {
public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
return rpcClientFactory.withObjSyncClient(host, client -> {
var reply = getObjectImpl(client, name);
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
});
}
public void pushConfirmedPushedMoves(UUID host, Collection<MoveDummyEntry> entries) {
rpcClientFactory.withObjSyncClient(host, client -> {
var builder = ConfirmPushedMoveRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var e : entries)
builder.addConfirmedMoves(PushedMoveConfirm.newBuilder().setParent(e.parent()).setKid(e.child()).build());
client.confirmPushedMove(builder.build());
return null;
});
}
public JObjectDataP getObject(JObject<?> jObject) {
jObject.assertRWLock();
@@ -92,7 +63,7 @@ public class RemoteObjectServiceClient {
Log.info("Downloading object " + jObject.getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));
return rpcClientFactory.withObjSyncClient(targets, client -> {
var reply = getObjectImpl(client, jObject.getName());
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build());
var receivedMap = new HashMap<UUID, Long>();
for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) {

View File

@@ -3,12 +3,9 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyEntry;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
@@ -17,7 +14,6 @@ import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import java.util.UUID;
@@ -47,9 +43,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
ProtoSerializerService protoSerializerService;
@Inject
MoveDummyRegistry moveDummyRegistry;
@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
@@ -59,60 +52,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid());
var pushedMoves = request.getForConflict() ? null : moveDummyRegistry.withPendingForHost(UUID.fromString(request.getSelfUuid()), pm -> {
if (pm.isEmpty()) return null;
var builder = PushedMoves.newBuilder();
// FIXME:
int count = 0;
var it = pm.iterator();
while (it.hasNext()) {
count++;
if (count > 100) break;
var next = it.next();
var obj = jObjectManager.get(next.parent());
if (obj.isEmpty()) {
it.remove();
continue;
}
ObjectHeader header;
try {
header = obj.get().runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
if (m.getKnownClass().isAnnotationPresent(PushResolution.class))
throw new NotImplementedException();
return m.toRpcHeader();
});
} catch (DeletedObjectAccessException e) {
it.remove();
continue;
}
builder.addPushedMoves(PushedMove.newBuilder()
.setParent(header)
.setKid(next.child())
.build());
}
return builder.build();
});
var replyBuilder = GetObjectReply.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
if (pushedMoves != null) {
replyBuilder.setPushedMoves(pushedMoves);
if (pushedMoves.getPushedMovesCount() >= 100)
return Uni.createFrom().item(replyBuilder.build());
}
var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
Pair<ObjectHeader, JObjectDataP> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
@@ -125,7 +64,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
});
obj.markSeen();
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();
return Uni.createFrom().item(replyBuilder.setObject(replyObj).build());
return Uni.createFrom().item(GetObjectReply.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setObject(replyObj).build());
}
@Override
@@ -168,15 +109,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
return Uni.createFrom().item(ret);
}
@Override
public Uni<ConfirmPushedMoveReply> confirmPushedMove(ConfirmPushedMoveRequest request) {
Log.info("<-- confirmPushedMove: from " + request.getSelfUuid());
for (var m : request.getConfirmedMovesList())
moveDummyRegistry.commitForHost(UUID.fromString(request.getSelfUuid()), new MoveDummyEntry(m.getParent(), m.getKid()));
return Uni.createFrom().item(ConfirmPushedMoveReply.getDefaultInstance());
}
@Override
@Blocking
public Uni<GetIndexReply> getIndex(GetIndexRequest request) {

View File

@@ -133,7 +133,7 @@ public class SyncHandler {
theirsHeader = header;
theirsData = protoSerializerService.deserialize(header.getPushedData());
} else {
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName(), true);
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
theirsData = protoSerializerService.deserialize(got.getRight());
theirsHeader = got.getLeft();
}
@@ -165,18 +165,6 @@ public class SyncHandler {
return IndexUpdateReply.getDefaultInstance();
}
public static class OutdatedUpdateException extends RuntimeException {
OutdatedUpdateException() {
super();
}
OutdatedUpdateException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
protected static class OutdatedUpdateException extends RuntimeException {
}
}

View File

@@ -1,4 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;
public record MoveDummyEntry(String parent, String child) {
}

View File

@@ -1,90 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PushedMoves;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;
@ApplicationScoped
public class MoveDummyProcessor {
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
JObjectManager jObjectManager;
@Inject
SyncHandler syncHandler;
private void ensureDummyLink(JObject<?> from, String to) {
from.assertRWLock();
if (from.hasLocalCopy()) {
from.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
if (!from.getData().extractRefs().contains(to))
throw new IllegalStateException("Pushed move has actual copy but no ref " + from.getName() + "->" + to);
return;
}
var toObj = jObjectManager.get(to).orElse(null);
if (toObj == null) return;
toObj.rwLock();
try {
if (toObj.getMeta().isDeleted())
Log.warn("Pushed move but object is already deleted: " + from.getName() + "->" + to);
toObj.markSeen(); // TODO: Is this needed?
from.markSeen();
var meta = from.getMeta();
if (meta.getSavedRefs() == null) meta.setSavedRefs(new HashSet<>());
meta.getSavedRefs().add(to);
meta.setMoveDummy(true);
toObj.getMeta().addRef(from.getName());
} finally {
toObj.rwUnlock();
}
}
private void processPushedMove(UUID from, ObjectHeader parent, String kid) {
var obj = jObjectManager.getOrPutLocked(parent.getName(), JObjectData.class, Optional.empty());
try {
if (Log.isTraceEnabled())
Log.trace("Processing pushed move from " + from + " " + parent.getName() + "->" + kid);
syncHandler.handleOneUpdate(from, parent);
ensureDummyLink(obj, kid);
if (Log.isTraceEnabled())
Log.trace("Processed pushed move from " + from + " " + parent.getName() + "->" + kid);
} finally {
obj.rwUnlock();
}
}
public void processPushedMoves(UUID from, PushedMoves moves) {
ArrayList<MoveDummyEntry> processed = new ArrayList<>();
for (var m : moves.getPushedMovesList()) {
try {
processPushedMove(from, m.getParent(), m.getKid());
} catch (Exception e) {
Log.error("Error when processing pushed move " + m.getParent().getName() + "->" + m.getKid() + " from " + from, e);
continue;
}
processed.add(new MoveDummyEntry(m.getParent().getName(), m.getKid()));
}
remoteObjectServiceClient.pushConfirmedPushedMoves(from, processed);
}
}

View File

@@ -1,95 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.UUID;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@ApplicationScoped
public class MoveDummyRegistry {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
private static final String dataFileName = "movedummies";
// FIXME: DB when?
private MoveDummyRegistryData _persistentData = new MoveDummyRegistryData();
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading move dummy data queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
Log.warn("Reading move dummy data from backup");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// TODO: Handle unclean shutdowns...
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving move dummy data");
writeData();
Log.info("Saved move dummy data");
}
private void writeData() {
try {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing deferred invalidations data", iex);
throw new RuntimeException(iex);
}
}
public void registerMovedRef(JObject<?> parent, String child) {
parent.assertRWLock();
if (Log.isTraceEnabled())
Log.trace("Registered moved ref " + parent.getName() + "->" + child);
for (var host : persistentRemoteHostsService.getHostsUuid()) {
synchronized (this) {
_persistentData.getMoveDummiesPending().put(host, new MoveDummyEntry(parent.getName(), child));
}
}
}
@FunctionalInterface
public interface MoveDummyForHostFn<R> {
R apply(Collection<MoveDummyEntry> hostsData);
}
public <T> T withPendingForHost(UUID host, MoveDummyForHostFn<T> fn) {
synchronized (this) {
return fn.apply(_persistentData.getMoveDummiesPending().get(host));
}
}
public void commitForHost(UUID host, MoveDummyEntry entry) {
if (Log.isTraceEnabled())
Log.trace("Committing pushed move from " + host + " " + entry.parent() + "->" + entry.child());
synchronized (this) {
_persistentData.getMoveDummiesPending().get(host).remove(entry);
}
}
}

View File

@@ -1,17 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;
import lombok.Getter;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.io.Serial;
import java.io.Serializable;
import java.util.UUID;
public class MoveDummyRegistryData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
@Getter
private final MultiValuedMap<UUID, MoveDummyEntry> _moveDummiesPending = new HashSetValuedHashMap<>();
}

View File

@@ -18,7 +18,6 @@ message ObjectMetadataP {
repeated string savedRefs = 9;
bool locked = 10;
bool haveLocalCopy = 11;
bool isMoveDummy = 12;
}
message FsNodeP {

View File

@@ -10,12 +10,10 @@ package dhfs.objects.sync;
service DhfsObjectSyncGrpc {
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {}
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc ConfirmPushedMove (ConfirmPushedMoveRequest) returns (ConfirmPushedMoveReply) {}
rpc Ping (PingRequest) returns (PingReply) {}
}
@@ -51,39 +49,12 @@ message GetObjectRequest {
string selfUuid = 10;
string name = 2;
bool forConflict = 3;
}
//FIXME:
message PushedMove {
ObjectHeader parent = 1;
string kid = 2;
}
message PushedMoves {
repeated PushedMove pushedMoves = 2;
}
message GetObjectReply {
string selfUuid = 10;
optional ApiObject object = 1;
optional PushedMoves pushedMoves = 2;
}
message PushedMoveConfirm {
string parent = 1;
string kid = 2;
}
message ConfirmPushedMoveRequest {
string selfUuid = 10;
repeated PushedMoveConfirm confirmedMoves = 1;
}
message ConfirmPushedMoveReply {
ApiObject object = 1;
}
message CanDeleteRequest {

View File

@@ -172,7 +172,6 @@ public class DhfsFuseIT {
}
@Test
@Disabled // TODO: remove it?
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);