diff --git a/server/pom.xml b/server/pom.xml index 01ac31bb..9d18afa6 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -13,7 +13,7 @@ UTF-8 quarkus-bom io.quarkus.platform - 3.10.0 + 3.11.3 true 3.2.5 @@ -75,17 +75,14 @@ org.apache.commons commons-lang3 - 3.14.0 commons-io commons-io - 2.16.1 commons-codec commons-codec - 1.17.0 diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java index 74a86cac..5ea7b1ca 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/InvalidationQueueService.java @@ -2,6 +2,7 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed; import io.quarkus.logging.Log; import io.quarkus.scheduler.Scheduled; +import io.smallrye.common.annotation.Blocking; import io.smallrye.common.annotation.RunOnVirtualThread; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -17,7 +18,7 @@ public class InvalidationQueueService { RemoteObjectServiceClient remoteObjectServiceClient; @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - @RunOnVirtualThread + @Blocking public void trySend() { var data = _data.runReadLocked(InvalidationQueueData::pullAll); for (var forHost : data.entrySet()) { diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java index 8d052cb8..da26789b 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/ObjectMetaData.java @@ -32,6 +32,7 @@ public class ObjectMetaData implements Serializable { } public Long getBestVersion() { + if (_remoteCopies.isEmpty()) return 0L; return _remoteCopies.values().stream().max(Long::compareTo).get(); } diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java index b001683f..f7092908 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteHostManager.java @@ -4,12 +4,13 @@ import com.usatiuk.dhfs.objects.repository.distributed.DhfsObjectSyncGrpcGrpc; import com.usatiuk.dhfs.objects.repository.distributed.PingRequest; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.quarkus.logging.Log; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.Scheduled; -import io.smallrye.common.annotation.RunOnVirtualThread; +import io.smallrye.common.annotation.Blocking; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; @@ -39,7 +40,7 @@ public class RemoteHostManager { } @Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) - @RunOnVirtualThread + @Blocking public void tryConnectAll() { for (var host : persistentRemoteHostsService.getHosts()) { var shouldTry = _transientPeersState.runReadLocked(d -> { @@ -101,7 +102,7 @@ public class RemoteHostManager { } private R withClient(String addr, int port, Optional timeout, ClientFunction fn) { - var channel = NettyChannelBuilder.forAddress(addr, port) + var channel = NettyChannelBuilder.forAddress(addr, port).negotiationType(NegotiationType.PLAINTEXT) .usePlaintext().build(); var client = DhfsObjectSyncGrpcGrpc.newBlockingStub(channel) .withMaxOutboundMessageSize(Integer.MAX_VALUE) @@ -119,7 +120,7 @@ public class RemoteHostManager { // FIXME: private boolean reachable(HostInfo hostInfo) { try { - return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(4000L /*ms*/), c -> { + return withClient(hostInfo.getAddr(), hostInfo.getPort(), Optional.of(30000L /*ms*/), c -> { var ret = c.ping(PingRequest.newBuilder().setSelfname(selfname).build()); if (!ret.getSelfname().equals(hostInfo.getName())) { throw new IllegalStateException("Ping selfname returned " + ret.getSelfname() + " but expected " + hostInfo.getName()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java index 0b499b70..193314fc 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/RemoteObjectServiceServer.java @@ -82,6 +82,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { } @Override + @Blocking public Uni ping(PingRequest request) { if (request.getSelfname().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT); remoteHostManager.handleConnectionSuccess(request.getSelfname()); diff --git a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java index cf8d4d07..c26b7418 100644 --- a/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java +++ b/server/src/main/java/com/usatiuk/dhfs/storage/objects/repository/distributed/SyncHandler.java @@ -94,18 +94,18 @@ public class SyncHandler { return null; } - // Before or after conflict resolution? - data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer); if (data.getChangelog().get(selfname) > receivedSelfVer) return true; + data.getRemoteCopies().put(request.getSelfname(), receivedTotalVer); + assert Objects.equals(data.getBestVersion(), data.getTotalVersion()); // TODO: recheck this if (data.getBestVersion() >= receivedTotalVer) { Log.info("Received older index update than known: " + request.getSelfname() + " " + request.getHeader().getName()); - return null; + return false; } data.getChangelog().clear();