diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java index cdfa2dab..2b976acb 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjPusherTxHook.java @@ -23,7 +23,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook { }; if (invalidate) { - invalidationQueueService.pushInvalidationToAll(cur.key()); + curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(cur.key())); } } @@ -33,7 +33,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook { return; } - invalidationQueueService.pushInvalidationToAll(remote.key()); + curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(remote.key())); } @Override diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java index a6674f62..2b6c11ad 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/RemoteObjectDeleter.java @@ -1,5 +1,6 @@ package com.usatiuk.dhfs.objects; +import com.usatiuk.dhfs.objects.repository.AutosyncProcessor; import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient; import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo; import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService; @@ -42,6 +43,8 @@ public class RemoteObjectDeleter { PeerInfoService peerInfoService; @Inject RemoteObjectServiceClient remoteObjectServiceClient; + @Inject + AutosyncProcessor autosyncProcessor; @ConfigProperty(name = "dhfs.objects.move-processor.threads") int moveProcessorThreads; @@ -138,8 +141,8 @@ public class RemoteObjectDeleter { for (var r : ret) { if (!r.getValue().getDeletionCandidate()) { -// for (var rr : r.getReferrersList()) -// autoSyncProcessor.add(rr); + for (var rr : r.getRight().getReferrersList()) + curTx.onCommit(() -> autosyncProcessor.add(JObjectKey.of(rr.getName()))); } else { target = target.withConfirmedDeletes(target.confirmedDeletes().plus(r.getKey())); ok++; diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncProcessor.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncProcessor.java new file mode 100644 index 00000000..365d4f5e --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncProcessor.java @@ -0,0 +1,121 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.*; +import com.usatiuk.dhfs.objects.iterators.IteratorStart; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import com.usatiuk.dhfs.objects.transaction.TransactionManager; +import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue; +import io.quarkus.logging.Log; +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@ApplicationScoped +public class AutosyncProcessor { + private final HashSetDelayedBlockingQueue _pending = new HashSetDelayedBlockingQueue<>(0); + private final HashSetDelayedBlockingQueue _retries = new HashSetDelayedBlockingQueue<>(10000); //FIXME: + @Inject + TransactionManager txm; + @ConfigProperty(name = "dhfs.objects.autosync.threads") + int autosyncThreads; + @Inject + ExecutorService executorService; + @Inject + Transaction curTx; + @Inject + RemoteTransaction remoteTx; + private ExecutorService _autosyncExcecutor; + + void init(@Observes @Priority(300) StartupEvent event) { + BasicThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("autosync-%d") + .build(); + + _autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads, factory); + for (int i = 0; i < autosyncThreads; i++) { + _autosyncExcecutor.submit(this::autosync); + } + + executorService.submit(() -> { + txm.run(() -> { + Log.info("Adding all to autosync"); + ArrayList objs = new ArrayList<>(); + txm.run(() -> { + try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { + while (it.hasNext()) { + var key = it.peekNextKey(); + objs.add(key); + // TODO: Nested transactions + it.skip(); + } + } + }); + + for (var obj : objs) { + txm.run(() -> { + var gotObj = curTx.get(JData.class, obj).orElse(null); + if (!(gotObj instanceof RemoteObjectMeta meta)) + return; + if (!meta.hasLocalData()) + add(meta.key()); + }); + } + }); + Log.info("Adding all to autosync: finished"); + }); + } + + void shutdown(@Observes @Priority(10) ShutdownEvent event) { + _autosyncExcecutor.shutdownNow(); + } + + public void add(JObjectKey name) { + _pending.add(name); + } + + private void autosync() { + try { + while (!Thread.interrupted()) { + JObjectKey name = null; + + while (name == null) { + name = _pending.tryGet(); + if (name == null) + name = _retries.tryGet(); + if (name == null) + name = _pending.get(1000L); //FIXME: + } + + try { + JObjectKey finalName = name; + boolean ok = txm.run(() -> { + var obj = remoteTx.getMeta(finalName).orElse(null); + if (obj == null) return true; + if (obj.hasLocalData()) return true; + var data = remoteTx.getData(JDataRemote.class, finalName); + return data.isPresent(); + }); + if (!ok) { + Log.debug("Failed downloading object " + name + ", will retry."); + _retries.add(name); + } + } catch (Exception e) { + Log.debug("Failed downloading object " + name + ", will retry.", e); + _retries.add(name); + } + } + } catch (InterruptedException ignored) { + } + Log.info("Autosync thread exiting"); + + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncTxHook.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncTxHook.java new file mode 100644 index 00000000..5394b328 --- /dev/null +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/AutosyncTxHook.java @@ -0,0 +1,56 @@ +package com.usatiuk.dhfs.objects.repository; + +import com.usatiuk.dhfs.objects.JData; +import com.usatiuk.dhfs.objects.JObjectKey; +import com.usatiuk.dhfs.objects.RemoteObjectMeta; +import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService; +import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook; +import com.usatiuk.dhfs.objects.transaction.Transaction; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@ApplicationScoped +public class AutosyncTxHook implements PreCommitTxHook { + @Inject + Transaction curTx; + @Inject + InvalidationQueueService invalidationQueueService; + @Inject + AutosyncProcessor autosyncProcessor; + + @ConfigProperty(name = "dhfs.objects.autosync.download-all") + boolean downloadAll; + + @Override + public void onChange(JObjectKey key, JData old, JData cur) { + if (!(cur instanceof RemoteObjectMeta meta)) + return; + + if (!meta.hasLocalData() && downloadAll) { + curTx.onCommit(() -> autosyncProcessor.add(meta.key())); + } + } + + @Override + public void onCreate(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObjectMeta meta)) + return; + + if (!meta.hasLocalData() && downloadAll) { + curTx.onCommit(() -> autosyncProcessor.add(meta.key())); + } + } + + @Override + public void onDelete(JObjectKey key, JData cur) { + if (!(cur instanceof RemoteObjectMeta remote)) { + return; + } + } + + @Override + public int getPriority() { + return 100; + } +} diff --git a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java index db89789d..aa14eb23 100644 --- a/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java +++ b/dhfs-parent/server/src/main/java/com/usatiuk/dhfs/objects/repository/RemoteObjectServiceServer.java @@ -50,6 +50,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { OpHandler opHandler; @Inject DtoMapperService dtoMapperService; + @Inject + AutosyncProcessor autosyncProcessor; @Override @Blocking @@ -137,13 +139,12 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc { builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty()); - if (!builder.getDeletionCandidate()) - for (var r : obj.refsFrom()) + if (!builder.getDeletionCandidate()) { + for (var r : obj.refsFrom()) { builder.addReferrers(JObjectKeyP.newBuilder().setName(r.toString()).build()); - -// if (!ret.getDeletionCandidate()) -// for (var rr : request.getOurReferrersList()) -// autoSyncProcessor.add(rr); + curTx.onCommit(() -> autosyncProcessor.add(r)); + } + } }); return Uni.createFrom().item(builder.build()); }