Sync-base: op extractor interface

This commit is contained in:
2025-04-25 08:58:54 +02:00
parent 6bd92ad7cd
commit d94abfee97
6 changed files with 186 additions and 104 deletions

View File

@@ -0,0 +1,59 @@
package com.usatiuk.dhfs.jkleppmanntree;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.repository.invalidation.Op;
import com.usatiuk.dhfs.repository.invalidation.OpExtractor;
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
@ApplicationScoped
public class JKleppmannTreeOpExtractor implements OpExtractor<JKleppmannTreePersistentData> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTransaction;
@Inject
DtoMapperService dtoMapperService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
@Override
public Pair<List<Op>, Runnable> extractOps(JKleppmannTreePersistentData data, PeerId peerId) {
return txm.run(() -> {
var tree = jKleppmannTreeManager.getTree(data.key());
if (!tree.hasPendingOpsForHost(peerId))
return Pair.of(List.of(tree.getPeriodicPushOp()), (Runnable) () -> {
});
var ops = tree.getPendingOpsForHost(peerId, 100);
if (tree.hasPendingOpsForHost(peerId)) {
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOneNoDelay(peerId, data.key()));
}
var key = data.key();
return Pair.<List<Op>, Runnable>of(ops, (Runnable) () -> {
txm.run(() -> {
var commitTree = jKleppmannTreeManager.getTree(key);
for (var op : ops) {
commitTree.commitOpForHost(peerId, op);
}
});
});
});
}
}

View File

@@ -1,14 +1,17 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -17,7 +20,6 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Link;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -40,7 +42,10 @@ public class InvalidationQueueService {
@Inject
PeerInfoService peerInfoService;
@Inject
OpPusher opPusher;
TransactionManager txm;
@Inject
Transaction curTx;
@ConfigProperty(name = "dhfs.objects.invalidation.threads")
int threads;
@Inject
@@ -49,6 +54,8 @@ public class InvalidationQueueService {
private final DataLocker _locker = new DataLocker();
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
OpExtractorService opExtractorService;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -141,9 +148,15 @@ public class InvalidationQueueService {
}
locks.add(lock);
try {
var prepared = opPusher.preparePush(e);
ops.get(e.peer()).addAll(prepared.getLeft());
commits.get(e.peer()).addAll(prepared.getRight());
txm.run(() -> {
var obj = curTx.get(JData.class, e.key()).orElse(null);
if (obj == null) return;
var extracted = opExtractorService.extractOps(obj, e.peer());
if (extracted == null) return;
ops.get(e.peer()).addAll(extracted.getLeft());
commits.get(e.peer()).add(extracted.getRight());
});
success++;
} catch (Exception ex) {
Log.warnv("Failed to prepare invalidation to {0}, will retry: {1}", e, ex);

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.objects.JData;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
public interface OpExtractor<T extends JData> {
Pair<List<Op>, Runnable> extractOps(T data, PeerId peerId);
}

View File

@@ -0,0 +1,49 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.syncmap.DtoMapper;
import com.usatiuk.objects.JData;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import org.apache.commons.lang3.tuple.Pair;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class OpExtractorService {
private final Map<Class<? extends JData>, OpExtractor> _opExtractorMap;
public OpExtractorService(Instance<OpExtractor<?>> opExtractors) {
HashMap<Class<? extends JData>, OpExtractor> opExtractorMap = new HashMap<>();
for (var opExtractor : opExtractors.handles()) {
for (var type : Arrays.stream(opExtractor.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(OpExtractor.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert JData.class.isAssignableFrom((Class<?>) orig);
opExtractorMap.put((Class<? extends JData>) orig, opExtractor.get());
}
}
_opExtractorMap = Map.copyOf(opExtractorMap);
}
public @Nullable Pair<List<Op>, Runnable> extractOps(JData data, PeerId peerId) {
var extractor = _opExtractorMap.get(data.getClass());
if (extractor == null) {
return null;
}
return extractor.extractOps(data, peerId);
}
}

View File

@@ -1,95 +0,0 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.objects.JData;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import com.usatiuk.dhfs.repository.JDataRemotePush;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ApplicationScoped
public class OpPusher {
@Inject
Transaction curTx;
@Inject
TransactionManager txm;
@Inject
RemoteTransaction remoteTransaction;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
DtoMapperService dtoMapperService;
public Pair<List<Op>, List<Runnable>> preparePush(InvalidationQueueEntry entry) {
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
JDataRemoteDto data =
remote.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(remote.knownType(), entry.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (remote.knownType().isAnnotationPresent(JDataRemotePush.class) && data == null) {
Log.warnv("Failed to get data for push {0} of type {1}", entry.key(), remote.knownType());
}
return List.of(new IndexUpdateOp(entry.key(), remote.changelog(), data));
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (!tree.hasPendingOpsForHost(entry.peer()))
return List.of(tree.getPeriodicPushOp());
var ops = tree.getPendingOpsForHost(entry.peer(), 100);
if (tree.hasPendingOpsForHost(entry.peer())) {
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
}
return ops;
}
case null,
default -> {
return List.of();
}
}
});
List<Runnable> commits = info.stream().<Runnable>map(o -> {
return () -> {
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
}
break;
}
case null:
default:
}
});
};
}).toList();
return Pair.of(info, commits);
}
}

View File

@@ -0,0 +1,45 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.repository.JDataRemoteDto;
import com.usatiuk.dhfs.repository.JDataRemotePush;
import com.usatiuk.dhfs.repository.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
@ApplicationScoped
public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta> {
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTransaction;
@Inject
DtoMapperService dtoMapperService;
@Override
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
return txm.run(() -> {
JDataRemoteDto dto =
data.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(data.knownType(), data.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
}
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
});
});
}
}