a little less broken chunk merging with nursery

This commit is contained in:
2024-06-21 21:12:43 +02:00
parent 76541fd7c1
commit ac0a0c5ca9
4 changed files with 26 additions and 18 deletions

View File

@@ -12,10 +12,7 @@ import org.apache.commons.lang3.NotImplementedException;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.*;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -41,6 +38,7 @@ public class JObjectManagerImpl implements JObjectManager {
private final HashMap<String, NamedSoftReference> _map = new HashMap<>();
private final HashMap<String, Long> _nurseryRefcounts = new HashMap<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
private final LinkedHashSet<String> _writebackQueue = new LinkedHashSet<>();
private void cleanup() {
NamedSoftReference cur;
@@ -50,6 +48,14 @@ public class JObjectManagerImpl implements JObjectManager {
_map.remove(cur._key);
}
}
synchronized (this) {
synchronized (_writebackQueue) {
for (var s : _writebackQueue) {
_nurseryRefcounts.remove(s);
}
_writebackQueue.clear();
}
}
}
private JObject<?> getFromMap(String key) {
@@ -190,13 +196,15 @@ public class JObjectManagerImpl implements JObjectManager {
synchronized (this) {
if (!objectPersistentStore.existsObject("meta_" + name))
_nurseryRefcounts.merge(name, 1L, Long::sum);
else
_nurseryRefcounts.remove(name);
}
}
@Override
public void onWriteback(String name) {
synchronized (this) {
_nurseryRefcounts.remove(name);
synchronized (_writebackQueue) {
_writebackQueue.add(name);
}
}
@@ -204,19 +212,17 @@ public class JObjectManagerImpl implements JObjectManager {
public void unref(JObject<?> object) {
object.runWriteLockedMeta((m, a, b) -> {
String name = m.getName();
boolean removed = false;
synchronized (this) {
if (objectPersistentStore.existsObject("meta_" + name)) {
_nurseryRefcounts.remove(name);
return null;
}
if (!_nurseryRefcounts.containsKey(name)) return null;
_nurseryRefcounts.merge(name, -1L, Long::sum);
if (_nurseryRefcounts.get(name) <= 0) {
_nurseryRefcounts.remove(name);
removed = true;
}
}
// Race?
if (removed) {
jObjectWriteback.remove(name);
synchronized (this) {
jObjectWriteback.remove(name);
if (!objectPersistentStore.existsObject("meta_" + name))
_map.remove(name);
}

View File

@@ -12,6 +12,7 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.concurrent.atomic.AtomicBoolean;
@ApplicationScoped
@@ -26,6 +27,7 @@ public class JObjectWriteback {
AtomicBoolean _writing = new AtomicBoolean(false);
private final LinkedHashMap<String, JObject<?>> _objects = new LinkedHashMap<>();
private final LinkedHashSet<String> _toIgnore = new LinkedHashSet<>();
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
// FIXME: Hack!
@@ -42,7 +44,7 @@ public class JObjectWriteback {
}
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
@RunOnVirtualThread
public void flush() {
while (true) {
JObject<?> obj;

View File

@@ -18,7 +18,7 @@ public class InvalidationQueueService {
RemoteObjectServiceClient remoteObjectServiceClient;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
@RunOnVirtualThread
public void trySend() {
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
for (var forHost : data.entrySet()) {

View File

@@ -10,7 +10,7 @@ import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@@ -40,7 +40,7 @@ public class RemoteHostManager {
}
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
@RunOnVirtualThread
public void tryConnectAll() {
for (var host : persistentRemoteHostsService.getHosts()) {
var shouldTry = _transientPeersState.runReadLocked(d -> {