mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Server: hopefully working autosync
This commit is contained in:
@@ -23,7 +23,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
|
||||
};
|
||||
|
||||
if (invalidate) {
|
||||
invalidationQueueService.pushInvalidationToAll(cur.key());
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(cur.key()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ public class RemoteObjPusherTxHook implements PreCommitTxHook {
|
||||
return;
|
||||
}
|
||||
|
||||
invalidationQueueService.pushInvalidationToAll(remote.key());
|
||||
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(remote.key()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.AutosyncProcessor;
|
||||
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
|
||||
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
|
||||
@@ -42,6 +43,8 @@ public class RemoteObjectDeleter {
|
||||
PeerInfoService peerInfoService;
|
||||
@Inject
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
@Inject
|
||||
AutosyncProcessor autosyncProcessor;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.move-processor.threads")
|
||||
int moveProcessorThreads;
|
||||
@@ -138,8 +141,8 @@ public class RemoteObjectDeleter {
|
||||
|
||||
for (var r : ret) {
|
||||
if (!r.getValue().getDeletionCandidate()) {
|
||||
// for (var rr : r.getReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
for (var rr : r.getRight().getReferrersList())
|
||||
curTx.onCommit(() -> autosyncProcessor.add(JObjectKey.of(rr.getName())));
|
||||
} else {
|
||||
target = target.withConfirmedDeletes(target.confirmedDeletes().plus(r.getKey()));
|
||||
ok++;
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.objects.*;
|
||||
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
|
||||
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@ApplicationScoped
|
||||
public class AutosyncProcessor {
|
||||
private final HashSetDelayedBlockingQueue<JObjectKey> _pending = new HashSetDelayedBlockingQueue<>(0);
|
||||
private final HashSetDelayedBlockingQueue<JObjectKey> _retries = new HashSetDelayedBlockingQueue<>(10000); //FIXME:
|
||||
@Inject
|
||||
TransactionManager txm;
|
||||
@ConfigProperty(name = "dhfs.objects.autosync.threads")
|
||||
int autosyncThreads;
|
||||
@Inject
|
||||
ExecutorService executorService;
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
RemoteTransaction remoteTx;
|
||||
private ExecutorService _autosyncExcecutor;
|
||||
|
||||
void init(@Observes @Priority(300) StartupEvent event) {
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("autosync-%d")
|
||||
.build();
|
||||
|
||||
_autosyncExcecutor = Executors.newFixedThreadPool(autosyncThreads, factory);
|
||||
for (int i = 0; i < autosyncThreads; i++) {
|
||||
_autosyncExcecutor.submit(this::autosync);
|
||||
}
|
||||
|
||||
executorService.submit(() -> {
|
||||
txm.run(() -> {
|
||||
Log.info("Adding all to autosync");
|
||||
ArrayList<JObjectKey> objs = new ArrayList<>();
|
||||
txm.run(() -> {
|
||||
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
|
||||
while (it.hasNext()) {
|
||||
var key = it.peekNextKey();
|
||||
objs.add(key);
|
||||
// TODO: Nested transactions
|
||||
it.skip();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (var obj : objs) {
|
||||
txm.run(() -> {
|
||||
var gotObj = curTx.get(JData.class, obj).orElse(null);
|
||||
if (!(gotObj instanceof RemoteObjectMeta meta))
|
||||
return;
|
||||
if (!meta.hasLocalData())
|
||||
add(meta.key());
|
||||
});
|
||||
}
|
||||
});
|
||||
Log.info("Adding all to autosync: finished");
|
||||
});
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(10) ShutdownEvent event) {
|
||||
_autosyncExcecutor.shutdownNow();
|
||||
}
|
||||
|
||||
public void add(JObjectKey name) {
|
||||
_pending.add(name);
|
||||
}
|
||||
|
||||
private void autosync() {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
JObjectKey name = null;
|
||||
|
||||
while (name == null) {
|
||||
name = _pending.tryGet();
|
||||
if (name == null)
|
||||
name = _retries.tryGet();
|
||||
if (name == null)
|
||||
name = _pending.get(1000L); //FIXME:
|
||||
}
|
||||
|
||||
try {
|
||||
JObjectKey finalName = name;
|
||||
boolean ok = txm.run(() -> {
|
||||
var obj = remoteTx.getMeta(finalName).orElse(null);
|
||||
if (obj == null) return true;
|
||||
if (obj.hasLocalData()) return true;
|
||||
var data = remoteTx.getData(JDataRemote.class, finalName);
|
||||
return data.isPresent();
|
||||
});
|
||||
if (!ok) {
|
||||
Log.debug("Failed downloading object " + name + ", will retry.");
|
||||
_retries.add(name);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Log.debug("Failed downloading object " + name + ", will retry.", e);
|
||||
_retries.add(name);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
Log.info("Autosync thread exiting");
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JData;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
|
||||
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
|
||||
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
|
||||
import com.usatiuk.dhfs.objects.transaction.Transaction;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
@ApplicationScoped
|
||||
public class AutosyncTxHook implements PreCommitTxHook {
|
||||
@Inject
|
||||
Transaction curTx;
|
||||
@Inject
|
||||
InvalidationQueueService invalidationQueueService;
|
||||
@Inject
|
||||
AutosyncProcessor autosyncProcessor;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.autosync.download-all")
|
||||
boolean downloadAll;
|
||||
|
||||
@Override
|
||||
public void onChange(JObjectKey key, JData old, JData cur) {
|
||||
if (!(cur instanceof RemoteObjectMeta meta))
|
||||
return;
|
||||
|
||||
if (!meta.hasLocalData() && downloadAll) {
|
||||
curTx.onCommit(() -> autosyncProcessor.add(meta.key()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCreate(JObjectKey key, JData cur) {
|
||||
if (!(cur instanceof RemoteObjectMeta meta))
|
||||
return;
|
||||
|
||||
if (!meta.hasLocalData() && downloadAll) {
|
||||
curTx.onCommit(() -> autosyncProcessor.add(meta.key()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(JObjectKey key, JData cur) {
|
||||
if (!(cur instanceof RemoteObjectMeta remote)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 100;
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
OpHandler opHandler;
|
||||
@Inject
|
||||
DtoMapperService dtoMapperService;
|
||||
@Inject
|
||||
AutosyncProcessor autosyncProcessor;
|
||||
|
||||
@Override
|
||||
@Blocking
|
||||
@@ -137,13 +139,12 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
|
||||
builder.setDeletionCandidate(!obj.frozen() && obj.refsFrom().isEmpty());
|
||||
|
||||
if (!builder.getDeletionCandidate())
|
||||
for (var r : obj.refsFrom())
|
||||
if (!builder.getDeletionCandidate()) {
|
||||
for (var r : obj.refsFrom()) {
|
||||
builder.addReferrers(JObjectKeyP.newBuilder().setName(r.toString()).build());
|
||||
|
||||
// if (!ret.getDeletionCandidate())
|
||||
// for (var rr : request.getOurReferrersList())
|
||||
// autoSyncProcessor.add(rr);
|
||||
curTx.onCommit(() -> autosyncProcessor.add(r));
|
||||
}
|
||||
}
|
||||
});
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user