mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Server: fix possible deadlock when resolving conflict
This commit is contained in:
@@ -19,6 +19,7 @@ import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -65,8 +66,11 @@ public class SyncHandler {
|
||||
}
|
||||
|
||||
public void handleOneUpdate(UUID from, ObjectHeader header) {
|
||||
jObjectTxManager.executeTx(() -> {
|
||||
AtomicReference<JObject<?>> foundExt = new AtomicReference<>();
|
||||
|
||||
boolean conflict = jObjectTxManager.executeTx(() -> {
|
||||
JObject<?> found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty());
|
||||
foundExt.set(found);
|
||||
|
||||
var receivedTotalVer = header.getChangelog().getEntriesList()
|
||||
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
|
||||
@@ -76,7 +80,7 @@ public class SyncHandler {
|
||||
receivedMap.put(UUID.fromString(e.getHost()), e.getVersion());
|
||||
}
|
||||
|
||||
boolean conflict = found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> {
|
||||
return found.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> {
|
||||
if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) {
|
||||
Log.error("Received older index update than was known for host: "
|
||||
+ from + " " + header.getName());
|
||||
@@ -139,21 +143,25 @@ public class SyncHandler {
|
||||
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
if (conflict) {
|
||||
Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
|
||||
// TODO: Is the lock gap here ok?
|
||||
if (conflict) {
|
||||
Log.info("Trying conflict resolution: " + header.getName() + " from " + from);
|
||||
var found = foundExt.get();
|
||||
|
||||
JObjectData theirsData;
|
||||
ObjectHeader theirsHeader;
|
||||
if (header.hasPushedData()) {
|
||||
theirsHeader = header;
|
||||
theirsData = dataProtoSerializer.deserialize(header.getPushedData());
|
||||
} else {
|
||||
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
|
||||
theirsData = dataProtoSerializer.deserialize(got.getRight());
|
||||
theirsHeader = got.getLeft();
|
||||
}
|
||||
JObjectData theirsData;
|
||||
ObjectHeader theirsHeader;
|
||||
if (header.hasPushedData()) {
|
||||
theirsHeader = header;
|
||||
theirsData = dataProtoSerializer.deserialize(header.getPushedData());
|
||||
} else {
|
||||
var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
|
||||
theirsData = dataProtoSerializer.deserialize(got.getRight());
|
||||
theirsHeader = got.getLeft();
|
||||
}
|
||||
|
||||
jObjectTxManager.executeTx(() -> {
|
||||
var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
if (d == null)
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
|
||||
@@ -161,9 +169,10 @@ public class SyncHandler {
|
||||
});
|
||||
var resolver = conflictResolvers.select(resolverClass);
|
||||
resolver.get().resolve(from, theirsHeader, theirsData, found);
|
||||
Log.info("Resolved conflict for " + from + " " + header.getName());
|
||||
}
|
||||
});
|
||||
});
|
||||
Log.info("Resolved conflict for " + from + " " + header.getName());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
|
||||
|
||||
Reference in New Issue
Block a user