do cleanup on separate thread

This commit is contained in:
2024-06-21 21:55:57 +02:00
parent c1050769b4
commit 9c6a87de94
4 changed files with 57 additions and 24 deletions

View File

@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.storage.objects.repository.distributed.ObjectMetadata;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Shutdown;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.Getter;
@@ -13,6 +16,7 @@ import org.apache.commons.lang3.NotImplementedException;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -38,23 +42,60 @@ public class JObjectManagerImpl implements JObjectManager {
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
private final HashMap<String, Long> _nurseryRefcounts = new HashMap<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
private final LinkedHashSet<String> _writebackQueue = new LinkedHashSet<>();
private final AtomicReference<LinkedHashSet<String>> _writebackQueue = new AtomicReference<>(new LinkedHashSet<>());
private void cleanup() {
NamedSoftReference cur;
while ((cur = (NamedSoftReference) _refQueue.poll()) != null) {
synchronized (this) {
if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null))
_map.remove(cur._key);
private Thread _refCleanupThread;
private Thread _nurseryCleanupThread;
@Startup
void init() {
_refCleanupThread = new Thread(this::refCleanupThread);
_refCleanupThread.setName("JObject ref cleanup thread");
_refCleanupThread.start();
_nurseryCleanupThread = new Thread(this::nurseryCleanupThread);
_nurseryCleanupThread.setName("JObject nursery cleanup thread");
_nurseryCleanupThread.start();
}
@Shutdown
void shutdown() throws InterruptedException {
_refCleanupThread.interrupt();
_nurseryCleanupThread.interrupt();
_refCleanupThread.join();
_nurseryCleanupThread.join();
}
private void refCleanupThread() {
try {
while (true) {
NamedSoftReference cur = (NamedSoftReference) _refQueue.remove();
synchronized (this) {
if (_map.containsKey(cur._key) && (_map.get(cur._key).get() == null))
_map.remove(cur._key);
}
}
} catch (InterruptedException ignored) {
Log.info("Ref cleanup thread exiting");
}
synchronized (this) {
}
private void nurseryCleanupThread() {
try {
LinkedHashSet<String> got;
synchronized (_writebackQueue) {
for (var s : _writebackQueue) {
_writebackQueue.wait();
got = _writebackQueue.get();
_writebackQueue.set(new LinkedHashSet<>());
}
synchronized (this) {
for (var s : got) {
_nurseryRefcounts.remove(s);
}
_writebackQueue.clear();
}
} catch (InterruptedException ignored) {
Log.info("Ref cleanup thread exiting");
}
}
@@ -72,7 +113,6 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public Optional<JObject<?>> get(String name) {
cleanup();
synchronized (this) {
var inMap = getFromMap(name);
if (inMap != null) return Optional.of(inMap);
@@ -119,8 +159,6 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public <D extends JObjectData> JObject<D> put(D object) {
cleanup();
synchronized (this) {
var inMap = getFromMap(object.getName());
if (inMap != null) {
@@ -140,8 +178,6 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public JObject<?> getOrPut(String name, ObjectMetadata md) {
cleanup();
var got = get(name);
if (got.isPresent()) {
@@ -166,8 +202,6 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public <D extends JObjectData> JObject<D> getOrPut(String name, D object) {
cleanup();
var got = get(name);
if (got.isPresent()) {
if (!got.get().isOf(object.getClass())) {
@@ -204,7 +238,8 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public void onWriteback(String name) {
synchronized (_writebackQueue) {
_writebackQueue.add(name);
_writebackQueue.get().add(name);
_writebackQueue.notifyAll();
}
}

View File

@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentS
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@@ -44,7 +43,7 @@ public class JObjectWriteback {
}
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@RunOnVirtualThread
@Blocking
public void flush() {
while (true) {
JObject<?> obj;

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -18,7 +17,7 @@ public class InvalidationQueueService {
RemoteObjectServiceClient remoteObjectServiceClient;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@RunOnVirtualThread
@Blocking
public void trySend() {
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
for (var forHost : data.entrySet()) {

View File

@@ -10,7 +10,7 @@ import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.common.annotation.Blocking;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@@ -40,7 +40,7 @@ public class RemoteHostManager {
}
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@RunOnVirtualThread
@Blocking
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {
var shouldTry = _transientPeersState.runReadLocked(d -> {