2 Commits

8 changed files with 91 additions and 25 deletions

View File

@@ -54,7 +54,7 @@ public class JObjectManager {
return tx;
}
public TransactionHandle commit(TransactionPrivate tx) {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
verifyReady();
var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
@@ -173,16 +173,17 @@ public class JObjectManager {
if (writes.isEmpty()) {
Log.trace("Committing transaction - no changes");
for (var callback : tx.getOnCommit()) {
callback.run();
}
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
}
};
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
tx.getOnFlush().stream()
).toList(),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
}
});
}
Log.trace("Committing transaction start");
@@ -224,20 +225,18 @@ public class JObjectManager {
return true;
}).toList());
for (var callback : tx.getOnCommit()) {
callback.run();
}
for (var callback : tx.getOnFlush()) {
addFlushCallback.accept(callback);
}
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
addFlushCallback.accept(runnable);
}
};
return Pair.of(
List.copyOf(tx.getOnCommit()),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
addFlushCallback.accept(runnable);
}
});
} catch (Throwable t) {
Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t);

View File

@@ -3,6 +3,9 @@ package com.usatiuk.dhfs.objects.transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
@ApplicationScoped
public class TransactionManagerImpl implements TransactionManager {
@@ -28,8 +31,10 @@ public class TransactionManagerImpl implements TransactionManager {
}
Log.trace("Committing transaction");
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
return jObjectManager.commit(_currentTransaction.get());
ret = jObjectManager.commit(_currentTransaction.get());
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
@@ -37,6 +42,15 @@ public class TransactionManagerImpl implements TransactionManager {
_currentTransaction.get().close();
_currentTransaction.remove();
}
for (var r : ret.getLeft()) {
try {
r.run();
} catch (Throwable e) {
Log.error("Transaction commit hook error: ", e);
}
}
return ret.getRight();
}
@Override

View File

@@ -69,6 +69,30 @@ public abstract class ObjectsTestImpl {
});
}
@Test
void onCommitHookTest() {
txm.run(() -> {
var newParent = new Parent(JObjectKey.of("ParentOnCommitHook"), "John");
curTx.put(newParent);
curTx.onCommit(() -> txm.run(() -> {
curTx.put(new Parent(JObjectKey.of("ParentOnCommitHook2"), "John2"));
}));
});
txm.run(() -> {
curTx.onCommit(() -> txm.run(() -> {
curTx.put(new Parent(JObjectKey.of("ParentOnCommitHook3"), "John3"));
}));
});
txm.run(() -> {
var parent = curTx.get(Parent.class, new JObjectKey("ParentOnCommitHook")).orElse(null);
Assertions.assertEquals("John", parent.name());
var parent2 = curTx.get(Parent.class, new JObjectKey("ParentOnCommitHook2")).orElse(null);
Assertions.assertEquals("John2", parent2.name());
var parent3 = curTx.get(Parent.class, new JObjectKey("ParentOnCommitHook3")).orElse(null);
Assertions.assertEquals("John3", parent3.name());
});
}
@Test
void createGetObject() {
txm.run(() -> {

View File

@@ -111,7 +111,8 @@ public class PeerManager {
_states.put(host.id(), address);
if (wasReachable) return;
if (wasReachable)
return;
Log.infov("Connected to {0}", host);

View File

@@ -120,8 +120,12 @@ public class PersistentPeerDataService {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (exists) return false;
if (exists) {
Log.tracev("Already marked sync state for {0}", peerId);
return false;
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId)));
Log.infov("Did mark sync state for {0}", peerId);
return true;
});
}
@@ -132,8 +136,12 @@ public class PersistentPeerDataService {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (!exists) return false;
if (!exists) {
Log.infov("Already reset sync state for {0}", peerId);
return false;
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
return true;
});
}

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@@ -75,6 +76,7 @@ public class SyncHandler {
public void doInitialSync(PeerId peer) {
txm.run(() -> {
Log.tracev("Will do initial sync for {0}", peer);
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();

View File

@@ -23,6 +23,13 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
}
public static PeerId nodeIdToPeerId(JObjectKey id) {
if (!id.name().endsWith("_tree_node")) {
throw new IllegalArgumentException("Not a tree node key: " + id);
}
return PeerId.of(id.name().substring(0, id.name().length() - "_tree_node".length()));
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.RemoteObjectDataWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfo;
import com.usatiuk.dhfs.objects.repository.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.objects.transaction.PreCommitTxHook;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
@@ -23,9 +24,19 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
public void onChange(JObjectKey key, JData old, JData cur) {
if (cur instanceof JKleppmannTreeNode n) {
if (n.key().name().equals("peers_jt_root")) {
// TODO: This is kinda sucky
Log.infov("Changed peer tree root: {0} to {1}", key, cur);
curTx.onCommit(() -> persistentPeerDataService.updateCerts());
if (!(old instanceof JKleppmannTreeNode oldNode))
throw new IllegalStateException("Old node is not a tree node");
for (var curRef : oldNode.children().entrySet()) {
if (!n.children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
}
}
return;
}
}