Use separate threads for everything

This commit is contained in:
2024-06-21 22:29:30 +02:00
parent 9c6a87de94
commit 9cbac582d3
5 changed files with 130 additions and 124 deletions

View File

@@ -81,17 +81,20 @@ public class JObjectManagerImpl implements JObjectManager {
private void nurseryCleanupThread() {
try {
LinkedHashSet<String> got;
while (true) {
LinkedHashSet<String> got;
synchronized (_writebackQueue) {
_writebackQueue.wait();
got = _writebackQueue.get();
_writebackQueue.set(new LinkedHashSet<>());
}
synchronized (_writebackQueue) {
if (_writebackQueue.get().isEmpty())
_writebackQueue.wait();
got = _writebackQueue.get();
_writebackQueue.set(new LinkedHashSet<>());
}
synchronized (this) {
for (var s : got) {
_nurseryRefcounts.remove(s);
synchronized (this) {
for (var s : got) {
_nurseryRefcounts.remove(s);
}
}
}
} catch (InterruptedException ignored) {
@@ -230,8 +233,6 @@ public class JObjectManagerImpl implements JObjectManager {
synchronized (this) {
if (!objectPersistentStore.existsObject("meta_" + name))
_nurseryRefcounts.merge(name, 1L, Long::sum);
else
_nurseryRefcounts.remove(name);
}
}
@@ -248,18 +249,12 @@ public class JObjectManagerImpl implements JObjectManager {
object.runWriteLockedMeta((m, a, b) -> {
String name = m.getName();
synchronized (this) {
if (objectPersistentStore.existsObject("meta_" + name)) {
_nurseryRefcounts.remove(name);
return null;
}
if (!_nurseryRefcounts.containsKey(name)) return null;
_nurseryRefcounts.merge(name, -1L, Long::sum);
if (_nurseryRefcounts.get(name) <= 0) {
_nurseryRefcounts.remove(name);
jObjectWriteback.remove(name);
if (!objectPersistentStore.existsObject("meta_" + name))
_map.remove(name);
_map.remove(name);
}
}
return null;

View File

@@ -1,18 +1,20 @@
package com.usatiuk.dhfs.storage.objects.jrepository;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.quarkus.runtime.Startup;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.concurrent.atomic.AtomicBoolean;
@ApplicationScoped
public class JObjectWriteback {
@@ -23,40 +25,55 @@ public class JObjectWriteback {
@Inject
JObjectManager jObjectManager;
AtomicBoolean _writing = new AtomicBoolean(false);
private final LinkedHashMap<String, JObject<?>> _objects = new LinkedHashMap<>();
private final LinkedHashMap<String, Pair<Long, JObject<?>>> _objects = new LinkedHashMap<>();
private final LinkedHashSet<String> _toIgnore = new LinkedHashSet<>();
private Thread _writebackThread;
@Startup
void init() {
_writebackThread = new Thread(this::writeback);
_writebackThread.setName("JObject writeback thread");
_writebackThread.start();
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
// FIXME: Hack!
while (true) {
synchronized (this) {
if (_objects.isEmpty() && !_writing.get()) break;
}
_writebackThread.interrupt();
while (_writebackThread.isAlive()) {
try {
Thread.sleep(100);
_writebackThread.join();
} catch (InterruptedException ignored) {
}
}
flush();
Collection<Pair<Long, JObject<?>>> toWrite;
synchronized (_objects) {
toWrite = new ArrayList<>(_objects.values());
}
for (var v : toWrite) {
flushOne(v.getRight());
}
}
@Scheduled(every = "2s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
public void flush() {
while (true) {
JObject<?> obj;
synchronized (this) {
var entry = _objects.pollFirstEntry();
if (entry == null) break;
_writing.set(true);
obj = entry.getValue();
private void writeback() {
try {
while (true) {
JObject<?> obj;
synchronized (_objects) {
if (_objects.isEmpty())
_objects.wait();
if (System.currentTimeMillis() - _objects.firstEntry().getValue().getLeft() < 100L)
Thread.sleep(100);
var entry = _objects.pollFirstEntry();
if (entry == null) break;
obj = entry.getValue().getRight();
}
flushOne(obj);
}
flushOne(obj);
}
synchronized (this) {
_writing.set(false);
} catch (InterruptedException e) {
Log.info("Writeback thread exiting");
}
}
@@ -87,20 +104,22 @@ public class JObjectWriteback {
}
public void remove(String name) {
synchronized (this) {
synchronized (_objects) {
_objects.remove(name);
_objects.notifyAll();
}
}
public void markDirty(String name, JObject<?> object) {
synchronized (this) {
synchronized (_objects) {
if (_objects.containsKey(name)) {
return;
}
// FIXME: better logic
if (_objects.size() < 10000) {
_objects.put(name, object);
_objects.put(name, Pair.of(System.currentTimeMillis(), object));
_objects.notifyAll();
return;
}
}

View File

@@ -1,35 +1,9 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
 * Holds the invalidation-queue state ({@link InvalidationQueueData}) behind a
 * read-write lock, exposing it only through locked callback runners so callers
 * can never touch the data without holding the appropriate lock.
 */
public class InvalidationQueue {
    private final InvalidationQueueData _data = new InvalidationQueueData();
    private final ReadWriteLock _dataLock = new ReentrantReadWriteLock();

    /** Callback invoked with the guarded data while the lock is held. */
    @FunctionalInterface
    public interface InvalidationQueueDataFn<R> {
        R apply(InvalidationQueueData data);
    }

    /**
     * Runs {@code fn} against the queue data under the shared (read) lock.
     *
     * @return whatever {@code fn} returns
     */
    public <R> R runReadLocked(InvalidationQueueDataFn<R> fn) {
        return runLocked(_dataLock.readLock(), fn);
    }

    /**
     * Runs {@code fn} against the queue data under the exclusive (write) lock.
     *
     * @return whatever {@code fn} returns
     */
    public <R> R runWriteLocked(InvalidationQueueDataFn<R> fn) {
        return runLocked(_dataLock.writeLock(), fn);
    }

    // Shared lock/try/finally skeleton for both public runners.
    private <R> R runLocked(java.util.concurrent.locks.Lock lock, InvalidationQueueDataFn<R> fn) {
        lock.lock();
        try {
            return fn.apply(_data);
        } finally {
            lock.unlock();
        }
    }
}

View File

@@ -1,23 +0,0 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
 * Per-host sets of object names whose invalidations are pending delivery.
 * Provides no locking of its own; callers are expected to guard access
 * externally (see {@code InvalidationQueue}).
 */
public class InvalidationQueueData {
    // Insertion-ordered map: host -> names queued for that host. Exposed
    // read-only via the lombok-generated getter.
    @Getter
    private Map<String, Set<String>> _hostToInvObj = new LinkedHashMap<>();

    /**
     * Returns the pending-invalidation set for {@code host}, creating an
     * empty one and registering it in the map if none exists yet.
     */
    public Set<String> getSetForHost(String host) {
        var set = _hostToInvObj.get(host);
        if (set == null) {
            set = new LinkedHashSet<>();
            _hostToInvObj.put(host, set);
        }
        return set;
    }

    /**
     * Takes ownership of the entire queued map, leaving a fresh empty map
     * behind for subsequent pushes.
     *
     * @return the drained host-to-names map
     */
    public Map<String, Set<String>> pullAll() {
        Map<String, Set<String>> drained = _hostToInvObj;
        _hostToInvObj = new LinkedHashMap<>();
        return drained;
    }
}

View File

@@ -1,50 +1,91 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@ApplicationScoped
public class InvalidationQueueService {
private final InvalidationQueue _data = new InvalidationQueue();
@Inject
RemoteHostManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
public void trySend() {
var data = _data.runReadLocked(InvalidationQueueData::pullAll);
for (var forHost : data.entrySet()) {
for (var obj : forHost.getValue()) {
try {
remoteObjectServiceClient.notifyUpdate(forHost.getKey(), obj);
} catch (Exception e) {
Log.info("Failed to send invalidation to " + forHost.getKey() + " of " + obj + ": " + e.getMessage() + " will retry");
pushInvalidationToOne(forHost.getKey(), obj);
private Map<String, Set<String>> _hostToInvObj = new LinkedHashMap<>();
private Thread _senderThread;
@Startup
void init() {
_senderThread = new Thread(this::sender);
_senderThread.setName("Invalidation sender");
_senderThread.start();
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
_senderThread.interrupt();
_senderThread.join();
}
private Set<String> getSetForHost(String host) {
synchronized (this) {
return _hostToInvObj.computeIfAbsent(host, k -> new LinkedHashSet<>());
}
}
public Map<String, Set<String>> pullAll() throws InterruptedException {
synchronized (this) {
if (_hostToInvObj.isEmpty())
this.wait();
var ret = _hostToInvObj;
_hostToInvObj = new LinkedHashMap<>();
return ret;
}
}
private void sender() {
try {
while (true) {
var data = pullAll();
for (var forHost : data.entrySet()) {
for (var obj : forHost.getValue()) {
try {
remoteObjectServiceClient.notifyUpdate(forHost.getKey(), obj);
} catch (Exception e) {
Log.info("Failed to send invalidation to " + forHost.getKey() + " of " + obj + ": " + e.getMessage() + " will retry");
pushInvalidationToOne(forHost.getKey(), obj);
}
}
}
}
} catch (InterruptedException e) {
Log.info("Invalidation sender exiting");
}
}
public void pushInvalidationToAll(String name) {
_data.runWriteLocked(d -> {
synchronized (this) {
for (var h : remoteHostManager.getSeenHosts()) {
d.getSetForHost(h).add(name);
getSetForHost(h).add(name);
}
return null;
});
this.notifyAll();
}
}
public void pushInvalidationToOne(String host, String name) {
_data.runWriteLocked(d -> {
d.getSetForHost(host).add(name);
return null;
});
synchronized (this) {
getSetForHost(host).add(name);
this.notifyAll();
}
}
}