little fixes

This commit is contained in:
2024-06-21 23:35:23 +02:00
parent b178d90271
commit e68bb25edd
3 changed files with 17 additions and 0 deletions

View File

@@ -2,7 +2,9 @@ package com.usatiuk.dhfs.storage.files.objects;
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
import com.usatiuk.dhfs.storage.objects.repository.distributed.ConflictResolver;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class NoOpConflictResolver implements ConflictResolver {
@Override
public ConflictResolutionResult resolve(String conflictHost, ObjectHeader conflictSource, String localName) {

View File

@@ -12,6 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -23,6 +24,9 @@ public class JObjectManagerImpl implements JObjectManager {
@Inject
ObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
String selfname;
@Inject
JObjectResolver jObjectResolver;
@@ -173,6 +177,9 @@ public class JObjectManagerImpl implements JObjectManager {
return (JObject<D>) inMap;
} else {
var created = new JObject<D>(jObjectResolver, object.getName(), object.getConflictResolver().getName(), object);
// FIXME:
if (object.assumeUnique())
created.runWriteLockedMeta((m, a, b) -> m.getChangelog().put(selfname, 0L));
_map.put(object.getName(), new NamedSoftReference(created, _refQueue));
jObjectResolver.notifyWrite(created);
addToNursery(created.getName());

View File

@@ -58,15 +58,23 @@ public class InvalidationQueueService {
while (true) {
Thread.sleep(500);
var data = pullAll();
String stats = "Sent invalidation: ";
for (var forHost : data.entrySet()) {
long sent = 0;
for (var obj : forHost.getValue()) {
try {
remoteObjectServiceClient.notifyUpdate(forHost.getKey(), obj);
sent++;
} catch (Exception e) {
if (e.getCause().getClass().equals(InterruptedException.class)) {
Log.info("Invalidation sender exiting");
return;
}
Log.info("Failed to send invalidation to " + forHost.getKey() + " of " + obj + ": " + e.getMessage() + " will retry");
pushInvalidationToOne(forHost.getKey(), obj);
}
}
stats += forHost.getKey() + ": " + sent + " ";
}
if (Thread.interrupted()) break;
}