little fixies for conflict resolution, update quarkus

because of course grpc server had a bug where it could not process more than 1 query at a time
This commit is contained in:
2024-06-19 19:05:04 +02:00
parent 590d4a609d
commit a332c635f1
6 changed files with 13 additions and 12 deletions

View File

@@ -13,7 +13,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id> <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.10.0</quarkus.platform.version> <quarkus.platform.version>3.11.3</quarkus.platform.version>
<skipITs>true</skipITs> <skipITs>true</skipITs>
<surefire-plugin.version>3.2.5</surefire-plugin.version> <surefire-plugin.version>3.2.5</surefire-plugin.version>
</properties> </properties>
@@ -75,17 +75,14 @@
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
<version>2.16.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-codec</groupId> <groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId> <artifactId>commons-codec</artifactId>
<version>1.17.0</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
@@ -17,7 +18,7 @@ public class InvalidationQueueService {
RemoteObjectServiceClient remoteObjectServiceClient; RemoteObjectServiceClient remoteObjectServiceClient;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@RunOnVirtualThread @Blocking
public void trySend() { public void trySend() {
var data = _data.runReadLocked(InvalidationQueueData::pullAll); var data = _data.runReadLocked(InvalidationQueueData::pullAll);
for (var forHost : data.entrySet()) { for (var forHost : data.entrySet()) {

View File

@@ -32,6 +32,7 @@ public class ObjectMetaData implements Serializable {
} }
public Long getBestVersion() { public Long getBestVersion() {
if (_remoteCopies.isEmpty()) return 0L;
return _remoteCopies.values().stream().max(Long::compareTo).get(); return _remoteCopies.values().stream().max(Long::compareTo).get();
} }

View File

@@ -4,12 +4,13 @@ import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc;
import com.usatiuk.dhfs.objects.repository.distributed.PingRequest; import com.usatiuk.dhfs.objects.repository.distributed.PingRequest;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StatusRuntimeException; import io.grpc.StatusRuntimeException;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.RunOnVirtualThread; import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority; import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes; import jakarta.enterprise.event.Observes;
@@ -39,7 +40,7 @@ public class RemoteHostManager {
} }
@Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@RunOnVirtualThread @Blocking
public void tryConnectAll() { public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) { for (var host : persistentRemoteHostsService.getHosts()) {
var shouldTry = _transientPeersState.runReadLocked(d -> { var shouldTry = _transientPeersState.runReadLocked(d -> {
@@ -101,7 +102,7 @@ public class RemoteHostManager {
} }
private <R> R withClient(String addr, int port, Optional<Long> timeout, ClientFunction<R> fn) { private <R> R withClient(String addr, int port, Optional<Long> timeout, ClientFunction<R> fn) {
var channel = NettyChannelBuilder.forAddress(addr, port) var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT)
.usePlaintext().build(); .usePlaintext().build();
var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel)
.withMaxOutboundMessageSize(Integer.MAX_VALUE) .withMaxOutboundMessageSize(Integer.MAX_VALUE)
@@ -119,7 +120,7 @@ public class RemoteHostManager {
// FIXME: // FIXME:
private boolean reachable(HostInfo hostInfo) { private boolean reachable(HostInfo hostInfo) {
try { try {
return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(4000L /*ms*/), c -> { return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(30000L /*ms*/), c -> {
var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build()); var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build());
if (!ret.getSelfname().equals(hostInfo.getName())) { if (!ret.getSelfname().equals(hostInfo.getName())) {
throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName()); throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName());

View File

@@ -82,6 +82,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
} }
@Override @Override
@Blocking
public Uni<PingReply> ping(PingRequest request) { public Uni<PingReply> ping(PingRequest request) {
if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
remoteHostManager.handleConnectionSuccess(request.getSelfname()); remoteHostManager.handleConnectionSuccess(request.getSelfname());

View File

@@ -94,18 +94,18 @@ public class SyncHandler {
return null; return null;
} }
// Before or after conflict resolution?
data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
if (data.getChangelog().get(selfname) > receivedSelfVer) return true; if (data.getChangelog().get(selfname) > receivedSelfVer) return true;
data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer);
assert Objects.equals(data.getBestVersion(), data.getTotalVersion()); assert Objects.equals(data.getBestVersion(), data.getTotalVersion());
// TODO: recheck this // TODO: recheck this
if (data.getBestVersion() >= receivedTotalVer) { if (data.getBestVersion() >= receivedTotalVer) {
Log.info("Received older index update than known: " Log.info("Received older index update than known: "
+ request.getSelfname() + " " + request.getHeader().getName()); + request.getSelfname() + " " + request.getHeader().getName());
return null; return false;
} }
data.getChangelog().clear(); data.getChangelog().clear();