mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
cleanup conflict resolution a little
This commit is contained in:
@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||
import io.grpc.Status;
|
||||
@@ -44,15 +43,12 @@ public class DirectoryConflictResolver implements ConflictResolver {
|
||||
if (!(oursDirU instanceof Directory oursDir))
|
||||
throw new NotImplementedException("Type conflict for " + ours.getName() + ", directory was expected");
|
||||
|
||||
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>();
|
||||
ObjectMetadata newMetadata;
|
||||
|
||||
var oursHeader = m.toRpcHeader();
|
||||
var theirsHeader = theirsData.getLeft();
|
||||
|
||||
Directory first;
|
||||
Directory second;
|
||||
UUID otherHostname;
|
||||
|
||||
if (oursDir.getMtime() >= theirsDir.getMtime()) {
|
||||
first = oursDir;
|
||||
second = theirsDir;
|
||||
@@ -63,7 +59,9 @@ public class DirectoryConflictResolver implements ConflictResolver {
|
||||
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
||||
}
|
||||
|
||||
mergedChildren.putAll(first.getChildren());
|
||||
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>(first.getChildren());
|
||||
Map<UUID, Long> newChangelog = new LinkedHashMap<>(m.getChangelog());
|
||||
|
||||
for (var entry : second.getChildren().entrySet()) {
|
||||
if (mergedChildren.containsKey(entry.getKey()) &&
|
||||
!Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
|
||||
@@ -82,40 +80,33 @@ public class DirectoryConflictResolver implements ConflictResolver {
|
||||
}
|
||||
}
|
||||
|
||||
newMetadata = new ObjectMetadata(ours.getName(), true);
|
||||
|
||||
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||
}
|
||||
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||
newChangelog.merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||
}
|
||||
|
||||
boolean wasChanged = mergedChildren.size() != first.getChildren().size()
|
||||
boolean wasChanged = oursDir.getChildren().size() != mergedChildren.size()
|
||||
|| oursDir.getMtime() != first.getMtime()
|
||||
|| oursDir.getCtime() != first.getCtime();
|
||||
|
||||
if (wasChanged) {
|
||||
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||
if (m.getBestVersion() > newChangelog.values().stream().reduce(0L, Long::sum))
|
||||
throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));
|
||||
|
||||
if (m.getBestVersion() >= newMetadata.getOurVersion())
|
||||
throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));
|
||||
if (wasChanged) {
|
||||
newChangelog.merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||
|
||||
for (var child : mergedChildren.values()) {
|
||||
if (!(new HashSet<>(oursDir.getChildren().values()).contains(child))) {
|
||||
jObjectManager.getOrPut(child.toString(), Optional.of(oursDir.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
oursDir.setMtime(first.getMtime());
|
||||
oursDir.setCtime(first.getCtime());
|
||||
|
||||
oursDir.getChildren().clear();
|
||||
oursDir.getChildren().putAll(mergedChildren);
|
||||
} else if (m.getBestVersion() > newMetadata.getOurVersion())
|
||||
throw new NotImplementedException("Race when conflict resolving");
|
||||
oursDir.setChildren(mergedChildren);
|
||||
}
|
||||
|
||||
m.getChangelog().clear();
|
||||
m.getChangelog().putAll(newMetadata.getChangelog());
|
||||
m.setChangelog(newChangelog);
|
||||
return null;
|
||||
});
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.storage.files.objects.File;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||
import io.grpc.Status;
|
||||
@@ -18,10 +17,9 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FileConflictResolver implements ConflictResolver {
|
||||
@@ -34,6 +32,9 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
|
||||
boolean useHashForChunks;
|
||||
|
||||
@Override
|
||||
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
|
||||
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
|
||||
@@ -67,14 +68,13 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for file"));
|
||||
|
||||
// TODO: dedup
|
||||
ObjectMetadata newMetadata;
|
||||
|
||||
var oursHeader = m.toRpcHeader();
|
||||
var theirsHeader = theirsData.getLeft();
|
||||
|
||||
File first;
|
||||
File second;
|
||||
UUID otherHostname;
|
||||
|
||||
if (oursFile.getMtime() >= theirsFile.getMtime()) {
|
||||
first = oursFile;
|
||||
second = theirsFile;
|
||||
@@ -85,13 +85,10 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
||||
}
|
||||
|
||||
newMetadata = new ObjectMetadata(ours.getName(), true);
|
||||
Map<UUID, Long> newChangelog = new LinkedHashMap<>(m.getChangelog());
|
||||
|
||||
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||
}
|
||||
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||
newChangelog.merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||
}
|
||||
|
||||
boolean chunksDiff = !Objects.equals(first.getChunks(), second.getChunks());
|
||||
@@ -103,35 +100,42 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
|| oursFile.getCtime() != first.getCtime()
|
||||
|| chunksDiff;
|
||||
|
||||
if (m.getBestVersion() > newChangelog.values().stream().reduce(0L, Long::sum))
|
||||
throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));
|
||||
|
||||
if (wasChanged) {
|
||||
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||
newChangelog.merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||
|
||||
if (m.getBestVersion() >= newMetadata.getOurVersion())
|
||||
throw new NotImplementedException("Race when conflict resolving");
|
||||
|
||||
var oldChunks = oursFile.getChunks().values().stream().toList();
|
||||
oursFile.getChunks().clear();
|
||||
if (useHashForChunks)
|
||||
throw new NotImplementedException();
|
||||
|
||||
// FIXME:
|
||||
for (var cuuid : oldChunks) {
|
||||
var ci = jObjectManager.get(ChunkInfo.getNameFromHash(cuuid));
|
||||
ci.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
|
||||
m.removeRef(oursFile.getName());
|
||||
return null;
|
||||
}));
|
||||
for (var cuuid : oursFile.getChunks().values()) {
|
||||
jObjectManager
|
||||
.get(ChunkInfo.getNameFromHash(cuuid))
|
||||
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
|
||||
m.removeRef(oursFile.getName());
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
|
||||
oursFile.getChunks().clear();
|
||||
|
||||
for (var e : firstChunksCopy) {
|
||||
oursFile.getChunks().put(e.getLeft(), e.getValue());
|
||||
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
|
||||
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), Optional.of(oursFile.getName()));
|
||||
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
|
||||
}
|
||||
|
||||
oursFile.setMtime(first.getMtime());
|
||||
oursFile.setCtime(first.getCtime());
|
||||
|
||||
var newFile = new File(UUID.randomUUID(), second.getMode(), oursDir.getUuid());
|
||||
|
||||
newFile.setMtime(second.getMtime());
|
||||
newFile.setCtime(second.getCtime());
|
||||
|
||||
for (var e : secondChunksCopy) {
|
||||
newFile.getChunks().put(e.getLeft(), e.getValue());
|
||||
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
|
||||
@@ -139,9 +143,9 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
|
||||
}
|
||||
|
||||
var theName = oursDir.getChildren().entrySet().stream().filter(p -> p.getValue().equals(oursFile.getUuid())).findAny().orElseThrow(
|
||||
() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName())
|
||||
);
|
||||
var theName = oursDir.getChildren().entrySet().stream()
|
||||
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
|
||||
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));
|
||||
|
||||
jObjectManager.put(newFile, Optional.of(_oursDir.getName()));
|
||||
|
||||
@@ -157,11 +161,9 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
} while (true);
|
||||
|
||||
bumpDir.apply();
|
||||
} else if (m.getBestVersion() > newMetadata.getOurVersion())
|
||||
throw new NotImplementedException("Race when conflict resolving");
|
||||
}
|
||||
|
||||
m.getChangelog().clear();
|
||||
m.getChangelog().putAll(newMetadata.getChangelog());
|
||||
m.setChangelog(newChangelog);
|
||||
return null;
|
||||
});
|
||||
return null;
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.storage.files.objects;
|
||||
import com.usatiuk.dhfs.storage.files.conflicts.DirectoryConflictResolver;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -16,7 +17,8 @@ public class Directory extends FsNode {
|
||||
}
|
||||
|
||||
@Getter
|
||||
private final Map<String, UUID> _children = new TreeMap<>();
|
||||
@Setter
|
||||
private Map<String, UUID> _children = new TreeMap<>();
|
||||
|
||||
@Override
|
||||
public Class<? extends ConflictResolver> getConflictResolver() {
|
||||
|
||||
@@ -27,7 +27,8 @@ public class ObjectMetadata implements Serializable {
|
||||
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
|
||||
|
||||
@Getter
|
||||
private final Map<UUID, Long> _changelog = new LinkedHashMap<>();
|
||||
@Setter
|
||||
private Map<UUID, Long> _changelog = new LinkedHashMap<>();
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
|
||||
Reference in New Issue
Block a user