Server: mutator interface?

This commit is contained in:
2024-10-11 23:41:53 +02:00
parent 57565ad132
commit f8bbf325e5
10 changed files with 251 additions and 55 deletions

View File

@@ -14,13 +14,24 @@ public class AtomicClock implements Clock<Long>, Serializable {
return ++_max;
}
// FIXME:
public void ungetTimestamp() {
--_max;
}
public void setTimestamp(Long timestamp) {
_max = timestamp;
}
@Override
public Long peekTimestamp() {
return _max;
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
public Long updateTimestamp(Long receivedTimestamp) {
var old = _max;
_max = Math.max(_max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -5,5 +5,5 @@ public interface Clock<TimestampT extends Comparable<TimestampT>> {
TimestampT peekTimestamp();
void updateTimestamp(TimestampT receivedTimestamp);
TimestampT updateTimestamp(TimestampT receivedTimestamp);
}

View File

@@ -14,7 +14,9 @@ public class TestClock implements Clock<Long> {
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
public Long updateTimestamp(Long receivedTimestamp) {
var old = max;
max = Math.max(max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -119,13 +119,26 @@ public class JKleppmannTreeManager {
public void commitOpForHost(UUID host, Op op) {
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
_persistentData.get().bumpVer();
var got = d.getQueues().get(host).pollFirstEntry().getValue();
if (!Objects.equals(jop.getOp(), got)) {
throw new IllegalArgumentException("Committed op push was not the oldest");
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var got = _persistentData.get().getData().getQueues().get(host).pollFirstEntry().getValue();
if (!Objects.equals(jop.getOp(), got))
throw new IllegalArgumentException("Committed op push was not the oldest");
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
_persistentData.get().getData().getQueues().get(host).pollFirstEntry();
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
_persistentData.get().getData().getQueues().get(host).put(jop.getOp().timestamp(), jop.getOp());
}
});
}
@Override
@@ -171,37 +184,69 @@ public class JKleppmannTreeManager {
@Override
public void addToTx() {
// FIXME: a hack
_persistentData.get().rwLock();
_persistentData.get().rwLockNoCopy();
_persistentData.get().rwUnlock();
}
private class JOpRecorder implements OpRecorder<Long, UUID, JKleppmannTreeNodeMeta, String> {
@Override
public void recordOp(OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> op) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
_persistentData.get().bumpVer();
d.recordOp(persistentPeerDataService.getHostUuids(), op);
opSender.push(JKleppmannTree.this);
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.recordOp(persistentPeerDataService.getHostUuids(), op);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.removeOp(persistentPeerDataService.getHostUuids(), op);
}
});
opSender.push(JKleppmannTree.this);
}
@Override
public void recordOpForPeer(UUID peer, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> op) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
_persistentData.get().bumpVer();
d.recordOp(peer, op);
opSender.push(JKleppmannTree.this);
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.recordOp(peer, op);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.removeOp(peer, op);
}
});
opSender.push(JKleppmannTree.this);
}
}
private class JKleppmannTreeClock implements Clock<Long> {
@Override
public Long getTimestamp() {
return _persistentData.get().runWriteLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
_persistentData.get().bumpVer();
return d.getClock().getTimestamp();
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var ret = _persistentData.get().getData().getClock().peekTimestamp() + 1;
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getClock().getTimestamp();
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getClock().ungetTimestamp();
}
});
return ret;
}
@Override
@@ -210,11 +255,24 @@ public class JKleppmannTreeManager {
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
_persistentData.get().bumpVer();
d.getClock().updateTimestamp(receivedTimestamp);
public Long updateTimestamp(Long receivedTimestamp) {
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<>() {
Long _old;
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
_old = object.getClock().updateTimestamp(receivedTimestamp);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getClock().setTimestamp(_old);
}
});
return _persistentData.get().getData().getClock().peekTimestamp();
}
}
@@ -289,7 +347,7 @@ public class JKleppmannTreeManager {
@Override
public void rwLock() {
_persistentData.get().rwLock();
_persistentData.get().rwLockNoCopy();
}
@Override
@@ -312,11 +370,20 @@ public class JKleppmannTreeManager {
@Override
public void putForPeer(UUID peerId, Long timestamp) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY,
(m, d, b, v) -> {
_persistentData.get().bumpVer();
d.getPeerTimestampLog().put(peerId, timestamp);
});
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
var old = object.getPeerTimestampLog().put(peerId, timestamp);
return Objects.equals(old, timestamp);
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getPeerTimestampLog().remove(peerId, timestamp);
}
});
}
}
@@ -332,12 +399,24 @@ public class JKleppmannTreeManager {
@Override
public Pair<CombinedTimestamp<Long, UUID>, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String>> takeOldest() {
return _persistentData.get().runWriteLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
var ret = d.getLog().pollFirstEntry();
if (ret == null) return null;
_persistentData.get().bumpVer();
return Pair.of(ret);
});
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
var ret = _persistentData.get().getData().getLog().firstEntry();
if (ret != null)
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getLog().pollFirstEntry();
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getLog().put(ret.getKey(), ret.getValue());
}
});
return Pair.of(ret);
}
@Override
@@ -387,17 +466,39 @@ public class JKleppmannTreeManager {
@Override
public void put(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String> record) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
var old = d.getLog().put(timestamp, record);
_persistentData.get().bumpVer();
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
if (_persistentData.get().getData().getLog().containsKey(timestamp))
throw new IllegalStateException("Overwriting log entry?");
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
object.getLog().put(timestamp, record);
return true;
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getLog().remove(timestamp, record);
}
});
}
@Override
public void replace(CombinedTimestamp<Long, UUID> timestamp, LogRecord<Long, UUID, JKleppmannTreeNodeMeta, String> record) {
_persistentData.get().runWriteLockedVoid(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
var old = d.getLog().put(timestamp, record);
_persistentData.get().bumpVer();
_persistentData.get().assertRwLock();
_persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
_persistentData.get().mutate(new JMutator<>() {
@Override
public boolean mutate(JKleppmannTreePersistentData object) {
var old = object.getLog().put(timestamp, record);
return !Objects.equals(old, record);
}
@Override
public void revert(JKleppmannTreePersistentData object) {
object.getLog().remove(timestamp, record);
}
});
}
}

