mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
maybe working file conflict resolution
This commit is contained in:
@@ -14,6 +14,7 @@
|
|||||||
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
|
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
|
||||||
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
|
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
|
||||||
<quarkus.platform.version>3.11.3</quarkus.platform.version>
|
<quarkus.platform.version>3.11.3</quarkus.platform.version>
|
||||||
|
<quarkus.package.jar.type>uber-jar</quarkus.package.jar.type>
|
||||||
<skipITs>true</skipITs>
|
<skipITs>true</skipITs>
|
||||||
<surefire-plugin.version>3.2.5</surefire-plugin.version>
|
<surefire-plugin.version>3.2.5</surefire-plugin.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|||||||
@@ -0,0 +1,124 @@
|
|||||||
|
package com.usatiuk.dhfs.storage.files.conflicts;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||||
|
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@ApplicationScoped
|
||||||
|
public class DirectoryConflictResolver implements ConflictResolver {
|
||||||
|
@Inject
|
||||||
|
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
|
||||||
|
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
|
||||||
|
|
||||||
|
var theirsDir = (Directory) SerializationHelper.deserialize(theirsData.getRight());
|
||||||
|
if (!theirsDir.getClass().equals(Directory.class)) {
|
||||||
|
Log.error("Object type mismatch!");
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ours.isOf(Directory.class))
|
||||||
|
throw new NotImplementedException("Type conflict for " + ours.getName() + ", directory was expected");
|
||||||
|
|
||||||
|
var oursAsDir = (JObject<Directory>) ours;
|
||||||
|
oursAsDir.runWriteLockedMeta((a, b, c) -> {
|
||||||
|
// FIXME:
|
||||||
|
if (!ours.tryLocalResolve())
|
||||||
|
throw new NotImplementedException("Conflict but we don't have local copy for " + ours.getName());
|
||||||
|
|
||||||
|
oursAsDir.runWriteLocked((m, oursDir, bump) -> {
|
||||||
|
|
||||||
|
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>();
|
||||||
|
ObjectMetadata newMetadata;
|
||||||
|
|
||||||
|
var oursHeader = m.toRpcHeader();
|
||||||
|
var theirsHeader = theirsData.getLeft();
|
||||||
|
|
||||||
|
Directory first;
|
||||||
|
Directory second;
|
||||||
|
UUID otherHostname;
|
||||||
|
if (oursDir.getMtime() >= theirsDir.getMtime()) {
|
||||||
|
first = oursDir;
|
||||||
|
second = theirsDir;
|
||||||
|
otherHostname = conflictHost;
|
||||||
|
} else {
|
||||||
|
second = oursDir;
|
||||||
|
first = theirsDir;
|
||||||
|
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
||||||
|
}
|
||||||
|
|
||||||
|
mergedChildren.putAll(first.getChildren());
|
||||||
|
for (var entry : second.getChildren().entrySet()) {
|
||||||
|
if (mergedChildren.containsKey(entry.getKey()) &&
|
||||||
|
!Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
|
||||||
|
int i = 0;
|
||||||
|
do {
|
||||||
|
String name = entry.getKey() + ".conflict." + i + "." + otherHostname;
|
||||||
|
if (mergedChildren.containsKey(name)) {
|
||||||
|
i++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
mergedChildren.put(name, entry.getValue());
|
||||||
|
break;
|
||||||
|
} while (true);
|
||||||
|
} else {
|
||||||
|
mergedChildren.put(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newMetadata = new ObjectMetadata(ours.getName(), oursHeader.getConflictResolver(), m.getType());
|
||||||
|
|
||||||
|
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||||
|
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||||
|
}
|
||||||
|
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
|
||||||
|
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean wasChanged = mergedChildren.size() != first.getChildren().size()
|
||||||
|
|| oursDir.getMtime() != first.getMtime()
|
||||||
|
|| oursDir.getCtime() != first.getCtime();
|
||||||
|
|
||||||
|
oursDir.setMtime(first.getMtime());
|
||||||
|
oursDir.setCtime(first.getCtime());
|
||||||
|
|
||||||
|
if (wasChanged) {
|
||||||
|
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (wasChanged)
|
||||||
|
if (m.getBestVersion() >= newMetadata.getOurVersion())
|
||||||
|
throw new NotImplementedException("Race when conflict resolving");
|
||||||
|
|
||||||
|
if (m.getBestVersion() > newMetadata.getOurVersion())
|
||||||
|
throw new NotImplementedException("Race when conflict resolving");
|
||||||
|
oursDir.getChildren().clear();
|
||||||
|
oursDir.getChildren().putAll(mergedChildren);
|
||||||
|
|
||||||
|
m.getChangelog().clear();
|
||||||
|
m.getChangelog().putAll(newMetadata.getChangelog());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
return ConflictResolutionResult.RESOLVED;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,151 @@
|
|||||||
|
package com.usatiuk.dhfs.storage.files.conflicts;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.storage.SerializationHelper;
|
||||||
|
import com.usatiuk.dhfs.storage.files.objects.Directory;
|
||||||
|
import com.usatiuk.dhfs.storage.files.objects.File;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
||||||
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.inject.Inject;
|
||||||
|
import org.apache.commons.lang3.NotImplementedException;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class FileConflictResolver implements ConflictResolver {
|
||||||
|
@Inject
|
||||||
|
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
JObjectManager jObjectManager;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
|
||||||
|
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
|
||||||
|
|
||||||
|
var theirsFile = (File) SerializationHelper.deserialize(theirsData.getRight());
|
||||||
|
if (!theirsFile.getClass().equals(File.class)) {
|
||||||
|
Log.error("Object type mismatch!");
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
|
||||||
|
var oursAsFile = (JObject<File>) ours;
|
||||||
|
|
||||||
|
var _oursDir = jObjectManager.get(oursAsFile.runReadLocked((m, d) -> d.getParent().toString()), Directory.class)
|
||||||
|
.orElseThrow(() -> new NotImplementedException("Could not find parent directory for file " + oursAsFile.getName()));
|
||||||
|
|
||||||
|
_oursDir.runWriteLockedMeta((a, b, c) -> {
|
||||||
|
// FIXME:
|
||||||
|
if (!_oursDir.tryLocalResolve())
|
||||||
|
throw new NotImplementedException("Conflict but we don't have local copy for " + _oursDir.getName());
|
||||||
|
|
||||||
|
_oursDir.runWriteLocked((mD, oursDir, bumpDir) -> {
|
||||||
|
oursAsFile.runWriteLockedMeta((a2, b2, c2) -> {
|
||||||
|
// FIXME:
|
||||||
|
if (!ours.tryLocalResolve())
|
||||||
|
throw new NotImplementedException("Conflict but we don't have local copy for " + ours.getName());
|
||||||
|
|
||||||
|
oursAsFile.runWriteLocked((m, oursFile, bumpFile) -> {
|
||||||
|
|
||||||
|
// TODO: dedup
|
||||||
|
ObjectMetadata newMetadata;
|
||||||
|
|
||||||
|
var oursHeader = m.toRpcHeader();
|
||||||
|
var theirsHeader = theirsData.getLeft();
|
||||||
|
|
||||||
|
File first;
|
||||||
|
File second;
|
||||||
|
UUID otherHostname;
|
||||||
|
if (oursFile.getMtime() >= theirsFile.getMtime()) {
|
||||||
|
first = oursFile;
|
||||||
|
second = theirsFile;
|
||||||
|
otherHostname = conflictHost;
|
||||||
|
} else {
|
||||||
|
second = oursFile;
|
||||||
|
first = theirsFile;
|
||||||
|
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
||||||
|
}
|
||||||
|
|
||||||
|
newMetadata = new ObjectMetadata(ours.getName(), oursHeader.getConflictResolver(), m.getType());
|
||||||
|
|
||||||
|
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||||
|
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||||
|
}
|
||||||
|
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
|
||||||
|
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean chunksDiff = !Objects.equals(first.getChunks(), second.getChunks());
|
||||||
|
|
||||||
|
var firstChunksCopy = first.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
|
||||||
|
var secondChunksCopy = second.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
|
||||||
|
|
||||||
|
boolean wasChanged = oursFile.getMtime() != first.getMtime()
|
||||||
|
|| oursFile.getCtime() != first.getCtime()
|
||||||
|
|| chunksDiff;
|
||||||
|
|
||||||
|
if (wasChanged) {
|
||||||
|
oursFile.getChunks().clear();
|
||||||
|
for (var e : firstChunksCopy) {
|
||||||
|
oursFile.getChunks().put(e.getLeft(), e.getValue());
|
||||||
|
}
|
||||||
|
oursFile.setMtime(first.getMtime());
|
||||||
|
oursFile.setCtime(first.getCtime());
|
||||||
|
|
||||||
|
var newFile = new File(UUID.randomUUID(), second.getMode(), oursDir.getUuid());
|
||||||
|
newFile.setMtime(second.getMtime());
|
||||||
|
newFile.setCtime(second.getCtime());
|
||||||
|
for (var e : secondChunksCopy) {
|
||||||
|
newFile.getChunks().put(e.getLeft(), e.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
var theName = oursDir.getChildren().entrySet().stream().filter(p -> p.getValue().equals(oursFile.getUuid())).findAny().orElseThrow(
|
||||||
|
() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName())
|
||||||
|
);
|
||||||
|
|
||||||
|
jObjectManager.put(newFile);
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
do {
|
||||||
|
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
|
||||||
|
if (oursDir.getChildren().containsKey(name)) {
|
||||||
|
i++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
oursDir.getChildren().put(name, newFile.getUuid());
|
||||||
|
break;
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
||||||
|
bumpDir.apply();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (wasChanged)
|
||||||
|
if (m.getBestVersion() >= newMetadata.getOurVersion())
|
||||||
|
throw new NotImplementedException("Race when conflict resolving");
|
||||||
|
|
||||||
|
if (m.getBestVersion() > newMetadata.getOurVersion())
|
||||||
|
throw new NotImplementedException("Race when conflict resolving");
|
||||||
|
|
||||||
|
m.getChangelog().clear();
|
||||||
|
m.getChangelog().putAll(newMetadata.getChangelog());
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
|
||||||
|
return ConflictResolutionResult.RESOLVED;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.conflicts;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.conflicts;
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.objects;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.usatiuk.dhfs.storage.files.conflicts.NoOpConflictResolver;
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.objects;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.storage.files.conflicts.NoOpConflictResolver;
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.objects;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.storage.files.conflicts.DirectoryConflictResolver;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|
||||||
|
|||||||
@@ -1,121 +0,0 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
|
||||||
|
|
||||||
import com.usatiuk.dhfs.storage.SerializationHelper;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObject;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.PersistentRemoteHostsService;
|
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.RemoteObjectServiceClient;
|
|
||||||
import io.quarkus.logging.Log;
|
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import org.apache.commons.lang3.NotImplementedException;
|
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
@ApplicationScoped
|
|
||||||
public class DirectoryConflictResolver implements ConflictResolver {
|
|
||||||
@Inject
|
|
||||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
|
||||||
|
|
||||||
@Inject
|
|
||||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ConflictResolutionResult resolve(UUID conflictHost, JObject<?> ours) {
|
|
||||||
var theirsData = remoteObjectServiceClient.getSpecificObject(conflictHost, ours.getName());
|
|
||||||
|
|
||||||
if (!ours.isOf(Directory.class))
|
|
||||||
throw new NotImplementedException("Type conflict for " + ours.getName() + ", directory was expected");
|
|
||||||
|
|
||||||
var oursAsDir = (JObject<Directory>) ours;
|
|
||||||
|
|
||||||
oursAsDir.runWriteLocked((m, oursDir, bump) -> {
|
|
||||||
if (!ours.tryLocalResolve())
|
|
||||||
throw new NotImplementedException("Conflict but we don't have local copy for " + ours.getName());
|
|
||||||
|
|
||||||
LinkedHashMap<String, UUID> mergedChildren = new LinkedHashMap<>();
|
|
||||||
ObjectMetadata newMetadata;
|
|
||||||
long newMtime;
|
|
||||||
long newCtime;
|
|
||||||
|
|
||||||
var oursHeader = m.toRpcHeader();
|
|
||||||
var theirsHeader = theirsData.getLeft();
|
|
||||||
|
|
||||||
var theirsDir = (Directory) SerializationHelper.deserialize(theirsData.getRight());
|
|
||||||
if (!theirsDir.getClass().equals(Directory.class)) {
|
|
||||||
Log.error("Object type mismatch!");
|
|
||||||
throw new NotImplementedException();
|
|
||||||
}
|
|
||||||
|
|
||||||
Directory first;
|
|
||||||
Directory second;
|
|
||||||
UUID otherHostname;
|
|
||||||
if (oursDir.getMtime() >= theirsDir.getMtime()) {
|
|
||||||
first = oursDir;
|
|
||||||
second = theirsDir;
|
|
||||||
otherHostname = conflictHost;
|
|
||||||
} else {
|
|
||||||
second = oursDir;
|
|
||||||
first = theirsDir;
|
|
||||||
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
|
||||||
}
|
|
||||||
|
|
||||||
mergedChildren.putAll(first.getChildren());
|
|
||||||
for (var entry : second.getChildren().entrySet()) {
|
|
||||||
if (mergedChildren.containsKey(entry.getKey()) &&
|
|
||||||
!Objects.equals(mergedChildren.get(entry.getKey()), entry.getValue())) {
|
|
||||||
int i = 0;
|
|
||||||
do {
|
|
||||||
String name = entry.getKey() + ".conflict." + i + "." + otherHostname;
|
|
||||||
if (mergedChildren.containsKey(name)) {
|
|
||||||
i++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
mergedChildren.put(name, entry.getValue());
|
|
||||||
break;
|
|
||||||
} while (true);
|
|
||||||
} else {
|
|
||||||
mergedChildren.put(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
newMetadata = new ObjectMetadata(ours.getName(), oursHeader.getConflictResolver(), m.getType());
|
|
||||||
|
|
||||||
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
|
||||||
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
|
||||||
}
|
|
||||||
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
|
|
||||||
newMetadata.getChangelog().merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean wasChanged = mergedChildren.size() != first.getChildren().size();
|
|
||||||
if (wasChanged) {
|
|
||||||
newMetadata.getChangelog().merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
|
|
||||||
}
|
|
||||||
|
|
||||||
newMtime = first.getMtime();
|
|
||||||
newCtime = first.getCtime();
|
|
||||||
|
|
||||||
if (wasChanged)
|
|
||||||
if (m.getBestVersion() >= newMetadata.getOurVersion())
|
|
||||||
throw new NotImplementedException("Race when conflict resolving");
|
|
||||||
|
|
||||||
if (m.getBestVersion() > newMetadata.getOurVersion())
|
|
||||||
throw new NotImplementedException("Race when conflict resolving");
|
|
||||||
|
|
||||||
oursDir.setMtime(newMtime);
|
|
||||||
oursDir.setCtime(newCtime);
|
|
||||||
oursDir.getChildren().clear();
|
|
||||||
oursDir.getChildren().putAll(mergedChildren);
|
|
||||||
|
|
||||||
m.getChangelog().clear();
|
|
||||||
m.getChangelog().putAll(newMetadata.getChangelog());
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
return ConflictResolutionResult.RESOLVED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,20 +1,21 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.objects;
|
||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
public class File extends FsNode {
|
public class File extends FsNode {
|
||||||
public File(UUID uuid) {
|
public File(UUID uuid, long mode, UUID parent) {
|
||||||
super(uuid);
|
|
||||||
}
|
|
||||||
|
|
||||||
public File(UUID uuid, long mode) {
|
|
||||||
super(uuid, mode);
|
super(uuid, mode);
|
||||||
|
_parent = parent;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
|
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final UUID _parent;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package com.usatiuk.dhfs.storage.files.objects;
|
package com.usatiuk.dhfs.storage.files.objects;
|
||||||
|
|
||||||
|
import com.usatiuk.dhfs.storage.files.conflicts.NotImplementedConflictResolver;
|
||||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectData;
|
||||||
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
|||||||
@@ -54,7 +54,15 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (path.getNameCount() == 1) return ref;
|
if (path.getNameCount() == 1) {
|
||||||
|
if (ref.get().isOf(File.class)) {
|
||||||
|
var f = (JObject<File>) ref.get();
|
||||||
|
if (!Objects.equals(f.runReadLocked((m, d) -> d.getParent()).toString(), from.getName())) {
|
||||||
|
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref;
|
||||||
|
}
|
||||||
|
|
||||||
return traverse(ref.get(), path.subpath(1, path.getNameCount()));
|
return traverse(ref.get(), path.subpath(1, path.getNameCount()));
|
||||||
|
|
||||||
@@ -99,8 +107,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
var dir = (JObject<Directory>) found.get();
|
var dir = (JObject<Directory>) found.get();
|
||||||
var fuuid = UUID.randomUUID();
|
var fuuid = UUID.randomUUID();
|
||||||
File f = new File(fuuid);
|
File f = new File(fuuid, mode, UUID.fromString(dir.getName())); //FIXME:
|
||||||
f.setMode(mode);
|
|
||||||
|
|
||||||
jObjectManager.put(f);
|
jObjectManager.put(f);
|
||||||
|
|
||||||
@@ -147,6 +154,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
if (!(found.get().isOf(Directory.class))) return false;
|
if (!(found.get().isOf(Directory.class))) return false;
|
||||||
|
|
||||||
|
var kidId = ((JObject<Directory>) found.get()).runReadLocked((m, d) -> d.getKid(Path.of(name).getFileName().toString()));
|
||||||
|
|
||||||
|
var kid = jObjectManager.get(kidId.get().toString());
|
||||||
|
|
||||||
|
if (kid.isEmpty()) return false;
|
||||||
|
|
||||||
var dir = (JObject<Directory>) found.get();
|
var dir = (JObject<Directory>) found.get();
|
||||||
return dir.runWriteLocked((m, d, bump) -> {
|
return dir.runWriteLocked((m, d, bump) -> {
|
||||||
bump.apply();
|
bump.apply();
|
||||||
@@ -171,6 +184,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
if (dent.isEmpty()) return false;
|
if (dent.isEmpty()) return false;
|
||||||
if (!rmdent(from)) return false;
|
if (!rmdent(from)) return false;
|
||||||
|
|
||||||
|
var dentGot = dent.get();
|
||||||
|
|
||||||
// FIXME:
|
// FIXME:
|
||||||
var root = getRoot();
|
var root = getRoot();
|
||||||
var found = traverse(root, Path.of(to).getParent());
|
var found = traverse(root, Path.of(to).getParent());
|
||||||
@@ -180,10 +195,20 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
|||||||
|
|
||||||
var dir = (JObject<Directory>) found.get();
|
var dir = (JObject<Directory>) found.get();
|
||||||
|
|
||||||
|
var putDent = dentGot.isOf(File.class) ?
|
||||||
|
jObjectManager.put(((JObject<File>) dentGot).runWriteLocked((m, d, b) -> {
|
||||||
|
var cpy = new File(UUID.randomUUID(), d.getMode(), UUID.fromString(dir.getName()));
|
||||||
|
cpy.setMtime(d.getMtime());
|
||||||
|
cpy.setCtime(d.getCtime());
|
||||||
|
cpy.getChunks().putAll(d.getChunks());
|
||||||
|
return cpy;
|
||||||
|
})) : dentGot;
|
||||||
|
|
||||||
dir.runWriteLocked((m, d, bump) -> {
|
dir.runWriteLocked((m, d, bump) -> {
|
||||||
bump.apply();
|
bump.apply();
|
||||||
|
|
||||||
d.setMtime(System.currentTimeMillis());
|
d.setMtime(System.currentTimeMillis());
|
||||||
d.getChildren().put(Path.of(to).getFileName().toString(), UUID.fromString(dent.get().getName()));
|
d.getChildren().put(Path.of(to).getFileName().toString(), UUID.fromString(putDent.getName()));
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ public class InvalidationQueueService {
|
|||||||
private void sender() {
|
private void sender() {
|
||||||
try {
|
try {
|
||||||
while (!Thread.interrupted()) {
|
while (!Thread.interrupted()) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(1000);
|
||||||
var data = pullAll();
|
var data = pullAll();
|
||||||
String stats = "Sent invalidation: ";
|
String stats = "Sent invalidation: ";
|
||||||
for (var forHost : data.entrySet()) {
|
for (var forHost : data.entrySet()) {
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ public class DhfsFileServiceSimpleTest {
|
|||||||
ChunkInfo c2i = new ChunkInfo(c2.getHash(), c2.getBytes().size());
|
ChunkInfo c2i = new ChunkInfo(c2.getHash(), c2.getBytes().size());
|
||||||
ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
|
ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
|
||||||
ChunkInfo c3i = new ChunkInfo(c3.getHash(), c3.getBytes().size());
|
ChunkInfo c3i = new ChunkInfo(c3.getHash(), c3.getBytes().size());
|
||||||
File f = new File(fuuid);
|
File f = new File(fuuid, 777, null);
|
||||||
f.getChunks().put(0L, c1.getHash());
|
f.getChunks().put(0L, c1.getHash());
|
||||||
f.getChunks().put((long) c1.getBytes().size(), c2.getHash());
|
f.getChunks().put((long) c1.getBytes().size(), c2.getHash());
|
||||||
f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getHash());
|
f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getHash());
|
||||||
@@ -129,4 +129,21 @@ public class DhfsFileServiceSimpleTest {
|
|||||||
fileService.truncate(uuid, 7);
|
fileService.truncate(uuid, 7);
|
||||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).get().toByteArray());
|
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6,}, fileService.read(uuid, 0, 20).get().toByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void moveTest() {
|
||||||
|
var ret = fileService.create("/moveTest", 777);
|
||||||
|
Assertions.assertTrue(ret.isPresent());
|
||||||
|
var uuid = ret.get();
|
||||||
|
|
||||||
|
fileService.write(uuid, 0, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
|
||||||
|
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
|
||||||
|
|
||||||
|
Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest"));
|
||||||
|
Assertions.assertFalse(fileService.open("/moveTest").isPresent());
|
||||||
|
Assertions.assertTrue(fileService.open("/movedTest").isPresent());
|
||||||
|
|
||||||
|
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||||
|
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user