Server: tree op push ack (to motivate garbage collection)

This commit is contained in:
2025-03-23 23:18:49 +01:00
parent 035f64df5a
commit e0b4f97349
6 changed files with 36 additions and 41 deletions

View File

@@ -187,16 +187,14 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return true;
}
public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
// TODO: Ideally no point in this separate locking?
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
&& (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
updateTimestampImpl(from, timestamp);
if (!(gotExt != null && gotExt.compareTo(timestamp) >= 0))
updateTimestampImpl(from, timestamp);
if (!(gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0))
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
tryTrimLog();
return true;
}
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
@@ -34,6 +35,8 @@ public class JKleppmannTreeManager {
JKleppmannTreePeerInterface peerInterface;
@Inject
PeerInfoService peerInfoService;
@Inject
PersistentPeerDataService persistentPeerDataService;
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
@@ -113,6 +116,9 @@ public class JKleppmannTreeManager {
// @Override
public void commitOpForHost(PeerId host, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp)
return;
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass());
@@ -133,9 +139,10 @@ public class JKleppmannTreeManager {
}
// @Override
public boolean acceptExternalOp(PeerId from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
return _tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
public void acceptExternalOp(PeerId from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from1, long timestamp)) {
_tree.updateExternalTimestamp(from1, timestamp);
return;
}
if (!(op instanceof JKleppmannTreeOpWrapper jop))
@@ -192,13 +199,11 @@ public class JKleppmannTreeManager {
// }
// }
}
return true;
}
// @Override
// public Op getPeriodicPushOp() {
// return new JKleppmannTreePeriodicPushOp(persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
// }
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_treeName, persistentPeerDataService.getSelfUuid(), _clock.peekTimestamp());
}
// @Override
// public void addToTx() {

View File

@@ -1,26 +1,17 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.invalidation.Op;
public class JKleppmannTreePeriodicPushOp {
private final PeerId _from;
private final long _timestamp;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
public JKleppmannTreePeriodicPushOp(PeerId from, long timestamp) {
_from = from;
_timestamp = timestamp;
public record JKleppmannTreePeriodicPushOp(JObjectKey treeName, PeerId from,
long timestamp) implements Op, Serializable {
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of();
}
public PeerId getFrom() {
return _from;
}
public long getTimestamp() {
return _timestamp;
}
// @Override
// public Collection<String> getEscapedRefs() {
// return List.of();
// }
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePeriodicPushOp;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -15,6 +16,8 @@ public class OpHandler {
Transaction curTx;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
InvalidationQueueService invalidationQueueService;
public void handleOp(PeerId from, Op op) {
if (op instanceof IndexUpdateOp iu) {
@@ -22,6 +25,10 @@ public class OpHandler {
} else if (op instanceof JKleppmannTreeOpWrapper jk) {
var tree = jKleppmannTreeManager.getTree(jk.treeName());
tree.acceptExternalOp(from, jk);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToOne(from, jk.treeName()));
} else if (op instanceof JKleppmannTreePeriodicPushOp pop) {
var tree = jKleppmannTreeManager.getTree(pop.treeName());
tree.acceptExternalOp(from, pop);
}
}
}

View File

@@ -54,7 +54,7 @@ public class OpPusher {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (!tree.hasPendingOpsForHost(entry.peer()))
return null;
return List.of(tree.getPeriodicPushOp());
var ops = tree.getPendingOpsForHost(entry.peer(), 1);

View File

@@ -192,12 +192,6 @@ public class DhfsFuseIT {
0 == container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Log.info("Deleted");
// TODO: Fix this
Log.info("Dummy write");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testd2").getExitCode());
Log.info("Dummy written");
// FIXME?
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);