View File

@@ -50,12 +50,23 @@ public class JKleppmannTreePersistentData extends JObjectData {
_queues.get(host).put(opMove.timestamp(), opMove);
}
public void removeOp(UUID host, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
_queues.get(host).remove(opMove.timestamp(), opMove);
}
public void recordOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
for (var u : hosts) {
recordOp(u, opMove);
}
}
public void removeOp(Collection<UUID> hosts, OpMove<Long, UUID, JKleppmannTreeNodeMeta, String> opMove) {
for (var u : hosts) {
removeOp(u, opMove);
}
}
@Override
public String getName() {
return nameFromTreeName(_treeName);

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.jrepository;
public interface JMutator<T extends JObjectData> {
boolean mutate(T object);
void revert(T object);
}

View File

@@ -28,6 +28,8 @@ public abstract class JObject<T extends JObjectData> {
});
}
public abstract void mutate(JMutator<? super T> mutator);
public abstract boolean tryResolve(JObjectManager.ResolutionStrategy resolutionStrategy);
public abstract void externalResolution(JObjectData data);

View File

@@ -595,6 +595,18 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
@Override
public void mutate(JMutator<? super T> mutator) {
assertRwLock();
if (getData() == null) throw new IllegalStateException("Resolve before mutate!");
if (mutator.mutate(getData())) {
bumpVer();
jObjectTxManager.addMutator(this, mutator);
}
}
public boolean tryResolve(ResolutionStrategy resolutionStrategy) {
if (resolutionStrategy == ResolutionStrategy.LOCAL_ONLY ||
resolutionStrategy == ResolutionStrategy.REMOTE)

View File

@@ -12,10 +12,7 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -108,6 +105,23 @@ public class JObjectTxManager {
Log.warn("Useless update for " + obj.getKey().getMeta().getName());
}
if (refVerification && !obj.getValue()._copy) {
var cur = dataProtoSerializer.serialize(obj.getKey().getData());
for (var mut : obj.getValue()._mutators.reversed())
revertMutator(obj.getKey(), mut);
var rev = dataProtoSerializer.serialize(obj.getKey().getData());
if (obj.getValue().snapshot.data() != null && !Objects.equals(rev, obj.getValue().snapshot.data()))
throw new IllegalStateException("Mutator could not be reverted for object " + obj.getKey().getMeta().getName());
for (var mut : obj.getValue()._mutators)
applyMutator(obj.getKey(), mut);
var cur2 = dataProtoSerializer.serialize(obj.getKey().getData());
if (!Objects.equals(cur, cur2))
throw new IllegalStateException("Mutator could not be reapplied for object " + obj.getKey().getMeta().getName());
}
notifyWrite(obj.getKey(),
obj.getValue().snapshot == null || !Objects.equals(obj.getValue().snapshot.meta(), metaProtoSerializer.serialize(obj.getKey().getMeta())),
obj.getValue().snapshot == null || dataDiff);
@@ -219,6 +233,14 @@ public class JObjectTxManager {
}
}
private <T extends JObjectData> void applyMutator(JObject<?> obj, JMutator<T> mutator) {
mutator.mutate((T) obj.getData());
}
private <T extends JObjectData> void revertMutator(JObject<?> obj, JMutator<T> mutator) {
mutator.revert((T) obj.getData());
}
public void rollback(String message) {
var state = _state.get();
if (state == null)
@@ -228,13 +250,15 @@ public class JObjectTxManager {
for (var obj : state._writeObjects.entrySet()) {
Log.debug("Rollback of " + obj.getKey().getMeta().getName());
try {
if (obj.getValue() == null) {
Log.warn("Skipped rollback of " + obj.getKey().getMeta().getName());
continue;
if (obj.getValue()._copy) {
obj.getKey().rollback(
metaProtoSerializer.deserialize(obj.getValue().snapshot.meta()),
obj.getValue().snapshot.data() != null ? dataProtoSerializer.deserialize(obj.getValue().snapshot.data()) : null);
} else {
for (var mut : obj.getValue()._mutators.reversed())
revertMutator(obj.getKey(), mut);
obj.getKey().rollback(metaProtoSerializer.deserialize(obj.getValue().snapshot.meta()), obj.getKey().getData());
}
obj.getKey().rollback(
metaProtoSerializer.deserialize(obj.getValue().snapshot.meta()),
obj.getValue().snapshot.data() != null ? dataProtoSerializer.deserialize(obj.getValue().snapshot.data()) : null);
obj.getKey().updateDeletionState();
} finally {
obj.getKey().rwUnlock();
@@ -345,9 +369,29 @@ public class JObjectTxManager {
metaProtoSerializer.serialize(obj.getMeta()),
(obj.getData() == null) ? null : dataProtoSerializer.serialize(obj.getData()),
obj.getMeta().changelogHash())
: null;
: new JObjectSnapshot(
metaProtoSerializer.serialize(obj.getMeta()), refVerification ? null :
(obj.getData() == null) ? null : dataProtoSerializer.serialize(obj.getData()),
obj.getMeta().changelogHash());
state._writeObjects.put(obj, new TxState.TxObjectState(snapshot));
state._writeObjects.put(obj, new TxState.TxObjectState(snapshot, copy));
}
<T extends JObjectData> void addMutator(JObject<T> obj, JMutator<? super T> mut) {
var state = _state.get();
if (state == null)
throw new IllegalStateException("Transaction not running");
obj.assertRwLock();
//TODO: Asserts for rwLock/rwLockNoCopy?
var got = state._writeObjects.get(obj);
if (got == null) throw new IllegalStateException("Object not in transaction");
if (got._copy)
throw new IllegalStateException("Mutating object locked with copy?");
got._mutators.addLast(mut);
}
private class TxState {
@@ -357,9 +401,14 @@ public class JObjectTxManager {
private static class TxObjectState {
final JObjectSnapshot snapshot;
final List<JMutator<?>> _mutators = new LinkedList<>();
boolean _forceInvalidated = false;
final boolean _copy;
private TxObjectState(JObjectSnapshot snapshot) {this.snapshot = snapshot;}
private TxObjectState(JObjectSnapshot snapshot, boolean copy) {
this.snapshot = snapshot;
_copy = copy;
}
}
}
}

View File

@@ -80,6 +80,7 @@ public class OpSender {
try {
remoteObjectServiceClient.pushOps(collected, obj.getId(), host);
jObjectTxManager.executeTx(() -> {
obj.addToTx();
for (var op : collected)
obj.commitOpForHost(host, op);
});