Mirror of https://github.com/usatiuk/dhfs.git (synced 2025-10-29 04:57:48 +01:00)

Compare commits: 256f21a6e7 ... 69aa61b98a

2 commits:
69aa61b98a
3333151677
@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -30,9 +29,6 @@ public class FileConflictResolver implements ConflictResolver {
@Inject
JObjectManager jObjectManager;

@Inject
MoveDummyRegistry moveDummyRegistry;

@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;

@@ -116,7 +112,6 @@ public class FileConflictResolver implements ConflictResolver {
oursFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.of(oursFile.getName()));
moveDummyRegistry.registerMovedRef(ours, ChunkInfo.getNameFromHash(e.getValue()));
}
HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());

@@ -127,32 +122,30 @@ public class FileConflictResolver implements ConflictResolver {

newFile.setMtime(second.getMtime());
newFile.setCtime(second.getCtime());
var newFileJ = jObjectManager.putLocked(newFile, Optional.of(_oursDir.getName()));
try {
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
moveDummyRegistry.registerMovedRef(newFileJ, ChunkInfo.getNameFromHash(e.getValue())); // TODO: This probably could be optimized
}

var theName = oursDir.getChildren().entrySet().stream()
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));

int i = 0;
do {
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
if (oursDir.getChildren().containsKey(name)) {
i++;
continue;
}
oursDir.getChildren().put(name, newFile.getUuid());
break;
} while (true);
} finally {
newFileJ.rwUnlock();
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
}

var theName = oursDir.getChildren().entrySet().stream()
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));

jObjectManager.put(newFile, Optional.of(_oursDir.getName()));

int i = 0;
do {
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
if (oursDir.getChildren().containsKey(name)) {
i++;
continue;
}
oursDir.getChildren().put(name, newFile.getUuid());
break;
} while (true);

bumpDir.apply();

for (var cuuid : oursBackup) {
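For reference, the renaming loop that appears in both halves of the hunk above probes names of the form "<original>.conflict.<i>.<hostname>" until it finds a key that is not yet present in the directory. A minimal standalone sketch of that probing logic, assuming a plain map of directory children (the map and helper names are illustrative, not part of the repository):

import java.util.Map;

class ConflictNameSketch {
    // Probes "<base>.conflict.<i>.<host>" until the candidate key is unused in the directory map.
    static String findFreeConflictName(Map<String, ?> children, String base, String host) {
        int i = 0;
        while (true) {
            String candidate = base + ".conflict." + i + "." + host;
            if (!children.containsKey(candidate)) return candidate;
            i++;
        }
    }
}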
@@ -6,7 +6,6 @@ import com.usatiuk.dhfs.files.objects.*;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -32,9 +31,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
JObjectManager jObjectManager;

@Inject
MoveDummyRegistry moveDummyRegistry;

@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;

@@ -356,28 +352,24 @@ public class DhfsFileServiceImpl implements DhfsFileService {
newDent = d;
theFile.getMeta().removeRef(dentFrom.getName());
theFile.getMeta().addRef(dentTo.getName());
moveDummyRegistry.registerMovedRef(dentTo, newDent.getName());
} else if (theFile.getData() instanceof File f) {
var newFile = new File(UUID.randomUUID(), f.getMode(), UUID.fromString(dentTo.getName()), f.isSymlink());
newFile.setMtime(f.getMtime());
newFile.setCtime(f.getCtime());
newFile.getChunks().putAll(f.getChunks());
var newFileJ = jObjectManager.putLocked(newFile, Optional.of(dentTo.getName()));
try {
for (var c : newFile.getChunks().values()) {
var o = jObjectManager.get(ChunkInfo.getNameFromHash(c))
.orElseThrow(() -> new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find chunk " + c + " when moving " + from)));
o.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.addRef(newFile.getName());
moveDummyRegistry.registerMovedRef(newFileJ, m.getName());
return null;
});
}
theFile.getMeta().removeRef(dentFrom.getName());
newDent = newFile;
} finally {
newFileJ.rwUnlock();

for (var c : newFile.getChunks().values()) {
var o = jObjectManager.get(ChunkInfo.getNameFromHash(c))
.orElseThrow(() -> new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find chunk " + c + " when moving " + from)));
o.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.addRef(newFile.getName());
return null;
});
}

theFile.getMeta().removeRef(dentFrom.getName());
jObjectManager.put(newFile, Optional.of(dentTo.getName()));
newDent = newFile;
} else {
throw new StatusRuntimeException(Status.ABORTED.withDescription(theFile.getName() + " is of unknown type"));
}
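Both variants in the hunk above call runWriteLocked with a callback that mutates the object's metadata and returns null. A generic sketch of such a "run a function while holding the write lock" helper is below; the single-argument callback shape and the class name are simplifications for illustration, not the real JObject API:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

class RunWriteLockedSketch<M> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final M meta;

    RunWriteLockedSketch(M meta) {
        this.meta = meta;
    }

    // Runs fn with the write lock held, passing in the metadata, and returns fn's result.
    <R> R runWriteLocked(Function<M, R> fn) {
        lock.writeLock().lock();
        try {
            return fn.apply(meta);
        } finally {
            lock.writeLock().unlock();
        }
    }
}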
@@ -8,10 +8,7 @@ import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -29,7 +26,6 @@ public class JObject<T extends JObjectData> {
final int metaHash;
final int externalHash;
final boolean data;
boolean forceInvalidate = false;
final HashSet<String> oldRefs;

TransactionState() {
@@ -49,7 +45,7 @@ public class JObject<T extends JObjectData> {
}
}

void commit() {
void commit(boolean forceInvalidate) {
_resolver.updateDeletionState(JObject.this);

var newDataHash = _metaPart.dataHash();
@@ -70,10 +66,6 @@ public class JObject<T extends JObjectData> {

verifyRefs(oldRefs);
}

void forceInvalidate() {
forceInvalidate = true;
}
}

private TransactionState _transactionState = null;
@@ -249,15 +241,14 @@ public class JObject<T extends JObjectData> {
_lock.readLock().unlock();
}

protected void forceInvalidate() {
assertRWLock();
_transactionState.forceInvalidate();
public void rwUnlock() {
rwUnlock(false);
}

public void rwUnlock() {
public void rwUnlock(boolean forceInvalidate) {
try {
if (_lock.writeLock().getHoldCount() == 1) {
_transactionState.commit();
_transactionState.commit(forceInvalidate);
_transactionState = null;
}
} catch (Exception ex) {
@@ -267,12 +258,8 @@ public class JObject<T extends JObjectData> {
}
}

public boolean haveRwLock() {
return _lock.isWriteLockedByCurrentThread();
}

public void assertRWLock() {
if (!haveRwLock())
if (!_lock.isWriteLockedByCurrentThread())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}

@@ -341,7 +328,7 @@ public class JObject<T extends JObjectData> {
throw new IllegalStateException("Expected to be deleted when discarding data");
_dataPart.set(null);
_metaPart.setHaveLocalCopy(false);
_metaPart.setSavedRefs(new HashSet<>());
_metaPart.setSavedRefs(Collections.emptySet());
}

public enum ResolutionStrategy {
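One side of the hunks above threads a forceInvalidate flag through the unlock path: the no-argument rwUnlock() delegates to rwUnlock(false), and the transaction state is committed only when the outermost reentrant write hold is released. A simplified, self-contained sketch of that pattern, assuming a placeholder commit body:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReentrantUnlockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void rwLock() {
        lock.writeLock().lock();
    }

    void rwUnlock() {
        rwUnlock(false); // default: no forced invalidation
    }

    void rwUnlock(boolean forceInvalidate) {
        try {
            // Only the outermost unlock of the reentrant write lock commits the transaction.
            if (lock.writeLock().getHoldCount() == 1)
                commit(forceInvalidate);
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void commit(boolean forceInvalidate) {
        // placeholder for the real commit logic
    }
}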
@@ -11,10 +11,8 @@ public interface JObjectManager {

// Put a new object
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
<T extends JObjectData> JObject<T> putLocked(T object, Optional<String> parent);

// Get an object with a name if it exists, otherwise create new one based on metadata
// Should be used when working with objects referenced from the outside
JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent);
JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent);
}
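The comments above describe getOrPut as returning the existing object when one with that name is present and otherwise creating it from metadata, recording the optional parent as a reference in either case. A minimal map-backed sketch of that contract; the Entry type and field names are illustrative, not the real JObject machinery:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class GetOrPutSketch {
    static final class Entry {
        final String name;
        final Set<String> referrers = new HashSet<>();
        Entry(String name) { this.name = name; }
    }

    private final Map<String, Entry> objects = new HashMap<>();

    // Returns the existing entry or creates one; in both cases the parent ref is recorded.
    Entry getOrPut(String name, Optional<String> parent) {
        Entry e = objects.computeIfAbsent(name, Entry::new);
        parent.ifPresent(e.referrers::add);
        return e;
    }
}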
@@ -121,7 +121,7 @@ public class JObjectManagerImpl implements JObjectManager {
}

@Override
public <D extends JObjectData> JObject<D> putLocked(D object, Optional<String> parent) {
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
while (true) {
JObject<?> ret;
JObject<?> newObj = null;
@@ -161,30 +161,20 @@ public class JObjectManagerImpl implements JObjectManager {
});
} finally {
if (newObj != null)
newObj.forceInvalidate();
newObj.rwUnlock(true);
}
if (newObj == null) {
if (newObj == null)
jObjectLRU.notifyAccess(ret);
ret.rwLock();
}
return (JObject<D>) ret;
}
}

@Override
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
var ret = putLocked(object, parent);
ret.rwUnlock();
return ret;
}

@Override
public JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
while (true) {
var got = get(name);

if (got.isPresent()) {
got.get().rwLock();
got.get().narrowClass(klass);
got.get().markSeen();
parent.ifPresent(s -> got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
@@ -200,29 +190,26 @@ public class JObjectManagerImpl implements JObjectManager {
JObject<?> ret = null;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
created.rwLock();
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
try {
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;

created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
} finally {
created.rwUnlock();
}
return created;
}
}

@Override
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
var obj = getOrPutLocked(name, klass, parent);
obj.rwUnlock();
return obj;
}

private static class NamedWeakReference extends WeakReference<JObject<?>> {
@Getter
final String _key;
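One side of this hunk pairs a putLocked variant, which returns the stored object still write-locked, with a thin put wrapper that unlocks before returning; getOrPut and getOrPutLocked follow the same shape. A self-contained sketch of that wrapper pattern under those assumptions (types and bodies are illustrative only):

import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

class LockedPutSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private T stored;

    // Returns with the lock still held; the caller must call unlock() when done.
    T putLocked(T object, Optional<String> parent) {
        lock.lock();
        stored = object;
        return stored;
    }

    // Convenience wrapper: same as putLocked, but releases the lock before returning.
    T put(T object, Optional<String> parent) {
        T ret = putLocked(object, parent);
        lock.unlock();
        return ret;
    }

    void unlock() {
        lock.unlock();
    }
}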
@@ -173,7 +173,7 @@ public class JObjectRefProcessor {
if (m.isLocked()) return null;
if (m.isDeleted()) return null;
if (!m.isDeletionCandidate()) return null;
if (m.isSeen() && m.carefulDeletion()) {
if (m.isSeen() && m.getKnownClass().isAnnotationPresent(Movable.class)) {
if (!processMovable(got))
return null;
}
@@ -121,7 +121,7 @@ public class JObjectResolver {

refs.forEach(r -> {
Log.trace("Hydrating ref after undelete " + r + " for " + self.getName());
jObjectManager.getOrPut(r, self.getData() != null ? self.getData().getRefType() : JObjectData.class, Optional.of(self.getName()));
jObjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
});

}
@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.Getter;
import lombok.Setter;

@@ -15,6 +16,8 @@ import java.io.ObjectInputStream;
import java.io.Serial;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class ObjectMetadata implements Serializable {
@@ -37,14 +40,11 @@ public class ObjectMetadata implements Serializable {
private Map<UUID, Long> _changelog = new LinkedHashMap<>(4);
@Getter
@Setter
private Set<String> _savedRefs = new HashSet<>(4);
private Set<String> _savedRefs = Collections.emptySet();
@Getter
private boolean _locked = false;
@Getter
@Setter
private volatile boolean _isMoveDummy = false;
@Getter
@Setter
private volatile boolean _haveLocalCopy = false;
@Getter
private transient volatile boolean _written = true;
@@ -118,7 +118,6 @@ public class ObjectMetadata implements Serializable {
public void addRef(String from) {
_confirmedDeletes.clear();
_referrers.add(from);
_isMoveDummy = false;
if (Log.isTraceEnabled())
Log.trace("Adding ref " + from + " to " + getName());
}
@@ -185,10 +184,6 @@ public class ObjectMetadata implements Serializable {
return headerBuilder.build();
}

public boolean carefulDeletion() {
return _isMoveDummy;
}

public int metaHash() {
int res = Objects.hashCode(_name);
res = 31 * res + Objects.hashCode(isSeen());
@@ -201,7 +196,6 @@ public class ObjectMetadata implements Serializable {
res = 31 * res + Objects.hashCode(_remoteCopies);
res = 31 * res + Objects.hashCode(_savedRefs);
res = 31 * res + Objects.hashCode(_haveLocalCopy);
res = 31 * res + Objects.hashCode(_isMoveDummy);
return res;
}
@@ -29,7 +29,6 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
.addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
.setLocked(object.isLocked())
.setHaveLocalCopy(object.isHaveLocalCopy())
.setIsMoveDummy(object.isMoveDummy())
.build();
}

@@ -53,8 +52,6 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
obj.lock();
if (message.getHaveLocalCopy())
obj.setHaveLocalCopy(true);
if (message.getIsMoveDummy())
obj.setMoveDummy(true);

return obj;
} catch (ClassNotFoundException cx) {
@@ -6,8 +6,6 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyEntry;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyProcessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -38,41 +36,14 @@ public class RemoteObjectServiceClient {
InvalidationQueueService invalidationQueueService;
@Inject
ProtoSerializerService protoSerializerService;
@Inject
MoveDummyProcessor moveDummyProcessor;

private GetObjectReply getObjectImpl(DhfsObjectSyncGrpcGrpc.DhfsObjectSyncGrpcBlockingStub client, String name) {
GetObjectReply reply;

do {
reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());

if (!reply.hasObject() && !reply.hasPushedMoves())
throw new IllegalStateException("Reply has neither pushed moves nor object!");

if (reply.hasPushedMoves())
moveDummyProcessor.processPushedMoves(UUID.fromString(reply.getSelfUuid()), reply.getPushedMoves());
} while (!reply.hasObject());
return reply;
}

public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name, boolean forConflict) {
public Pair<ObjectHeader, JObjectDataP> getSpecificObject(UUID host, String name) {
return rpcClientFactory.withObjSyncClient(host, client -> {
var reply = getObjectImpl(client, name);
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(name).build());
return Pair.of(reply.getObject().getHeader(), reply.getObject().getContent());
});
}

public void pushConfirmedPushedMoves(UUID host, Collection<MoveDummyEntry> entries) {
rpcClientFactory.withObjSyncClient(host, client -> {
var builder = ConfirmPushedMoveRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());
for (var e : entries)
builder.addConfirmedMoves(PushedMoveConfirm.newBuilder().setParent(e.parent()).setKid(e.child()).build());
client.confirmPushedMove(builder.build());
return null;
});
}

public JObjectDataP getObject(JObject<?> jObject) {
jObject.assertRWLock();

@@ -92,7 +63,7 @@ public class RemoteObjectServiceClient {
Log.info("Downloading object " + jObject.getName() + " from " + targets.stream().map(UUID::toString).collect(Collectors.joining(", ")));

return rpcClientFactory.withObjSyncClient(targets, client -> {
var reply = getObjectImpl(client, jObject.getName());
var reply = client.getObject(GetObjectRequest.newBuilder().setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString()).setName(jObject.getName()).build());

var receivedMap = new HashMap<UUID, Long>();
for (var e : reply.getObject().getHeader().getChangelog().getEntriesList()) {
@@ -3,12 +3,9 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyEntry;
import com.usatiuk.dhfs.objects.repository.movedummies.MoveDummyRegistry;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;
@@ -17,7 +14,6 @@ import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

import java.util.UUID;
@@ -47,9 +43,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
@Inject
ProtoSerializerService protoSerializerService;

@Inject
MoveDummyRegistry moveDummyRegistry;

@Override
@Blocking
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
@@ -59,60 +52,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {

Log.info("<-- getObject: " + request.getName() + " from " + request.getSelfUuid());

var pushedMoves = request.getForConflict() ? null : moveDummyRegistry.withPendingForHost(UUID.fromString(request.getSelfUuid()), pm -> {
if (pm.isEmpty()) return null;

var builder = PushedMoves.newBuilder();

// FIXME:
int count = 0;
var it = pm.iterator();
while (it.hasNext()) {
count++;
if (count > 100) break;

var next = it.next();

var obj = jObjectManager.get(next.parent());

if (obj.isEmpty()) {
it.remove();
continue;
}

ObjectHeader header;

try {
header = obj.get().runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
if (m.getKnownClass().isAnnotationPresent(PushResolution.class))
throw new NotImplementedException();

return m.toRpcHeader();
});
} catch (DeletedObjectAccessException e) {
it.remove();
continue;
}

builder.addPushedMoves(PushedMove.newBuilder()
.setParent(header)
.setKid(next.child())
.build());
}

return builder.build();
});

var replyBuilder = GetObjectReply.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString());

if (pushedMoves != null) {
replyBuilder.setPushedMoves(pushedMoves);

if (pushedMoves.getPushedMovesCount() >= 100)
return Uni.createFrom().item(replyBuilder.build());
}

var obj = jObjectManager.get(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));

Pair<ObjectHeader, JObjectDataP> read = obj.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (meta, data) -> {
@@ -125,7 +64,9 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
});
obj.markSeen();
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(read.getRight()).build();
return Uni.createFrom().item(replyBuilder.setObject(replyObj).build());
return Uni.createFrom().item(GetObjectReply.newBuilder()
.setSelfUuid(persistentRemoteHostsService.getSelfUuid().toString())
.setObject(replyObj).build());
}

@Override
@@ -168,15 +109,6 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
return Uni.createFrom().item(ret);
}

@Override
public Uni<ConfirmPushedMoveReply> confirmPushedMove(ConfirmPushedMoveRequest request) {
Log.info("<-- confirmPushedMove: from " + request.getSelfUuid());
for (var m : request.getConfirmedMovesList())
moveDummyRegistry.commitForHost(UUID.fromString(request.getSelfUuid()), new MoveDummyEntry(m.getParent(), m.getKid()));

return Uni.createFrom().item(ConfirmPushedMoveReply.getDefaultInstance());
}

@Override
@Blocking
public Uni<GetIndexReply> getIndex(GetIndexRequest request) {
@@ -133,7 +133,7 @@ public class SyncHandler {
theirsHeader = header;
theirsData = protoSerializerService.deserialize(header.getPushedData());
} else {
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName(), true);
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
theirsData = protoSerializerService.deserialize(got.getRight());
theirsHeader = got.getLeft();
}
@@ -165,18 +165,6 @@ public class SyncHandler {
return IndexUpdateReply.getDefaultInstance();
}

public static class OutdatedUpdateException extends RuntimeException {
OutdatedUpdateException() {
super();
}

OutdatedUpdateException(String message) {
super(message);
}

@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
protected static class OutdatedUpdateException extends RuntimeException {
}
}
@@ -1,4 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;

public record MoveDummyEntry(String parent, String child) {
}
@@ -1,90 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;

import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import com.usatiuk.dhfs.objects.repository.PushedMoves;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.SyncHandler;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.UUID;

@ApplicationScoped
public class MoveDummyProcessor {
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
JObjectManager jObjectManager;
@Inject
SyncHandler syncHandler;

private void ensureDummyLink(JObject<?> from, String to) {
from.assertRWLock();

if (from.hasLocalCopy()) {
from.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
if (!from.getData().extractRefs().contains(to))
throw new IllegalStateException("Pushed move has actual copy but no ref " + from.getName() + "->" + to);
return;
}

var toObj = jObjectManager.get(to).orElse(null);
if (toObj == null) return;

toObj.rwLock();
try {
if (toObj.getMeta().isDeleted())
Log.warn("Pushed move but object is already deleted: " + from.getName() + "->" + to);

toObj.markSeen(); // TODO: Is this needed?
from.markSeen();

var meta = from.getMeta();
if (meta.getSavedRefs() == null) meta.setSavedRefs(new HashSet<>());

meta.getSavedRefs().add(to);
meta.setMoveDummy(true);

toObj.getMeta().addRef(from.getName());
} finally {
toObj.rwUnlock();
}
}

private void processPushedMove(UUID from, ObjectHeader parent, String kid) {
var obj = jObjectManager.getOrPutLocked(parent.getName(), JObjectData.class, Optional.empty());
try {
if (Log.isTraceEnabled())
Log.trace("Processing pushed move from " + from + " " + parent.getName() + "->" + kid);
syncHandler.handleOneUpdate(from, parent);
ensureDummyLink(obj, kid);
if (Log.isTraceEnabled())
Log.trace("Processed pushed move from " + from + " " + parent.getName() + "->" + kid);
} finally {
obj.rwUnlock();
}
}

public void processPushedMoves(UUID from, PushedMoves moves) {
ArrayList<MoveDummyEntry> processed = new ArrayList<>();

for (var m : moves.getPushedMovesList()) {
try {
processPushedMove(from, m.getParent(), m.getKid());
} catch (Exception e) {
Log.error("Error when processing pushed move " + m.getParent().getName() + "->" + m.getKid() + " from " + from, e);
continue;
}
processed.add(new MoveDummyEntry(m.getParent().getName(), m.getKid()));
}

remoteObjectServiceClient.pushConfirmedPushedMoves(from, processed);
}
}
@@ -1,95 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;

import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.UUID;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

@ApplicationScoped
public class MoveDummyRegistry {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
private static final String dataFileName = "movedummies";

// FIXME: DB when?
private MoveDummyRegistryData _persistentData = new MoveDummyRegistryData();

void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading move dummy data queue");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
Log.warn("Reading move dummy data from backup");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// TODO: Handle unclean shutdowns...
}

void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving move dummy data");
writeData();
Log.info("Saved move dummy data");
}

private void writeData() {
try {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing deferred invalidations data", iex);
throw new RuntimeException(iex);
}
}

public void registerMovedRef(JObject<?> parent, String child) {
parent.assertRWLock();
if (Log.isTraceEnabled())
Log.trace("Registered moved ref " + parent.getName() + "->" + child);
for (var host : persistentRemoteHostsService.getHostsUuid()) {
synchronized (this) {
_persistentData.getMoveDummiesPending().put(host, new MoveDummyEntry(parent.getName(), child));
}
}
}

@FunctionalInterface
public interface MoveDummyForHostFn<R> {
R apply(Collection<MoveDummyEntry> hostsData);
}

public <T> T withPendingForHost(UUID host, MoveDummyForHostFn<T> fn) {
synchronized (this) {
return fn.apply(_persistentData.getMoveDummiesPending().get(host));
}
}

public void commitForHost(UUID host, MoveDummyEntry entry) {
if (Log.isTraceEnabled())
Log.trace("Committing pushed move from " + host + " " + entry.parent() + "->" + entry.child());
synchronized (this) {
_persistentData.getMoveDummiesPending().get(host).remove(entry);
}
}

}
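writeData above keeps a single rolling backup: if a previous data file exists it is renamed to "<name>.bak" before the new serialized state is written. A standalone sketch of that save pattern with plain JDK file APIs; the directory, file name, and payload are illustrative:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

class BackupWriteSketch {
    // Rotates the previous file to "<file>.bak", then writes the new bytes.
    static void writeWithBackup(Path dir, String fileName, byte[] data) throws IOException {
        Path target = dir.resolve(fileName);
        Path backup = dir.resolve(fileName + ".bak");
        if (Files.exists(target))
            Files.move(target, backup, REPLACE_EXISTING);
        Files.write(target, data);
    }
}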
@@ -1,17 +0,0 @@
package com.usatiuk.dhfs.objects.repository.movedummies;

import lombok.Getter;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;

import java.io.Serial;
import java.io.Serializable;
import java.util.UUID;

public class MoveDummyRegistryData implements Serializable {
@Serial
private static final long serialVersionUID = 1L;

@Getter
private final MultiValuedMap<UUID, MoveDummyEntry> _moveDummiesPending = new HashSetValuedHashMap<>();
}
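MoveDummyRegistryData keys its pending entries by host UUID in a HashSetValuedHashMap, a commons-collections4 multimap whose per-key values behave like a set. A small usage sketch of that collection type; the string payload is illustrative only:

import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.util.UUID;

class MultimapSketch {
    public static void main(String[] args) {
        MultiValuedMap<UUID, String> pending = new HashSetValuedHashMap<>();
        UUID host = UUID.randomUUID();
        pending.put(host, "parent->child"); // duplicate values per key collapse (set semantics)
        pending.put(host, "parent->child");
        System.out.println(pending.get(host).size()); // prints 1
    }
}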
@@ -18,7 +18,6 @@ message ObjectMetadataP {
repeated string savedRefs = 9;
bool locked = 10;
bool haveLocalCopy = 11;
bool isMoveDummy = 12;
}

message FsNodeP {
@@ -10,12 +10,10 @@ package dhfs.objects.sync;

service DhfsObjectSyncGrpc {
rpc GetObject (GetObjectRequest) returns (GetObjectReply) {}
rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc GetIndex (GetIndexRequest) returns (GetIndexReply) {}
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}

rpc CanDelete (CanDeleteRequest) returns (CanDeleteReply) {}
rpc ConfirmPushedMove (ConfirmPushedMoveRequest) returns (ConfirmPushedMoveReply) {}

rpc Ping (PingRequest) returns (PingReply) {}
}

@@ -51,39 +49,12 @@ message GetObjectRequest {
string selfUuid = 10;

string name = 2;
bool forConflict = 3;
}

//FIXME:
message PushedMove {
ObjectHeader parent = 1;
string kid = 2;
}

message PushedMoves {
repeated PushedMove pushedMoves = 2;
}

message GetObjectReply {
string selfUuid = 10;

optional ApiObject object = 1;
optional PushedMoves pushedMoves = 2;
}

message PushedMoveConfirm {
string parent = 1;
string kid = 2;
}

message ConfirmPushedMoveRequest {
string selfUuid = 10;

repeated PushedMoveConfirm confirmedMoves = 1;
}

message ConfirmPushedMoveReply {

ApiObject object = 1;
}

message CanDeleteRequest {
@@ -172,7 +172,6 @@ public class DhfsFuseIT {
}

@Test
@Disabled // TODO: remove it?
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);