start using pcollections

JObjectKey.java
@@ -2,8 +2,13 @@ package com.usatiuk.dhfs.objects;
 import java.io.Serializable;
 
-public record JObjectKey(String name) implements Serializable {
+public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
     public static JObjectKey of(String name) {
         return new JObjectKey(name);
     }
+
+    @Override
+    public int compareTo(JObjectKey o) {
+        return name.compareTo(o.name);
+    }
 }
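
(Editor's aside, not part of the commit: TreePSet, which the rest of the commit adopts for refsFrom, is a sorted persistent set, so its elements must be Comparable or come with a comparator — which is presumably why compareTo is added here. A minimal standalone sketch using a simplified copy of the record; the demo class and values are hypothetical:)

    import org.pcollections.TreePSet;

    import java.io.Serializable;

    // Simplified copy of the record from the diff, for illustration only.
    record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
        public static JObjectKey of(String name) {
            return new JObjectKey(name);
        }

        @Override
        public int compareTo(JObjectKey o) {
            return name.compareTo(o.name);
        }
    }

    class JObjectKeyDemo {
        public static void main(String[] args) {
            // TreePSet keeps elements sorted by compareTo, so JObjectKey
            // must be Comparable for TreePSet.<JObjectKey>empty() to work.
            var refs = TreePSet.<JObjectKey>empty()
                    .plus(JObjectKey.of("b"))
                    .plus(JObjectKey.of("a"));
            System.out.println(refs.first().name()); // "a" -- natural order
        }
    }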

pom.xml (dependencyManagement)
@@ -78,6 +78,11 @@
             <artifactId>commons-collections4</artifactId>
             <version>4.5.0-M2</version>
         </dependency>
+        <dependency>
+            <groupId>org.pcollections</groupId>
+            <artifactId>pcollections</artifactId>
+            <version>4.0.2</version>
+        </dependency>
     </dependencies>
 </dependencyManagement>

pom.xml (module dependencies)
@@ -126,6 +126,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-collections4</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.pcollections</groupId>
+            <artifactId>pcollections</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-math3</artifactId>

ChunkData.java
@@ -3,18 +3,17 @@ package com.usatiuk.dhfs.files.objects;
 import com.google.protobuf.ByteString;
 import com.usatiuk.dhfs.objects.JDataRefcounted;
 import com.usatiuk.dhfs.objects.JObjectKey;
+import org.pcollections.PCollection;
+import org.pcollections.TreePSet;
-
-import java.util.Collection;
-import java.util.LinkedHashSet;
 
-public record ChunkData(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen,
+public record ChunkData(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
                         ByteString data) implements JDataRefcounted {
     public ChunkData(JObjectKey key, ByteString data) {
-        this(key, new LinkedHashSet<>(), false, data);
+        this(key, TreePSet.empty(), false, data);
     }
 
     @Override
-    public ChunkData withRefsFrom(Collection<JObjectKey> refs) {
+    public ChunkData withRefsFrom(PCollection<JObjectKey> refs) {
         return new ChunkData(key, refs, frozen, data);
     }
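
(Editor's aside, not part of the commit: the point of swapping LinkedHashSet for a PCollection is that a persistent collection is never mutated in place — plus and minus return new versions that share structure with the old one, which is what makes it safe to hold inside an immutable record like ChunkData. A minimal sketch; the class name and values are hypothetical:)

    import org.pcollections.PCollection;
    import org.pcollections.TreePSet;

    class PersistenceDemo {
        public static void main(String[] args) {
            PCollection<String> a = TreePSet.<String>empty().plus("x");
            PCollection<String> b = a.plus("y"); // new collection; a is untouched
            System.out.println(a.size()); // 1
            System.out.println(b.size()); // 2
        }
    }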

File.java
@@ -1,15 +1,17 @@
 package com.usatiuk.dhfs.files.objects;
 
 import com.usatiuk.dhfs.objects.JObjectKey;
+import org.pcollections.PCollection;
+import org.pcollections.TreePMap;
 
 import java.util.Collection;
-import java.util.NavigableMap;
 
-public record File(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen, long mode, long cTime, long mTime,
-                   NavigableMap<Long, JObjectKey> chunks, boolean symlink, long size
+public record File(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
+                   long mode, long cTime, long mTime,
+                   TreePMap<Long, JObjectKey> chunks, boolean symlink, long size
 ) implements FsNode {
     @Override
-    public File withRefsFrom(Collection<JObjectKey> refs) {
+    public File withRefsFrom(PCollection<JObjectKey> refs) {
         return new File(key, refs, frozen, mode, cTime, mTime, chunks, symlink, size);
     }

@@ -18,7 +20,7 @@ public record File(JObjectKey key, Collection<JObjectKey> refsFrom, boolean froz
         return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
     }
 
-    public File withChunks(NavigableMap<Long, JObjectKey> chunks) {
+    public File withChunks(TreePMap<Long, JObjectKey> chunks) {
         return new File(key, refsFrom, frozen, mode, cTime, mTime, chunks, symlink, size);
     }
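
(Editor's aside, not part of the commit: the TreePMap swap is less disruptive than it might look — in pcollections 4.x TreePMap also implements NavigableMap, which is why the service code below can keep calling floorEntry, lowerEntry, and lastEntry on file.chunks() unchanged. A small sketch; the class name and values are hypothetical:)

    import org.pcollections.TreePMap;

    class ChunkMapDemo {
        public static void main(String[] args) {
            // Chunk offsets -> chunk keys, as in File.chunks().
            var chunks = TreePMap.<Long, String>empty()
                    .plus(0L, "chunk-a")
                    .plus(4096L, "chunk-b");
            // NavigableMap queries still work on the persistent map:
            System.out.println(chunks.floorEntry(5000L).getValue()); // "chunk-b"
            System.out.println(chunks.lastEntry().getKey());         // 4096
        }
    }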

DhfsFileServiceImpl.java
@@ -25,6 +25,8 @@ import jakarta.enterprise.event.Observes;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.pcollections.TreePMap;
+import org.pcollections.TreePSet;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;

@@ -150,7 +152,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
             var fuuid = UUID.randomUUID();
             Log.debug("Creating file " + fuuid);
-            File f = new File(JObjectKey.of(fuuid.toString()), new HashSet<>(), false, mode, System.currentTimeMillis(), System.currentTimeMillis(), new TreeMap<>(), false, 0);
+            File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(), false, mode, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.empty(), false, 0);
             curTx.put(f);
 
             try {

@@ -371,8 +373,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 file = curTx.get(File.class, fileUuid).orElse(null);
             }
 
-            // FIXME: Some kind of immutable interface?
-            var chunksAll = Collections.unmodifiableNavigableMap(file.chunks());
+            var chunksAll = file.chunks();
             var first = chunksAll.floorEntry(offset);
             var last = chunksAll.lowerEntry(offset + data.size());
             NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();

@@ -494,16 +495,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 }
             }
 
-            NavigableMap<Long, JObjectKey> realNewChunks = new TreeMap<>();
-            for (var chunk : chunksAll.entrySet()) {
-                if (!removedChunks.containsKey(chunk.getKey())) {
-                    realNewChunks.put(chunk.getKey(), chunk.getValue());
-                }
-            }
-
-            realNewChunks.putAll(newChunks);
-
-            file = file.withChunks(Collections.unmodifiableNavigableMap(realNewChunks)).withMTime(System.currentTimeMillis());
+            file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
             curTx.put(file);
             cleanupChunks(file, removedChunks.values());
             updateFileSize(file);
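
(Editor's aside, not part of the commit: the one-liner replaces the nine-line copy loop — minusAll drops every removed offset and plusAll overlays the new entries, each step producing a new map that shares structure with the old one instead of rebuilding a TreeMap. The same pattern in isolation; the class name and values are hypothetical:)

    import org.pcollections.TreePMap;

    import java.util.Map;
    import java.util.Set;

    class BatchUpdateDemo {
        public static void main(String[] args) {
            var chunks = TreePMap.<Long, String>empty()
                    .plus(0L, "old-a")
                    .plus(100L, "old-b");
            // Equivalent of the removed realNewChunks loop:
            var updated = chunks
                    .minusAll(Set.of(0L))          // drop removed chunks
                    .plusAll(Map.of(0L, "new-a")); // overlay rewritten chunks
            System.out.println(updated); // {0=new-a, 100=old-b}
        }
    }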

@@ -525,9 +517,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             }
 
             if (length == 0) {
-                var oldChunks = Collections.unmodifiableNavigableMap(new TreeMap<>(file.chunks()));
+                var oldChunks = file.chunks();
 
-                file = file.withChunks(new TreeMap<>()).withMTime(System.currentTimeMillis());
+                file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
                 curTx.put(file);
                 cleanupChunks(file, oldChunks.values());
                 updateFileSize(file);

@@ -537,7 +529,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             var curSize = size(fileUuid);
             if (curSize == length) return true;
 
-            var chunksAll = Collections.unmodifiableNavigableMap(file.chunks());
+            var chunksAll = file.chunks();
             NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
             NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();

@@ -588,16 +580,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 newChunks.put(tail.getKey(), newChunkData.key());
             }
 
-            NavigableMap<Long, JObjectKey> realNewChunks = new TreeMap<>();
-            for (var chunk : chunksAll.entrySet()) {
-                if (!removedChunks.containsKey(chunk.getKey())) {
-                    realNewChunks.put(chunk.getKey(), chunk.getValue());
-                }
-            }
-
-            realNewChunks.putAll(newChunks);
-
-            file = file.withChunks(Collections.unmodifiableNavigableMap(realNewChunks)).withMTime(System.currentTimeMillis());
+            file = file.withChunks(file.chunks().minusAll(removedChunks.keySet()).plusAll(newChunks)).withMTime(System.currentTimeMillis());
             curTx.put(file);
             cleanupChunks(file, removedChunks.values());
             updateFileSize(file);

@@ -633,10 +616,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             var fuuid = UUID.randomUUID();
             Log.debug("Creating file " + fuuid);
 
-            File f = new File(JObjectKey.of(fuuid.toString()), new HashSet<>(), false, 0, System.currentTimeMillis(), System.currentTimeMillis(), new TreeMap<>(), true, 0);
             ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
+            File f = new File(JObjectKey.of(fuuid.toString()), TreePSet.empty(),
+                    false, 0, System.currentTimeMillis(), System.currentTimeMillis(), TreePMap.<Long, JObjectKey>empty().plus(0L, newChunkData.key()), true, 0);
 
-            f.chunks().put(0L, newChunkData.key());
             updateFileSize(f);
 
             getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
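
(Editor's aside, not part of the commit: two details in the new construction above. First, the chunk entry is now baked in at construction time rather than via the old mutate-after-construct f.chunks().put(...), which a persistent map would reject anyway. Second, TreePMap.<Long, JObjectKey>empty() needs the explicit type witness because .plus(...) is chained directly onto it, so the compiler cannot infer the key/value types from a target type. A sketch; the class name and values are hypothetical:)

    import org.pcollections.TreePMap;

    class SymlinkMapDemo {
        public static void main(String[] args) {
            // Explicit type witness, then build the one-entry map inline:
            var oneChunk = TreePMap.<Long, String>empty().plus(0L, "chunk-key");
            System.out.println(oneChunk); // {0=chunk-key}

            // The old post-construction pattern would fail on a persistent map;
            // java.util.Map mutators are unsupported:
            // oneChunk.put(0L, "other"); // throws UnsupportedOperationException
        }
    }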

@@ -662,8 +645,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
         jObjectTxManager.executeTx(() -> {
             long realSize = 0;
 
-            var last = file.chunks().lastEntry();
-            if (last != null) {
+            if (!file.chunks().isEmpty()) {
+                var last = file.chunks().lastEntry();
                 var lastSize = getChunkSize(last.getValue());
                 realSize = last.getKey() + lastSize;
             }

JDataRefcounted.java
@@ -1,12 +1,14 @@
 package com.usatiuk.dhfs.objects;
 
+import org.pcollections.PCollection;
+
 import java.util.Collection;
 import java.util.List;
 
 public interface JDataRefcounted extends JData {
-    Collection<JObjectKey> refsFrom();
+    PCollection<JObjectKey> refsFrom();
 
-    JDataRefcounted withRefsFrom(Collection<JObjectKey> refs);
+    JDataRefcounted withRefsFrom(PCollection<JObjectKey> refs);
 
     boolean frozen();

RefcounterTxHook.java
@@ -3,9 +3,6 @@ package com.usatiuk.dhfs.objects;
 import com.usatiuk.dhfs.objects.transaction.Transaction;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
-import org.apache.commons.collections4.CollectionUtils;
-
-import java.util.Set;
 
 @ApplicationScoped
 public class RefcounterTxHook implements PreCommitTxHook {

@@ -19,14 +16,21 @@ public class RefcounterTxHook implements PreCommitTxHook {
         }
         var refOld = (JDataRefcounted) old;
 
-        for (var newRef : CollectionUtils.subtract(refCur.collectRefsTo(), refOld.collectRefsTo())) {
-            var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
-            curTx.put(referenced.withRefsFrom(CollectionUtils.union(referenced.refsFrom(), Set.of(key))));
+        var curRefs = refCur.collectRefsTo();
+        var oldRefs = refOld.collectRefsTo();
+
+        for (var curRef : curRefs) {
+            if (!oldRefs.contains(curRef)) {
+                var referenced = curTx.get(JDataRefcounted.class, curRef).orElse(null);
+                curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
+            }
         }
 
-        for (var removedRef : CollectionUtils.subtract(refOld.collectRefsTo(), refCur.collectRefsTo())) {
-            var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
-            curTx.put(referenced.withRefsFrom(CollectionUtils.subtract(referenced.refsFrom(), Set.of(key))));
+        for (var oldRef : oldRefs) {
+            if (!curRefs.contains(oldRef)) {
+                var referenced = curTx.get(JDataRefcounted.class, oldRef).orElse(null);
+                curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
+            }
         }
     }
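
(Editor's aside, not part of the commit: the rewritten hook expresses the same set arithmetic directly on the persistent collection — where the old code built throwaway collections with CollectionUtils.union/subtract, plus(key) and minus(key) now describe the single-element change and return a new PCollection. Both directions in isolation; the class name and values are hypothetical:)

    import org.pcollections.PCollection;
    import org.pcollections.TreePSet;

    class RefcountDemo {
        public static void main(String[] args) {
            PCollection<String> refsFrom = TreePSet.<String>empty().plus("obj-1");

            // Equivalent of CollectionUtils.union(refsFrom, Set.of(key)):
            var added = refsFrom.plus("obj-2");

            // Equivalent of CollectionUtils.subtract(refsFrom, Set.of(key)):
            var removed = added.minus("obj-1");

            System.out.println(added);   // [obj-1, obj-2]
            System.out.println(removed); // [obj-2]
        }
    }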

@@ -38,7 +42,7 @@ public class RefcounterTxHook implements PreCommitTxHook {
 
         for (var newRef : refCur.collectRefsTo()) {
             var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
-            curTx.put(referenced.withRefsFrom(CollectionUtils.union(referenced.refsFrom(), Set.of(key))));
+            curTx.put(referenced.withRefsFrom(referenced.refsFrom().plus(key)));
         }
     }

@@ -48,10 +52,9 @@ public class RefcounterTxHook implements PreCommitTxHook {
             return;
         }
 
-
         for (var removedRef : refCur.collectRefsTo()) {
             var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
-            curTx.put(referenced.withRefsFrom(CollectionUtils.subtract(referenced.refsFrom(), Set.of(key))));
+            curTx.put(referenced.withRefsFrom(referenced.refsFrom().minus(key)));
         }
     }

JKleppmannTreeManager.java
@@ -12,6 +12,7 @@ import com.usatiuk.kleppmanntree.*;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
+import org.pcollections.TreePSet;
 
 import java.util.HashMap;
 import java.util.List;

@@ -37,7 +38,7 @@ public class JKleppmannTreeManager {
             if (data == null) {
                 data = new JKleppmannTreePersistentData(
                         name,
-                        List.of(),
+                        TreePSet.empty(),
                         true,
                         1L,
                         new HashMap<>(),

JKleppmannTreeNode.java
@@ -4,6 +4,8 @@ import com.usatiuk.dhfs.objects.JDataRefcounted;
 import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.kleppmanntree.OpMove;
 import com.usatiuk.kleppmanntree.TreeNode;
+import org.pcollections.PCollection;
+import org.pcollections.TreePSet;
 
 import java.io.Serializable;
 import java.util.Collection;

@@ -13,13 +15,13 @@ import java.util.UUID;
 import java.util.stream.Stream;
 
 // FIXME: Ideally this is two classes?
-public record JKleppmannTreeNode(JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen, JObjectKey parent,
+public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen, JObjectKey parent,
                                  OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> lastEffectiveOp,
                                  JKleppmannTreeNodeMeta meta,
                                  Map<String, JObjectKey> children) implements TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>, JDataRefcounted, Serializable {
 
     public JKleppmannTreeNode(JObjectKey id, JObjectKey parent, JKleppmannTreeNodeMeta meta) {
-        this(id, Collections.emptyList(), false, parent, null, meta, Collections.emptyMap());
+        this(id, TreePSet.empty(), false, parent, null, meta, Collections.emptyMap());
     }
 
     @Override

@@ -43,7 +45,7 @@ public record JKleppmannTreeNode(JObjectKey key, Collection<JObjectKey> refsFrom
     }
 
     @Override
-    public JKleppmannTreeNode withRefsFrom(Collection<JObjectKey> refs) {
+    public JKleppmannTreeNode withRefsFrom(PCollection<JObjectKey> refs) {
         return new JKleppmannTreeNode(key, refs, frozen, parent, lastEffectiveOp, meta, children);
     }

JKleppmannTreePersistentData.java
@@ -1,15 +1,16 @@
 package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
 
 import com.usatiuk.dhfs.objects.JDataRefcounted;
+import com.usatiuk.dhfs.objects.JObjectKey;
 import com.usatiuk.kleppmanntree.CombinedTimestamp;
 import com.usatiuk.kleppmanntree.LogRecord;
 import com.usatiuk.kleppmanntree.OpMove;
-import com.usatiuk.dhfs.objects.JObjectKey;
+import org.pcollections.PCollection;
 
 import java.util.*;
 
 public record JKleppmannTreePersistentData(
-        JObjectKey key, Collection<JObjectKey> refsFrom, boolean frozen,
+        JObjectKey key, PCollection<JObjectKey> refsFrom, boolean frozen,
         long clock,
         HashMap<UUID, TreeMap<CombinedTimestamp<Long, UUID>, OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey>>> queues,
         HashMap<UUID, Long> peerTimestampLog,

@@ -37,7 +38,7 @@ public record JKleppmannTreePersistentData(
     }
 
     @Override
-    public JKleppmannTreePersistentData withRefsFrom(Collection<JObjectKey> refs) {
+    public JKleppmannTreePersistentData withRefsFrom(PCollection<JObjectKey> refs) {
         return new JKleppmannTreePersistentData(key, refs, frozen, clock, queues, peerTimestampLog, log);
     }