diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java index d3fe050d..02c7c4d1 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java @@ -6,10 +6,8 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader; import lombok.Getter; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.List; +import java.util.Map; public class ObjectMetaData implements Serializable { public ObjectMetaData(String name, Boolean assumeUnique) { @@ -24,15 +22,19 @@ public class ObjectMetaData implements Serializable { private final Boolean _assumeUnique; @Getter - private final List _remoteCopies = new ArrayList<>(); + private final Map _remoteCopies = new LinkedHashMap<>(); @Getter - private final HashMap _changelog = new LinkedHashMap<>(); + private final Map _changelog = new LinkedHashMap<>(); Long getTotalVersion() { return _changelog.values().stream().reduce(0L, Long::sum); } + Long getBestVersion() { + return _remoteCopies.values().stream().max(Long::compareTo).get(); + } + ObjectChangelog toRpcChangelog() { var changelogBuilder = ObjectChangelog.newBuilder(); for (var m : getChangelog().entrySet()) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index 2c232225..d769a8f1 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -51,7 +51,8 @@ public class RemoteHostManager { var hostInfo = _remoteHosts.get(remoteHostName); var channel = NettyChannelBuilder.forAddress(hostInfo.getAddr(), hostInfo.getPort()) .usePlaintext().build(); - var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel).withMaxOutboundMessageSize(Integer.MAX_VALUE) + var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) + .withMaxOutboundMessageSize(Integer.MAX_VALUE) .withMaxInboundMessageSize(Integer.MAX_VALUE); try { return fn.apply(client); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index 216dec0d..f2c64f05 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -38,6 +38,15 @@ public class SyncHandler { .stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum); meta.runWriteLocked((data) -> { + if (data.getRemoteCopies().getOrDefault(request.getSelfname(), 0L) > receivedTotalVer) { + Log.error("Received older index update than was known for host: " + + request.getSelfname() + " " + request.getHeader().getName()); + return null; + } + + // Before or after conflict resolution? + data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer); + var conflict = (data.getChangelog().get(selfname) > receivedSelfVer) && !data.getAssumeUnique(); if (conflict) { @@ -46,13 +55,6 @@ public class SyncHandler { } if (receivedTotalVer.equals(data.getTotalVersion())) { - data.getRemoteCopies().add(request.getSelfname()); - return null; - } - - if (receivedTotalVer < data.getTotalVersion()) { - // FIXME?: - data.getRemoteCopies().remove(request.getSelfname()); return null; } @@ -62,9 +64,6 @@ public class SyncHandler { } data.getChangelog().putIfAbsent(selfname, 0L); - data.getRemoteCopies().clear(); - data.getRemoteCopies().add(request.getSelfname()); - try { objectPersistentStore.deleteObject(request.getHeader().getName()); } catch (StatusRuntimeException sx) {