pcollections in JKleppmannTree

This commit is contained in:
2025-02-16 15:38:48 +01:00
parent 1f30af50df
commit f9ad540e2d
2 changed files with 16 additions and 25 deletions

View File

@@ -45,8 +45,8 @@ public class JKleppmannTreeManager {
true,
1L,
HashTreePMap.empty(),
new HashMap<>(),
new TreeMap<>()
HashTreePMap.empty(),
TreePMap.empty()
);
curTx.put(data);
var rootNode = new JKleppmannTreeNode(JObjectKey.of(name.name() + "_jt_root"), null, new JKleppmannTreeNodeMetaDirectory(""));
@@ -330,9 +330,7 @@ public class JKleppmannTreeManager {
@Override
public void putForPeer(PeerId peerId, Long timestamp) {
var newPeerTimestampLog = new HashMap<>(_data.peerTimestampLog());
newPeerTimestampLog.put(peerId, timestamp);
_data = _data.withPeerTimestampLog(newPeerTimestampLog);
_data = _data.withPeerTimestampLog(_data.peerTimestampLog().plus(peerId, timestamp));
curTx.put(_data);
}
}
@@ -340,26 +338,23 @@ public class JKleppmannTreeManager {
private class LogWrapper implements LogInterface<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> {
@Override
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> peekOldest() {
var ret = _data.log().firstEntry();
if (ret == null) return null;
return Pair.of(ret);
if (_data.log().isEmpty()) return null;
return Pair.of(_data.log().firstEntry());
}
@Override
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> takeOldest() {
var newLog = new TreeMap<>(_data.log());
var ret = newLog.pollFirstEntry();
_data = _data.withLog(newLog);
if (_data.log().isEmpty()) return null;
var ret = _data.log().firstEntry();
_data = _data.withLog(_data.log().minusFirstEntry());
curTx.put(_data);
if (ret == null) return null;
return Pair.of(ret);
}
@Override
public Pair<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> peekNewest() {
var ret = _data.log().lastEntry();
if (ret == null) return null;
return Pair.of(ret);
if (_data.log().isEmpty()) return null;
return Pair.of(_data.log().lastEntry());
}
@Override
@@ -391,17 +386,13 @@ public class JKleppmannTreeManager {
public void put(CombinedTimestamp<Long, PeerId> timestamp, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> record) {
if (_data.log().containsKey(timestamp))
throw new IllegalStateException("Overwriting log entry?");
var newLog = new TreeMap<>(_data.log());
newLog.put(timestamp, record);
_data = _data.withLog(newLog);
_data = _data.withLog(_data.log().plus(timestamp, record));
curTx.put(_data);
}
@Override
public void replace(CombinedTimestamp<Long, PeerId> timestamp, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> record) {
var newLog = new TreeMap<>(_data.log());
newLog.put(timestamp, record);
_data = _data.withLog(newLog);
_data = _data.withLog(_data.log().plus(timestamp, record));
curTx.put(_data);
}
}

View File

@@ -17,8 +17,8 @@ public record JKleppmannTreePersistentData(
JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
long clock,
PMap<PeerId, PSortedMap<CombinedTimestamp<Long, PeerId>, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
HashMap<PeerId, Long> peerTimestampLog,
TreeMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log
PMap<PeerId, Long> peerTimestampLog,
PSortedMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log
) implements JDataRefcounted {
@Override
public JKleppmannTreePersistentData withRefsFrom(PCollection<JObjectKey> refs) {
@@ -38,11 +38,11 @@ public record JKleppmannTreePersistentData(
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}
public JKleppmannTreePersistentData withPeerTimestampLog(HashMap<PeerId, Long> peerTimestampLog) {
public JKleppmannTreePersistentData withPeerTimestampLog(PMap<PeerId, Long> peerTimestampLog) {
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}
public JKleppmannTreePersistentData withLog(TreeMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log) {
public JKleppmannTreePersistentData withLog(PSortedMap<CombinedTimestamp<Long, PeerId>, LogRecord<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>> log) {
return new JKleppmannTreePersistentData(key, refsFrom, frozen, clock, queues, peerTimestampLog, log);
}