mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
Store best version per host
This commit is contained in:
@@ -6,10 +6,8 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ObjectMetaData implements Serializable {
|
||||
public ObjectMetaData(String name, Boolean assumeUnique) {
|
||||
@@ -24,15 +22,19 @@ public class ObjectMetaData implements Serializable {
|
||||
private final Boolean _assumeUnique;
|
||||
|
||||
@Getter
|
||||
private final List<String> _remoteCopies = new ArrayList<>();
|
||||
private final Map<String, Long> _remoteCopies = new LinkedHashMap<>();
|
||||
|
||||
@Getter
|
||||
private final HashMap<String, Long> _changelog = new LinkedHashMap<>();
|
||||
private final Map<String, Long> _changelog = new LinkedHashMap<>();
|
||||
|
||||
Long getTotalVersion() {
|
||||
return _changelog.values().stream().reduce(0L, Long::sum);
|
||||
}
|
||||
|
||||
Long getBestVersion() {
|
||||
return _remoteCopies.values().stream().max(Long::compareTo).get();
|
||||
}
|
||||
|
||||
ObjectChangelog toRpcChangelog() {
|
||||
var changelogBuilder = ObjectChangelog.newBuilder();
|
||||
for (var m : getChangelog().entrySet()) {
|
||||
|
||||
@@ -51,7 +51,8 @@ public class RemoteHostManager {
|
||||
var hostInfo = _remoteHosts.get(remoteHostName);
|
||||
var channel = NettyChannelBuilder.forAddress(hostInfo.getAddr(), hostInfo.getPort())
|
||||
.usePlaintext().build();
|
||||
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
|
||||
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
|
||||
.withMaxInboundMessageSize(Integer.MAX_VALUE);
|
||||
try {
|
||||
return fn.apply(client);
|
||||
|
||||
@@ -38,6 +38,15 @@ public class SyncHandler {
|
||||
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
|
||||
|
||||
meta.runWriteLocked((data) -> {
|
||||
if (data.getRemoteCopies().getOrDefault(request.getSelfname(), 0L) > receivedTotalVer) {
|
||||
Log.error("Received older index update than was known for host: "
|
||||
+ request.getSelfname() + " " + request.getHeader().getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
// Before or after conflict resolution?
|
||||
data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
|
||||
|
||||
var conflict = (data.getChangelog().get(selfname) > receivedSelfVer) && !data.getAssumeUnique();
|
||||
|
||||
if (conflict) {
|
||||
@@ -46,13 +55,6 @@ public class SyncHandler {
|
||||
}
|
||||
|
||||
if (receivedTotalVer.equals(data.getTotalVersion())) {
|
||||
data.getRemoteCopies().add(request.getSelfname());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (receivedTotalVer < data.getTotalVersion()) {
|
||||
// FIXME?:
|
||||
data.getRemoteCopies().remove(request.getSelfname());
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -62,9 +64,6 @@ public class SyncHandler {
|
||||
}
|
||||
data.getChangelog().putIfAbsent(selfname, 0L);
|
||||
|
||||
data.getRemoteCopies().clear();
|
||||
data.getRemoteCopies().add(request.getSelfname());
|
||||
|
||||
try {
|
||||
objectPersistentStore.deleteObject(request.getHeader().getName());
|
||||
} catch (StatusRuntimeException sx) {
|
||||
|
||||
Reference in New Issue
Block a user