somewhat working FS layer

This commit is contained in:
2024-08-03 21:40:54 +02:00
parent f8d8ae0c89
commit e8929350f9
58 changed files with 1222 additions and 616 deletions

View File

@@ -0,0 +1,24 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class AtomicClock implements Clock<Long>, Serializable {
private AtomicLong _max = new AtomicLong(0L);
@Override
public Long getTimestamp() {
return _max.incrementAndGet();
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
long exp = _max.get();
long set = Math.max(exp, receivedTimestamp);
// TODO: What is correct memory ordering?
while (!_max.weakCompareAndSetVolatile(exp, set)) {
exp = _max.get();
set = Math.max(exp, set);
}
}
}

View File

@@ -1,10 +1,11 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
import java.util.Comparator;
public record CombinedTimestamp<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>>
(TimestampT timestamp, PeerIdT nodeId) implements Comparable<CombinedTimestamp<TimestampT, PeerIdT>> {
(TimestampT timestamp,
PeerIdT nodeId) implements Comparable<CombinedTimestamp<TimestampT, PeerIdT>>, Serializable {
@Override
public int compareTo(CombinedTimestamp<TimestampT, PeerIdT> o) {

View File

@@ -27,7 +27,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
try {
childId = from.getNode().getChildren().get(names.getFirst());
} finally {
from.rwUnlock();
from.rUnlock();
}
if (childId == null)
@@ -37,7 +37,12 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
public NodeIdT traverse(List<NameT> names) {
return traverse(_storage.getRootId(), names);
_storage.globalRLock();
try {
return traverse(_storage.getRootId(), names);
} finally {
_storage.globalRUnlock();
}
}
private void undoOp(LogOpMove<TimestampT, PeerIdT, NameT, ? extends MetaT, NodeIdT> op) {
@@ -55,6 +60,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.getNode().setMeta(op.oldInfo().oldMeta());
node.getNode().setParent(oldParent.getNode().getId());
oldParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
node.notifyRmRef(curParent.getNode().getId());
node.notifyRef(oldParent.getNode().getId());
} finally {
node.rwUnlock();
oldParent.rwUnlock();
@@ -67,6 +74,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.rwLock();
try {
curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
node.notifyRmRef(curParent.getNode().getId());
_storage.removeNode(node.getNode().getId());
} finally {
node.rwUnlock();
@@ -82,15 +90,26 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
public <LocalMetaT extends MetaT> void applyOp(OpMove<TimestampT, PeerIdT, NameT, LocalMetaT, NodeIdT> op) {
_clock.updateTimestamp(op.timestamp().timestamp());
var log = _storage.getLog();
if (log.isEmpty()) {
log.put(op.timestamp(), doOp(op));
return;
int cmp;
_storage.globalRLock();
try {
if (log.isEmpty()) {
// doOp can't be a move here, otherwise we deadlock
log.put(op.timestamp(), doOp(op));
return;
}
cmp = op.timestamp().compareTo(log.lastEntry().getKey());
} finally {
_storage.globalRUnlock();
}
var cmp = op.timestamp().compareTo(log.lastEntry().getKey());
assert cmp != 0;
if (cmp < 0) {
_storage.globalLock();
_storage.globalRwLock();
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.tailMap(op.timestamp(), false);
for (var entry : toUndo.reversed().entrySet()) {
undoOp(entry.getValue());
@@ -100,20 +119,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
redoOp(entry);
}
} finally {
_storage.globalUnlock();
_storage.globalRwUnlock();
}
} else {
log.put(op.timestamp(), doOp(op));
var res = doOp(op);
log.put(op.timestamp(), res);
}
}
private <LocalMetaT extends MetaT> LogOpMove<TimestampT, PeerIdT, NameT, LocalMetaT, NodeIdT> doOp(OpMove<TimestampT, PeerIdT, NameT, LocalMetaT, NodeIdT> op) {
var node = _storage.getById(op.childId());
if (node == null) {
node = _storage.createNewNode(op.childId());
}
var oldParent = node.getNode().getParent() != null ? _storage.getById(node.getNode().getParent()) : null;
var oldParent = (node != null && node.getNode().getParent() != null) ? _storage.getById(node.getNode().getParent()) : null;
var newParent = _storage.getById(op.newParentId());
if (newParent == null) {
@@ -123,11 +140,10 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (oldParent == null) {
newParent.rwLock();
try {
node.rwLock();
node = _storage.createNewNode(new TreeNode<>(op.childId(), op.newParentId(), op.newMeta()));
try {
node.getNode().setMeta(op.newMeta());
node.getNode().setParent(op.newParentId());
newParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
node.notifyRef(newParent.getNode().getId());
return new LogOpMove<>(null, op);
} finally {
node.rwUnlock();
@@ -138,7 +154,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
// FIXME:
_storage.globalLock();
_storage.globalRwLock();
try {
if (op.childId() == op.newParentId() || isAncestor(op.childId(), op.newParentId()))
return new LogOpMove<>(null, op);
@@ -155,6 +171,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
node.getNode().setMeta(op.newMeta());
node.getNode().setParent(newParent.getNode().getId());
newParent.getNode().getChildren().put(op.newMeta().getName(), node.getNode().getId());
node.notifyRmRef(oldParent.getNode().getId());
node.notifyRef(newParent.getNode().getId());
return new LogOpMove<>(new LogOpMoveOld<>(oldParent.getNode().getId(), (LocalMetaT) oldMeta), op);
} finally {
node.rwUnlock();
@@ -162,7 +180,7 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
newParent.rwUnlock();
}
} finally {
_storage.globalUnlock();
_storage.globalRwUnlock();
}
}

View File

@@ -1,5 +1,7 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record LogOpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, NameT, MetaT extends NodeMeta<NameT>, NodeIdT>
(LogOpMoveOld<NameT, MetaT, NodeIdT> oldInfo,
OpMove<TimestampT, PeerIdT, NameT, MetaT, NodeIdT> op) {}
OpMove<TimestampT, PeerIdT, NameT, MetaT, NodeIdT> op) implements Serializable {}

View File

@@ -1,4 +1,6 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record LogOpMoveOld<NameT, MetaT extends NodeMeta<NameT>, NodeIdT>
(NodeIdT oldParent, MetaT oldMeta) {}
(NodeIdT oldParent, MetaT oldMeta) implements Serializable {}

View File

@@ -1,5 +1,7 @@
package com.usatiuk.kleppmanntree;
public interface NodeMeta<NameT> {
import java.io.Serializable;
public interface NodeMeta<NameT> extends Serializable {
public NameT getName();
}

View File

@@ -1,4 +1,7 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, NameT, MetaT extends NodeMeta<NameT>, NodeIdT>
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta, NodeIdT childId) {}
(CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
NodeIdT childId) implements Serializable {}

View File

@@ -18,7 +18,8 @@ public interface StorageInterface<
WrapperT getById(NodeIdT id);
WrapperT createNewNode(NodeIdT id);
// Creates a node, returned wrapper is RW-locked
WrapperT createNewNode(TreeNode<NameT, MetaT, NodeIdT> node);
void removeNode(NodeIdT id);
@@ -28,7 +29,12 @@ public interface StorageInterface<
NavigableMap<CombinedTimestamp<TimestampT, PeerIdT>, LogOpMove<TimestampT, PeerIdT, NameT, ? extends MetaT, NodeIdT>> getLog();
// Locks all the objects from being changed
void globalLock();
void globalRwLock();
void globalUnlock();
void globalRwUnlock();
// Locks all the objects from being changed
void globalRLock();
void globalRUnlock();
}

View File

@@ -3,16 +3,21 @@ package com.usatiuk.kleppmanntree;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Getter
@Setter
public class TreeNode<NameT, MetaT extends NodeMeta<NameT>, NodeIdT> {
public class TreeNode<NameT, MetaT extends NodeMeta<NameT>, NodeIdT> implements Serializable {
private NodeIdT _parent = null;
private final NodeIdT _id;
private MetaT _meta = null;
private Map<NameT, NodeIdT> _children = new HashMap<>();
public TreeNode(NodeIdT id) {_id = id;}
public TreeNode(NodeIdT id, NodeIdT parent, MetaT meta) {
_id = id;
_meta = meta;
_parent = parent;
}
}

View File

@@ -9,5 +9,9 @@ public interface TreeNodeWrapper<NameT, MetaT extends NodeMeta<NameT>, NodeIdT>
void rwUnlock();
void notifyRef(NodeIdT id);
void notifyRmRef(NodeIdT id);
TreeNode<NameT, MetaT, NodeIdT> getNode();
}

View File

@@ -25,6 +25,16 @@ public class TestNodeWrapper implements TreeNodeWrapper<String, TestNodeMeta, Lo
}
@Override
public void notifyRef(Long id) {
}
@Override
public void notifyRmRef(Long id) {
}
@Override
public TreeNode<String, TestNodeMeta, Long> getNode() {
return _backingNode;

View File

@@ -11,8 +11,8 @@ public class TestStorageInterface implements StorageInterface<Long, Long, String
public TestStorageInterface(long peerId) {
_peerId = peerId;
_nodes.put(getRootId(), new TreeNode<>(getRootId()));
_nodes.put(getTrashId(), new TreeNode<>(getTrashId()));
_nodes.put(getRootId(), new TreeNode<>(getRootId(), null, null));
_nodes.put(getTrashId(), new TreeNode<>(getTrashId(), null, null));
}
@Override
@@ -37,13 +37,12 @@ public class TestStorageInterface implements StorageInterface<Long, Long, String
}
@Override
public TestNodeWrapper createNewNode(Long id) {
if (!_nodes.containsKey(id)) {
var newNode = new TreeNode<String, TestNodeMeta, Long>(id);
_nodes.put(id, newNode);
return new TestNodeWrapper(newNode);
public TestNodeWrapper createNewNode(TreeNode<String, TestNodeMeta, Long> node) {
if (!_nodes.containsKey(node.getId())) {
_nodes.put(node.getId(), node);
return new TestNodeWrapper(node);
}
throw new IllegalStateException("Node with id " + id + " already exists");
throw new IllegalStateException("Node with id " + node.getId() + " already exists");
}
@Override
@@ -64,12 +63,22 @@ public class TestStorageInterface implements StorageInterface<Long, Long, String
}
@Override
public void globalLock() {
public void globalRwLock() {
}
@Override
public void globalUnlock() {
public void globalRwUnlock() {
}
@Override
public void globalRLock() {
}
@Override
public void globalRUnlock() {
}
}

View File

@@ -145,6 +145,11 @@
<artifactId>commons-collections4</artifactId>
<version>4.5.0-M2</version>
</dependency>
<dependency>
<groupId>com.usatiuk</groupId>
<artifactId>kleppmanntree</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@@ -41,127 +41,128 @@ public class FileConflictResolver implements ConflictResolver {
Log.error("Object type mismatch!");
throw new NotImplementedException();
}
throw new NotImplementedException();
var _oursDir = jObjectManager.get(ours.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!(d instanceof File df))
throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for file"));
return df.getParent().toString();
}))
.orElseThrow(() -> new NotImplementedException("Could not find parent directory for file " + ours.getName()));
_oursDir.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (mD, oursDirU, bumpDir, invalidateDir) -> {
if (oursDirU == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!(oursDirU instanceof Directory oursDir))
throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for directory"));
ours.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, oursFileU, bumpFile, invalidateFile) -> {
if (oursFileU == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!(oursFileU instanceof File oursFile))
throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for file"));
// TODO: dedup
File first;
File second;
UUID otherHostname;
if (oursFile.getMtime() >= theirsFile.getMtime()) {
first = oursFile;
second = theirsFile;
otherHostname = conflictHost;
} else {
second = oursFile;
first = theirsFile;
otherHostname = persistentRemoteHostsService.getSelfUuid();
}
Map<UUID, Long> newChangelog = new LinkedHashMap<>(m.getChangelog());
for (var entry : theirsHeader.getChangelog().getEntriesList()) {
newChangelog.merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
}
boolean chunksDiff = !Objects.equals(first.getChunks(), second.getChunks());
var firstChunksCopy = first.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
var secondChunksCopy = second.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
boolean wasChanged = oursFile.getMtime() != first.getMtime()
|| oursFile.getCtime() != first.getCtime()
|| first.isSymlink() != second.isSymlink()
|| chunksDiff;
if (m.getBestVersion() > newChangelog.values().stream().reduce(0L, Long::sum))
throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));
if (wasChanged) {
newChangelog.merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
if (useHashForChunks)
throw new NotImplementedException();
HashSet<String> oursBackup = new HashSet<>(oursFile.getChunks().values());
oursFile.getChunks().clear();
for (var e : firstChunksCopy) {
oursFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.of(oursFile.getName()));
}
HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());
oursFile.setMtime(first.getMtime());
oursFile.setCtime(first.getCtime());
var newFile = new File(UUID.randomUUID(), second.getMode(), oursDir.getUuid(), second.isSymlink());
newFile.setMtime(second.getMtime());
newFile.setCtime(second.getCtime());
for (var e : secondChunksCopy) {
newFile.getChunks().put(e.getLeft(), e.getValue());
jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
}
var theName = oursDir.getChildren().entrySet().stream()
.filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
.orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));
jObjectManager.put(newFile, Optional.of(_oursDir.getName()));
int i = 0;
do {
String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
if (oursDir.getChildren().containsKey(name)) {
i++;
continue;
}
oursDir.getChildren().put(name, newFile.getUuid());
break;
} while (true);
bumpDir.apply();
for (var cuuid : oursBackup) {
if (!oursNew.contains(cuuid))
jObjectManager
.get(ChunkInfo.getNameFromHash(cuuid))
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
mc.removeRef(oursFile.getName());
return null;
}));
}
}
m.setChangelog(newChangelog);
return null;
});
return null;
});
// var _oursDir = jObjectManager.get(ours.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
// if (d == null)
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
// if (!(d instanceof File df))
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for file"));
// return df.getParent().toString();
// }))
// .orElseThrow(() -> new NotImplementedException("Could not find parent directory for file " + ours.getName()));
//
// _oursDir.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (mD, oursDirU, bumpDir, invalidateDir) -> {
// if (oursDirU == null)
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
// if (!(oursDirU instanceof Directory oursDir))
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for directory"));
//
// ours.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, oursFileU, bumpFile, invalidateFile) -> {
// if (oursFileU == null)
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
// if (!(oursFileU instanceof File oursFile))
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Bad type for file"));
//
// // TODO: dedup
//
// File first;
// File second;
// UUID otherHostname;
//
// if (oursFile.getMtime() >= theirsFile.getMtime()) {
// first = oursFile;
// second = theirsFile;
// otherHostname = conflictHost;
// } else {
// second = oursFile;
// first = theirsFile;
// otherHostname = persistentRemoteHostsService.getSelfUuid();
// }
//
// Map<UUID, Long> newChangelog = new LinkedHashMap<>(m.getChangelog());
//
// for (var entry : theirsHeader.getChangelog().getEntriesList()) {
// newChangelog.merge(UUID.fromString(entry.getHost()), entry.getVersion(), Long::max);
// }
//
// boolean chunksDiff = !Objects.equals(first.getChunks(), second.getChunks());
//
// var firstChunksCopy = first.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
// var secondChunksCopy = second.getChunks().entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())).toList();
//
// boolean wasChanged = oursFile.getMtime() != first.getMtime()
// || oursFile.getCtime() != first.getCtime()
// || first.isSymlink() != second.isSymlink()
// || chunksDiff;
//
// if (m.getBestVersion() > newChangelog.values().stream().reduce(0L, Long::sum))
// throw new StatusRuntimeException(Status.ABORTED.withDescription("Race when conflict resolving"));
//
// if (wasChanged) {
// newChangelog.merge(persistentRemoteHostsService.getSelfUuid(), 1L, Long::sum);
//
// if (useHashForChunks)
// throw new NotImplementedException();
//
// HashSet<String> oursBackup = new HashSet<>(oursFile.getChunks().values());
// oursFile.getChunks().clear();
//
// for (var e : firstChunksCopy) {
// oursFile.getChunks().put(e.getLeft(), e.getValue());
// jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
// jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.of(oursFile.getName()));
// }
// HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());
//
// oursFile.setMtime(first.getMtime());
// oursFile.setCtime(first.getCtime());
//
// var newFile = new File(UUID.randomUUID(), second.getMode(), oursDir.getUuid(), second.isSymlink());
//
// newFile.setMtime(second.getMtime());
// newFile.setCtime(second.getCtime());
//
// for (var e : secondChunksCopy) {
// newFile.getChunks().put(e.getLeft(), e.getValue());
// jObjectManager.getOrPut(ChunkData.getNameFromHash(e.getValue()), ChunkData.class, Optional.of(ChunkInfo.getNameFromHash(e.getValue())));
// jObjectManager.getOrPut(ChunkInfo.getNameFromHash(e.getValue()), ChunkInfo.class, Optional.ofNullable(newFile.getName()));
// }
//
// var theName = oursDir.getChildren().entrySet().stream()
// .filter(p -> p.getValue().equals(oursFile.getUuid())).findAny()
// .orElseThrow(() -> new NotImplementedException("Could not find our file in directory " + oursDir.getName()));
//
// jObjectManager.put(newFile, Optional.of(_oursDir.getName()));
//
// int i = 0;
// do {
// String name = theName.getKey() + ".conflict." + i + "." + otherHostname;
// if (oursDir.getChildren().containsKey(name)) {
// i++;
// continue;
// }
// oursDir.getChildren().put(name, newFile.getUuid());
// break;
// } while (true);
//
// bumpDir.apply();
//
// for (var cuuid : oursBackup) {
// if (!oursNew.contains(cuuid))
// jObjectManager
// .get(ChunkInfo.getNameFromHash(cuuid))
// .ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
// mc.removeRef(oursFile.getName());
// return null;
// }));
// }
// }
//
// m.setChangelog(newChangelog);
// return null;
// });
// return null;
// });
}
}

View File

@@ -17,15 +17,10 @@ public class File extends FsNode {
@Getter
private final NavigableMap<Long, String> _chunks = new TreeMap<>();
@Getter
private final UUID _parent;
@Getter
private final boolean _symlink;
public File(UUID uuid, long mode, UUID parent, boolean symlink) {
public File(UUID uuid, long mode, boolean symlink) {
super(uuid, mode);
if (parent == null)
throw new IllegalArgumentException("Parent UUID cannot be null");
_parent = parent;
_symlink = symlink;
}

View File

@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.persistence.FileP;
import com.usatiuk.dhfs.objects.persistence.FsNodeP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.UUID;
@@ -13,7 +12,7 @@ import java.util.UUID;
public class FileSerializer implements ProtoSerializer<FileP, File>, ProtoDeserializer<FileP, File> {
@Override
public File deserialize(FileP message) {
var ret = new File(UUID.fromString(message.getFsNode().getUuid()), message.getFsNode().getMode(), UUID.fromString(message.getParent()), message.getSymlink());
var ret = new File(UUID.fromString(message.getFsNode().getUuid()), message.getFsNode().getMode(), message.getSymlink());
ret.setMtime(message.getFsNode().getMtime());
ret.setCtime(message.getFsNode().getCtime());
ret.getChunks().putAll(message.getChunksMap());
@@ -23,16 +22,15 @@ public class FileSerializer implements ProtoSerializer<FileP, File>, ProtoDeseri
@Override
public FileP serialize(File object) {
var ret = FileP.newBuilder()
.setFsNode(FsNodeP.newBuilder()
.setCtime(object.getCtime())
.setMtime(object.getMtime())
.setMode(object.getMode())
.setUuid(object.getUuid().toString())
.build())
.putAllChunks(object.getChunks())
.setSymlink(object.isSymlink())
.setParent(object.getParent().toString())
.build();
.setFsNode(FsNodeP.newBuilder()
.setCtime(object.getCtime())
.setMtime(object.getMtime())
.setMode(object.getMode())
.setUuid(object.getUuid().toString())
.build())
.putAllChunks(object.getChunks())
.setSymlink(object.isSymlink())
.build();
return ret;
}
}

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.files.service;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.files.objects.FsNode;
import java.util.Optional;
@@ -10,15 +9,13 @@ public interface DhfsFileService {
Optional<String> create(String name, long mode);
Optional<String> mkdir(String name, long mode);
void mkdir(String name, long mode);
Optional<FsNode> getattr(String name);
Optional<GetattrRes> getattr(String name);
Boolean chmod(String name, long mode);
Boolean rmdir(String name);
Boolean unlink(String name);
void unlink(String name);
Boolean rename(String from, String to);
@@ -35,6 +32,7 @@ public interface DhfsFileService {
Boolean truncate(String fileUuid, long length);
String readlink(String uuid);
ByteString readlinkBS(String uuid);
String symlink(String oldpath, String newpath);

View File

@@ -2,7 +2,16 @@ package com.usatiuk.dhfs.files.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.*;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.ChunkInfo;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.objects.FsNode;
import com.usatiuk.dhfs.objects.jklepmanntree.JKleppmannTree;
import com.usatiuk.dhfs.objects.jklepmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
@@ -15,16 +24,13 @@ import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@ApplicationScoped
public class DhfsFileServiceImpl implements DhfsFileService {
@@ -60,6 +66,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
private JKleppmannTree _tree;
private ChunkData createChunk(ByteString bytes) {
if (useHashForChunks) {
@@ -71,76 +81,53 @@ public class DhfsFileServiceImpl implements DhfsFileService {
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
if (jObjectManager.get(new UUID(0, 0).toString()).isEmpty())
jObjectManager.put(new Directory(new UUID(0, 0), 0755), Optional.empty());
getRoot();
_tree = jKleppmannTreeManager.getTree("fs");
}
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path, int curPos) {
Supplier<StatusRuntimeExceptionNoStacktrace> notFound
= () -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND.withDescription("Not found: " + from.getName() + "/" + path));
var found = from.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d instanceof Directory dir)
return dir.getKid(path.getName(curPos).toString());
return Optional.empty();
}).orElseThrow(notFound);
Optional<JObject<?>> ref = jObjectManager.get(found.toString());
if (ref.isEmpty()) {
Log.error("File missing when traversing directory " + from.getName() + ": " + found);
throw notFound.get();
}
if (refVerification)
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof FsNode))
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
return null;
});
if (path.getNameCount() - 1 == curPos) {
if (refVerification)
ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d instanceof File f) {
if (!Objects.equals(f.getParent().toString(), from.getName())) {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("Parent mismatch for file " + path));
}
}
return null;
});
return (JObject<? extends FsNode>) ref.get();
}
return traverse((JObject<? extends FsNode>) ref.get(), path, curPos + 1);
private JObject<TreeNodeJObjectData> getDirEntry(String name) {
var res = _tree.traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = jObjectManager.get(res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
if (!ret.getKnownClass().equals(TreeNodeJObjectData.class))
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not jObject: " + name));
return (JObject<TreeNodeJObjectData>) ret;
}
private JObject<? extends FsNode> traverse(JObject<? extends FsNode> from, Path path) {
if (path.getNameCount() == 0) return from;
return traverse(from, path, 0);
}
private JObject<? extends FsNode> getDirEntry(String name) {
return traverse(getRoot(), Path.of(name));
private Optional<JObject<TreeNodeJObjectData>> getDirEntryOpt(String name) {
var res = _tree.traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = jObjectManager.get(res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
if (!ret.getKnownClass().equals(TreeNodeJObjectData.class))
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not jObject: " + name));
return Optional.of((JObject<TreeNodeJObjectData>) ret);
}
@Override
public Optional<FsNode> getattr(String uuid) {
public Optional<GetattrRes> getattr(String uuid) {
var ref = jObjectManager.get(uuid);
if (ref.isEmpty()) return Optional.empty();
return ref.get().runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof FsNode))
GetattrRes ret;
if (d instanceof File f) {
ret = new GetattrRes(f.getMtime(), f.getCtime(), f.getMode(), f.isSymlink() ? GetattrType.SYMLINK : GetattrType.FILE);
} else if (d instanceof TreeNodeJObjectData) {
ret = new GetattrRes(100, 100, 0777, GetattrType.DIRECTORY);
} else {
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
//FIXME:
return Optional.of((FsNode) d);
}
return Optional.of(ret);
});
}
@Override
public Optional<String> open(String name) {
try {
return Optional.ofNullable(traverse(getRoot(), Path.of(name)).getName());
var ret = getDirEntry(name);
return Optional.of(ret.runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
if (d.getNode().getMeta() instanceof JTreeNodeMetaFile f) return f.getFileIno();
else if (d.getNode().getMeta() instanceof JTreeNodeMetaDirectory f) return m.getName();
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
}));
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
return Optional.empty();
@@ -149,271 +136,88 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
}
private void ensureDir(JObject<TreeNodeJObjectData> entry) {
entry.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d.getNode().getMeta() instanceof JTreeNodeMetaFile f)
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription(m.getName() + " is a file, not directory"));
else if (d.getNode().getMeta() instanceof JTreeNodeMetaDirectory f) return null;
throw new StatusRuntimeException(Status.DATA_LOSS.withDescription("FsNode is not an FsNode: " + m.getName()));
});
}
@Override
public Optional<String> create(String name, long mode) {
var parent = traverse(getRoot(), Path.of(name).getParent());
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
String fname = Path.of(name).getFileName().toString();
ensureDir(parent);
String fname = path.getFileName().toString();
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, mode, UUID.fromString(parent.getName()), false);
File f = new File(fuuid, mode, false);
if (!parent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
if (!(d instanceof Directory dir))
return false;
if (dir.getKid(fname).isPresent())
return false;
boolean created = dir.putKid(fname, fuuid);
if (!created) return false;
try {
jObjectManager.put(f, Optional.of(dir.getName()));
} catch (Exception ex) {
Log.error("Failed creating file " + fuuid);
dir.removeKid(fname);
}
bump.apply();
dir.setMtime(System.currentTimeMillis());
return true;
}))
return Optional.empty();
var newNodeId = _tree.getNewNodeId();
jObjectManager.put(f, Optional.of(newNodeId));
_tree.move(parent.getName(), new JTreeNodeMetaFile(fname, f.getName()), newNodeId);
return Optional.of(f.getName());
}
@Override
public Optional<String> mkdir(String name, long mode) {
var found = traverse(getRoot(), Path.of(name).getParent());
public void mkdir(String name, long mode) {
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String dname = Path.of(name).getFileName().toString();
String dname = path.getFileName().toString();
var duuid = UUID.randomUUID();
Log.debug("Creating dir " + duuid);
Directory ndir = new Directory(duuid, mode); //FIXME:
Log.debug("Creating directory " + name);
if (!found.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
if (!(d instanceof Directory dir))
return false;
if (dir.getKid(dname).isPresent())
return false;
boolean created = dir.putKid(dname, duuid);
if (!created) return false;
try {
jObjectManager.put(ndir, Optional.of(dir.getName()));
} catch (Exception ex) {
Log.error("Failed creating directory " + dname);
dir.removeKid(dname);
}
bump.apply();
dir.setMtime(System.currentTimeMillis());
return true;
}))
return Optional.empty();
return Optional.of(ndir.getName());
}
private Boolean rmdent(String name) {
var parent = getDirEntry(Path.of(name).getParent().toString());
Optional<UUID> kidId = parent.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof Directory dir))
return Optional.empty();
return dir.getKid(Path.of(name).getFileName().toString());
});
if (kidId.isEmpty())
return false;
var kid = jObjectManager.get(kidId.get().toString());
if (kid.isEmpty()) return false;
return parent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, i) -> {
if (!(d instanceof Directory dir))
return false;
String kname = Path.of(name).getFileName().toString();
if (dir.getKid(kname).isEmpty())
return false;
AtomicBoolean hadRef = new AtomicBoolean(false);
AtomicBoolean removedRef = new AtomicBoolean(false);
try {
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
if (m2.checkRef(m.getName()))
hadRef.set(true);
if (d2 == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
if (!allowRecursiveDelete)
if (d2 instanceof Directory)
if (!((Directory) d2).getChildren().isEmpty())
throw new DirectoryNotEmptyException();
m2.removeRef(m.getName());
removedRef.set(true);
return null;
});
} catch (Exception ex) {
Log.error("Failed removing dentry " + name, ex);
// If we failed something and removed the ref, try re-adding it
if (hadRef.get() && removedRef.get()) {
AtomicBoolean hadRef2 = new AtomicBoolean(false);
AtomicBoolean addedRef = new AtomicBoolean(false);
try {
kid.get().runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m2, d2, bump2, i2) -> {
if (m2.checkRef(m.getName()))
hadRef2.set(true);
if (d2 == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Pessimistically not removing unresolved maybe-directory"));
if (!allowRecursiveDelete)
if (d2 instanceof Directory)
if (!((Directory) d2).getChildren().isEmpty())
throw new DirectoryNotEmptyException();
m2.addRef(m.getName());
addedRef.set(true);
return null;
});
} catch (Exception e) {
Log.error("Failed fixing up failed-removed dentry " + name, ex);
// If it is not there and we haven't added it, continue removing
if (!(!hadRef2.get() && !addedRef.get())) {
return false;
}
}
} else { // Otherwise, we didn't remove anything, go back.
return false;
}
}
boolean removed = dir.removeKid(kname);
if (removed) {
bump.apply();
dir.setMtime(System.currentTimeMillis());
}
return removed;
});
_tree.move(parent.getName(), new JTreeNodeMetaDirectory(dname), _tree.getNewNodeId());
}
@Override
public Boolean rmdir(String name) {
return rmdent(name);
}
public void unlink(String name) {
var node = getDirEntryOpt(name).orElse(null);
JTreeNodeMeta meta = node.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (d.getNode().getMeta() instanceof JTreeNodeMetaDirectory f)
if (!d.getNode().getChildren().isEmpty()) throw new DirectoryNotEmptyException();
return d.getNode().getMeta();
});
@Override
public Boolean unlink(String name) {
return rmdent(name);
_tree.trash(meta, node.getName());
}
@Override
public Boolean rename(String from, String to) {
var theFile = getDirEntry(from);
var dentFrom = getDirEntry(Paths.get(from).getParent().toString());
var dentTo = getDirEntry(Paths.get(to).getParent().toString());
var node = getDirEntry(from);
JTreeNodeMeta meta = node.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> d.getNode().getMeta());
UUID cleanup = null;
var toPath = Path.of(to);
var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry);
try {
JObject.rwLockAll(List.of(dentFrom, dentTo));
theFile.rwLock();
if (!dentFrom.tryResolve(JObject.ResolutionStrategy.REMOTE))
throw new StatusRuntimeException(Status.ABORTED.withDescription(dentFrom.getName() + " could not be resolved"));
if (!dentTo.tryResolve(JObject.ResolutionStrategy.REMOTE))
throw new StatusRuntimeException(Status.ABORTED.withDescription(dentTo.getName() + " could not be resolved"));
if (!theFile.tryResolve(JObject.ResolutionStrategy.REMOTE))
throw new StatusRuntimeException(Status.ABORTED.withDescription(theFile.getName() + " could not be resolved"));
if (!(dentFrom.getData() instanceof Directory dentFromD))
throw new StatusRuntimeException(Status.ABORTED.withDescription(dentFrom.getName() + " is not a directory"));
if (!(dentTo.getData() instanceof Directory dentToD))
throw new StatusRuntimeException(Status.ABORTED.withDescription(dentTo.getName() + " is not a directory"));
if (dentFromD.getKid(Paths.get(from).getFileName().toString()).isEmpty())
throw new NotImplementedException("Race when moving (missing)");
FsNode newDent;
if (theFile.getData() instanceof Directory d) {
newDent = d;
theFile.getMeta().removeRef(dentFrom.getName());
theFile.getMeta().addRef(dentTo.getName());
} else if (theFile.getData() instanceof File f) {
var newFile = new File(UUID.randomUUID(), f.getMode(), UUID.fromString(dentTo.getName()), f.isSymlink());
newFile.setMtime(f.getMtime());
newFile.setCtime(f.getCtime());
newFile.getChunks().putAll(f.getChunks());
for (var c : newFile.getChunks().values()) {
var o = jObjectManager.get(ChunkInfo.getNameFromHash(c))
.orElseThrow(() -> new StatusRuntimeException(Status.DATA_LOSS.withDescription("Could not find chunk " + c + " when moving " + from)));
o.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.addRef(newFile.getName());
return null;
});
}
theFile.getMeta().removeRef(dentFrom.getName());
jObjectManager.put(newFile, Optional.of(dentTo.getName()));
newDent = newFile;
} else {
throw new StatusRuntimeException(Status.ABORTED.withDescription(theFile.getName() + " is of unknown type"));
}
if (!dentFromD.removeKid(Paths.get(from).getFileName().toString()))
throw new NotImplementedException("Should not reach here");
String toFn = Paths.get(to).getFileName().toString();
if (dentToD.getChildren().containsKey(toFn)) {
cleanup = dentToD.getChildren().get(toFn);
}
dentToD.getChildren().put(toFn, newDent.getUuid());
dentToD.setMtime(System.currentTimeMillis());
dentFrom.bumpVer();
dentTo.bumpVer();
} finally {
dentFrom.rwUnlock();
dentTo.rwUnlock();
theFile.rwUnlock();
}
if (cleanup != null) {
jObjectManager.get(cleanup.toString()).ifPresent(c -> {
c.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
m.removeRef(dentTo.getName());
return null;
});
});
}
_tree.move(toDentry.getName(), meta.withName(toPath.getFileName().toString()), node.getName());
return true;
}
@Override
public Boolean chmod(String name, long mode) {
var dent = getDirEntry(name);
public Boolean chmod(String uuid, long mode) {
var dent = jObjectManager.get(uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
dent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, i) -> {
bump.apply();
d.setMtime(System.currentTimeMillis());
d.setMode(mode);
if (d instanceof TreeNodeJObjectData) {
return null;//FIXME:?
} else if (d instanceof File f) {
bump.apply();
f.setMtime(System.currentTimeMillis());
f.setMode(mode);
} else {
throw new IllegalArgumentException(uuid + " is not a file");
}
return null;
});
@@ -425,10 +229,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var found = getDirEntry(name);
return found.runReadLocked(JObject.ResolutionStrategy.REMOTE, (m, d) -> {
if (!(d instanceof Directory)) {
if (!(d instanceof TreeNodeJObjectData) || !(d.getNode().getMeta() instanceof JTreeNodeMetaDirectory)) {
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
}
return ((Directory) d).getChildrenList();
return new ArrayList<>(d.getNode().getChildren().keySet());
});
}
@@ -871,38 +675,28 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public String symlink(String oldpath, String newpath) {
var parent = traverse(getRoot(), Path.of(newpath).getParent());
Path path = Path.of(newpath);
var parent = getDirEntry(path.getParent().toString());
String fname = Path.of(newpath).getFileName().toString();
ensureDir(parent);
String fname = path.getFileName().toString();
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, 0, UUID.fromString(parent.getName()), true);
File f = new File(fuuid, 0, true);
var newNodeId = _tree.getNewNodeId();
_tree.move(parent.getName(), new JTreeNodeMetaFile(fname, f.getName()), newNodeId);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
ChunkInfo newChunkInfo = new ChunkInfo(newChunkData.getHash(), newChunkData.getBytes().size());
f.getChunks().put(0L, newChunkInfo.getHash());
if (!parent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
if (!(d instanceof Directory dir))
return false;
if (dir.getKid(fname).isPresent())
return false;
bump.apply();
boolean created = dir.putKid(fname, fuuid);
if (!created) return false;
jObjectManager.put(newChunkData, Optional.of(newChunkInfo.getName()));
jObjectManager.put(newChunkInfo, Optional.of(f.getName()));
jObjectManager.put(f, Optional.of(dir.getName()));
dir.setMtime(System.currentTimeMillis());
return true;
})) return null;
jObjectManager.put(newChunkData, Optional.of(newChunkInfo.getName()));
jObjectManager.put(newChunkInfo, Optional.of(f.getName()));
jObjectManager.put(f, Optional.of(newNodeId));
return f.getName();
}
@@ -912,7 +706,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = jObjectManager.get(fileUuid).orElseThrow(
() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
"File not found for setTimes: " + fileUuid))
);
);
file.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, fileData, bump, i) -> {
if (!(fileData instanceof FsNode fd))
@@ -929,7 +723,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Long size(String uuid) {
var read = jObjectManager.get(uuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
try {
return read.runReadLocked(JObject.ResolutionStrategy.REMOTE, (fsNodeData, fileData) -> {
@@ -949,12 +743,4 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return -1L;
}
}
private JObject<Directory> getRoot() {
var read = jObjectManager.get(new UUID(0, 0).toString());
if (read.isEmpty()) {
Log.error("Root directory not found");
}
return (JObject<Directory>) read.get();
}
}

View File

@@ -1,4 +1,8 @@
package com.usatiuk.dhfs.files.service;
public class DirectoryNotEmptyException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.files.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
}

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.files.service;
public enum GetattrType {
FILE,
DIRECTORY,
SYMLINK
}

View File

@@ -2,11 +2,9 @@ package com.usatiuk.dhfs.fuse;
import com.google.protobuf.UnsafeByteOperations;
import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfs.files.objects.Directory;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.objects.FsNode;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -64,7 +62,7 @@ public class DhfsFuse extends FuseStubFS {
var gid = new UnixSystem().getGid();
mount(Paths.get(root), false, debug,
new String[]{"-o", "auto_cache", "-o", "uid=" + uid, "-o", "gid=" + gid});
new String[]{"-o", "auto_cache", "-o", "uid=" + uid, "-o", "gid=" + gid});
}
void shutdown(@Observes @Priority(1) ShutdownEvent event) {
@@ -100,30 +98,34 @@ public class DhfsFuse extends FuseStubFS {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var uuid = fileOpt.get();
Optional<FsNode> found = fileService.getattr(uuid);
Optional<GetattrRes> found = fileService.getattr(uuid);
if (found.isEmpty()) {
return -ErrorCodes.ENOENT();
}
if (found.get() instanceof File f) {
if (f.isSymlink())
stat.st_mode.set(S_IFLNK | 0777); // FIXME?
else
stat.st_mode.set(S_IFREG | f.getMode());
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(uuid));
} else if (found.get() instanceof Directory d) {
stat.st_mode.set(S_IFDIR | d.getMode());
stat.st_nlink.set(2);
switch (found.get().type()) {
case FILE -> {
stat.st_mode.set(S_IFREG | found.get().mode());
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(uuid));
}
case DIRECTORY -> {
stat.st_mode.set(S_IFDIR | found.get().mode());
stat.st_nlink.set(2);
}
case SYMLINK -> {
stat.st_mode.set(S_IFLNK | 0777);
stat.st_nlink.set(1);
stat.st_size.set(fileService.size(uuid));
}
}
var foundDent = (FsNode) found.get();
// FIXME: Race?
stat.st_ctim.tv_sec.set(foundDent.getCtime() / 1000);
stat.st_ctim.tv_nsec.set((foundDent.getCtime() % 1000) * 1000);
stat.st_mtim.tv_sec.set(foundDent.getMtime() / 1000);
stat.st_mtim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
stat.st_atim.tv_sec.set(foundDent.getMtime() / 1000);
stat.st_atim.tv_nsec.set((foundDent.getMtime() % 1000) * 1000);
stat.st_ctim.tv_sec.set(found.get().ctime() / 1000);
stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1000);
stat.st_mtim.tv_sec.set(found.get().mtime() / 1000);
stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
stat.st_atim.tv_sec.set(found.get().mtime() / 1000);
stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
} catch (Exception e) {
Log.error("When getattr " + path, e);
return -ErrorCodes.EIO();
@@ -138,8 +140,8 @@ public class DhfsFuse extends FuseStubFS {
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var res = fileService.setTimes(file,
timespec[0].tv_sec.get() * 1000,
timespec[1].tv_sec.get() * 1000);
timespec[0].tv_sec.get() * 1000,
timespec[1].tv_sec.get() * 1000);
if (!res) return -ErrorCodes.EINVAL();
else return 0;
} catch (Exception e) {
@@ -208,9 +210,8 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int mkdir(String path, long mode) {
try {
var ret = fileService.mkdir(path, mode);
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
else return 0;
fileService.mkdir(path, mode);
return 0;
} catch (Exception e) {
Log.error("When creating dir " + path, e);
return -ErrorCodes.EIO();
@@ -220,9 +221,8 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int rmdir(String path) {
try {
var ret = fileService.rmdir(path);
if (!ret) return -ErrorCodes.ENOENT();
else return 0;
fileService.unlink(path);
return 0;
} catch (DirectoryNotEmptyException ex) {
return -ErrorCodes.ENOTEMPTY();
} catch (Exception e) {
@@ -247,9 +247,8 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int unlink(String path) {
try {
var ret = fileService.unlink(path);
if (!ret) return -ErrorCodes.ENOENT();
else return 0;
fileService.unlink(path);
return 0;
} catch (Exception e) {
Log.error("When unlinking " + path, e);
return -ErrorCodes.EIO();
@@ -277,7 +276,9 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int chmod(String path, long mode) {
try {
var ret = fileService.chmod(path, mode);
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var ret = fileService.chmod(fileOpt.get(), mode);
if (ret) return 0;
else return -ErrorCodes.EINVAL();
} catch (Exception e) {

View File

@@ -0,0 +1,54 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.objects.jklepmanntree.helpers.StorageInterfaceService;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.KleppmannTree;
import com.usatiuk.kleppmanntree.OpMove;
import java.util.List;
import java.util.UUID;
public class JKleppmannTree {
private final JKleppmannTreePersistentData _persistentData;
private final JStorageInterface _storageInterface;
private final KleppmannTree<Long, UUID, String, JTreeNodeMeta, String, JTreeNodeWrapper> _tree;
JKleppmannTree(JKleppmannTreePersistentData persistentData, StorageInterfaceService storageInterfaceService, JPeerInterface peerInterface) {
_persistentData = persistentData;
var si = new JStorageInterface(persistentData, storageInterfaceService);
_storageInterface = si;
si.ensureRootCreated();
_tree = new KleppmannTree<>(si, peerInterface, _persistentData.getClock());
}
private CombinedTimestamp<Long, UUID> getTimestamp() {
return new CombinedTimestamp<>(_persistentData.getClock().getTimestamp(), _persistentData.getSelfUuid());
}
public String traverse(List<String> names) {
return _tree.traverse(names);
}
OpMove<Long, UUID, String, JTreeNodeMeta, String> createMove(String newParent, JTreeNodeMeta newMeta, String node) {
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
}
public void move(String newParent, JTreeNodeMeta newMeta, String node) {
applyOp(createMove(newParent, newMeta, node));
}
public String getNewNodeId() {
return _storageInterface.getNewNodeId();
}
public void trash(JTreeNodeMeta newMeta, String node) {
applyOp(createMove(_storageInterface.getTrashId(), newMeta, node));
}
public void applyOp(OpMove<Long, UUID, String, JTreeNodeMeta, String> opMove) {
_persistentData.recordOp(opMove);
_tree.applyOp(opMove);
}
}

View File

@@ -0,0 +1,79 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.jklepmanntree.helpers.OpQueueHelper;
import com.usatiuk.dhfs.objects.jklepmanntree.helpers.StorageInterfaceService;
import com.usatiuk.kleppmanntree.AtomicClock;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentHashMap;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
@ApplicationScoped
public class JKleppmannTreeManager {
@Inject
JPeerInterface jPeerInterface;
@Inject
StorageInterfaceService storageInterfaceService;
@Inject
OpQueueHelper opQueueHelper;
@ConfigProperty(name = "dhfs.objects.root")
String dataRoot;
private static final String dataFileName = "trees";
// FIXME: There should be something smarter...
private ConcurrentHashMap<String, JKleppmannTreePersistentData> _persistentData = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, JKleppmannTree> _trees = new ConcurrentHashMap<>();
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
Paths.get(dataRoot).toFile().mkdirs();
Log.info("Initializing with root " + dataRoot);
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists()) {
Log.info("Reading tree data");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
} else if (Paths.get(dataRoot).resolve(dataFileName + ".bak").toFile().exists()) {
Log.warn("Reading tree data from backup");
_persistentData = SerializationHelper.deserialize(Files.readAllBytes(Paths.get(dataRoot).resolve(dataFileName)));
}
// FIXME!: Handle unclean shutdowns
}
void shutdown(@Observes @Priority(300) ShutdownEvent event) throws IOException {
Log.info("Saving tree data");
writeData();
Log.info("Saved tree data");
}
private void writeData() {
try {
if (Paths.get(dataRoot).resolve(dataFileName).toFile().exists())
Files.move(Paths.get(dataRoot).resolve(dataFileName), Paths.get(dataRoot).resolve(dataFileName + ".bak"), REPLACE_EXISTING);
Files.write(Paths.get(dataRoot).resolve(dataFileName), SerializationUtils.serialize(_persistentData));
} catch (IOException iex) {
Log.error("Error writing deferred invalidations data", iex);
throw new RuntimeException(iex);
}
}
public JKleppmannTree getTree(String name) {
return _trees.computeIfAbsent(name, this::createTree);
}
private JKleppmannTree createTree(String name) {
return new JKleppmannTree(_persistentData.computeIfAbsent(name, n -> new JKleppmannTreePersistentData(opQueueHelper, n, new AtomicClock())), storageInterfaceService, jPeerInterface);
}
}

View File

@@ -0,0 +1,61 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.objects.jklepmanntree.helpers.OpQueueHelper;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.repository.invalidation.OpQueue;
import com.usatiuk.kleppmanntree.AtomicClock;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogOpMove;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JKleppmannTreePersistentData implements Serializable, OpQueue {
@Getter
private final String _name;
@Getter
private final AtomicClock _clock;
@Getter
private final UUID _selfUuid;
@Getter
private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<OpMove<Long, UUID, String, JTreeNodeMeta, String>>> _queues = new ConcurrentHashMap<>();
@Getter
private final ConcurrentSkipListMap<CombinedTimestamp<Long, UUID>, LogOpMove<Long, UUID, String, ? extends JTreeNodeMeta, String>> _log = new ConcurrentSkipListMap<>();
@Getter
private final ReentrantReadWriteLock _logLock = new ReentrantReadWriteLock();
private final OpQueueHelper _helper;
JKleppmannTreePersistentData(OpQueueHelper opQueueHelper, String name, AtomicClock clock) {
_name = name;
_clock = clock;
_helper = opQueueHelper;
_selfUuid = _helper.getSelfUUid();
_helper.registerOnConnection(this);
}
@Override
public Object getForHost(UUID host) {
if (_queues.containsKey(host)) {
return _queues.get(host).poll();
}
return null;
}
void recordOp(OpMove<Long, UUID, String, JTreeNodeMeta, String> opMove) {
for (var u : _helper.getHostList()) {
_queues.computeIfAbsent(u, h -> new ConcurrentLinkedQueue<>());
_queues.get(u).add(opMove);
}
notifyInvQueue();
}
protected void notifyInvQueue() {
}
}

View File

@@ -0,0 +1,26 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.kleppmanntree.PeerInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Collection;
import java.util.UUID;
@Singleton
public class JPeerInterface implements PeerInterface<UUID> {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Override
public UUID getSelfId() {
return persistentRemoteHostsService.getSelfUuid();
}
@Override
public Collection<UUID> getAllPeers() {
throw new NotImplementedException();
}
}

View File

@@ -0,0 +1,96 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.objects.jklepmanntree.helpers.StorageInterfaceService;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogOpMove;
import com.usatiuk.kleppmanntree.StorageInterface;
import com.usatiuk.kleppmanntree.TreeNode;
import org.apache.commons.lang3.NotImplementedException;
import java.util.Collection;
import java.util.NavigableMap;
import java.util.UUID;
public class JStorageInterface implements StorageInterface<Long, UUID, String, JTreeNodeMeta, String, JTreeNodeWrapper> {
private final JKleppmannTreePersistentData _persistentData;
private final StorageInterfaceService _storageInterfaceService;
public JStorageInterface(JKleppmannTreePersistentData persistentData, StorageInterfaceService storageInterfaceService) {
_persistentData = persistentData;
_storageInterfaceService = storageInterfaceService;
}
public void ensureRootCreated() {
if (_storageInterfaceService.getObject(getRootId()).isEmpty()) {
_storageInterfaceService.putObject(new TreeNodeJObjectData(new TreeNode<>(getRootId(), null, new JTreeNodeMetaDirectory(""))));
_storageInterfaceService.putObject(new TreeNodeJObjectData(new TreeNode<>(getTrashId(), null, null)));
}
}
@Override
public String getRootId() {
return _persistentData.getName() + "_jt_root";
}
@Override
public String getTrashId() {
return _persistentData.getName() + "_jt_trash";
}
@Override
public String getNewNodeId() {
return _storageInterfaceService.getUniqueId();
}
@Override
public JTreeNodeWrapper getById(String id) {
var got = _storageInterfaceService.getObject(id);
if (got.isEmpty()) return null;
return new JTreeNodeWrapper((JObject<TreeNodeJObjectData>) got.get());
}
@Override
public JTreeNodeWrapper createNewNode(TreeNode<String, JTreeNodeMeta, String> node) {
return new JTreeNodeWrapper(_storageInterfaceService.putObjectLocked(new TreeNodeJObjectData(node)));
}
@Override
public void removeNode(String id) {
// TODO:
}
@Override
public void lockSet(Collection<JTreeNodeWrapper> nodes) {
throw new NotImplementedException();
}
@Override
public NavigableMap<CombinedTimestamp<Long, UUID>, LogOpMove<Long, UUID, String, ? extends JTreeNodeMeta, String>> getLog() {
return _persistentData.getLog();
}
@Override
public void globalRwLock() {
_persistentData.getLogLock().writeLock().lock();
}
@Override
public void globalRwUnlock() {
_persistentData.getLogLock().writeLock().unlock();
}
@Override
public void globalRLock() {
_persistentData.getLogLock().readLock().lock();
}
@Override
public void globalRUnlock() {
_persistentData.getLogLock().readLock().unlock();
}
}

View File

@@ -0,0 +1,49 @@
package com.usatiuk.dhfs.objects.jklepmanntree;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.kleppmanntree.TreeNodeWrapper;
public class JTreeNodeWrapper implements TreeNodeWrapper<String, JTreeNodeMeta, String> {
private final JObject<TreeNodeJObjectData> _backing;
public JTreeNodeWrapper(JObject<TreeNodeJObjectData> backing) {_backing = backing;}
@Override
public void rLock() {
_backing.rLock();
}
@Override
public void rUnlock() {
_backing.rUnlock();
}
@Override
public void rwLock() {
_backing.rwLock();
}
@Override
public void rwUnlock() {
_backing.bumpVer(); // FIXME:?
_backing.rwUnlock();
}
@Override
public void notifyRef(String id) {
_backing.getMeta().addRef(id);
}
@Override
public void notifyRmRef(String id) {
_backing.getMeta().removeRef(id);
}
@Override
public TreeNode<String, JTreeNodeMeta, String> getNode() {
return _backing.getData().getNode();
}
}

View File

@@ -0,0 +1,37 @@
package com.usatiuk.dhfs.objects.jklepmanntree.helpers;
import com.usatiuk.dhfs.objects.jklepmanntree.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.UUID;
@Singleton
public class OpQueueHelper {
@Inject
RemoteHostManager remoteHostManager;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
public void registerOnConnection(JKleppmannTreePersistentData self) {
remoteHostManager.registerConnectEventListener(h -> notifyInvQueue(self));
}
public void notifyInvQueue(JKleppmannTreePersistentData self) {
invalidationQueueService.pushInvalidationToAll(self);
}
public UUID getSelfUUid() {
return persistentRemoteHostsService.getSelfUuid();
}
public Collection<UUID> getHostList() {
return persistentRemoteHostsService.getHostUuids();
}
}

View File

@@ -0,0 +1,35 @@
package com.usatiuk.dhfs.objects.jklepmanntree.helpers;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Optional;
@Singleton
public class StorageInterfaceService {
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@Inject
JObjectManager jObjectManager;
public String getUniqueId() {
return persistentRemoteHostsService.getUniqueId();
}
public Optional<JObject<?>> getObject(String id) {
return jObjectManager.get(id);
}
public JObject<TreeNodeJObjectData> putObject(TreeNodeJObjectData node) {
return jObjectManager.put(node, Optional.ofNullable(node.getNode().getParent()));
}
public JObject<TreeNodeJObjectData> putObjectLocked(TreeNodeJObjectData node) {
return jObjectManager.putLocked(node, Optional.ofNullable(node.getNode().getParent()));
}
}

View File

@@ -0,0 +1,20 @@
package com.usatiuk.dhfs.objects.jklepmanntree.serializers;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaDirectoryP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JTreeNodeMetaDirectorySerializer implements ProtoDeserializer<TreeNodeMetaDirectoryP, JTreeNodeMetaDirectory>, ProtoSerializer<TreeNodeMetaDirectoryP, JTreeNodeMetaDirectory> {
@Override
public JTreeNodeMetaDirectory deserialize(TreeNodeMetaDirectoryP message) {
return new JTreeNodeMetaDirectory(message.getName());
}
@Override
public TreeNodeMetaDirectoryP serialize(JTreeNodeMetaDirectory object) {
return TreeNodeMetaDirectoryP.newBuilder().setName(object.getName()).build();
}
}

View File

@@ -0,0 +1,20 @@
package com.usatiuk.dhfs.objects.jklepmanntree.serializers;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaFileP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class JTreeNodeMetaFileSerializer implements ProtoDeserializer<TreeNodeMetaFileP, JTreeNodeMetaFile>, ProtoSerializer<TreeNodeMetaFileP, JTreeNodeMetaFile> {
@Override
public JTreeNodeMetaFile deserialize(TreeNodeMetaFileP message) {
return new JTreeNodeMetaFile(message.getName(), message.getFileMirror());
}
@Override
public TreeNodeMetaFileP serialize(JTreeNodeMetaFile object) {
return TreeNodeMetaFileP.newBuilder().setName(object.getName()).setFileMirror(object.getFileIno()).build();
}
}

View File

@@ -0,0 +1,53 @@
package com.usatiuk.dhfs.objects.jklepmanntree.serializers;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMeta;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaDirectory;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.JTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jklepmanntree.structs.TreeNodeJObjectData;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaDirectoryP;
import com.usatiuk.dhfs.objects.persistence.TreeNodeMetaFileP;
import com.usatiuk.dhfs.objects.persistence.TreeNodeP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.kleppmanntree.TreeNode;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class TreeNodeProtoSerializer implements ProtoDeserializer<TreeNodeP, TreeNodeJObjectData>, ProtoSerializer<TreeNodeP, TreeNodeJObjectData> {
@Inject
ProtoSerializerService protoSerializerService;
@Override
public TreeNodeJObjectData deserialize(TreeNodeP message) {
JTreeNodeMeta meta = switch (message.getMetaCase()) {
case FILE -> (JTreeNodeMetaFile) protoSerializerService.deserialize(message.getFile());
case DIR -> (JTreeNodeMetaDirectory) protoSerializerService.deserialize(message.getDir());
case META_NOT_SET -> null;
};
var node = new TreeNode<>(
message.getId(),
message.hasParent() ? message.getParent() : null,
meta
);
node.getChildren().putAll(message.getChildrenMap());
return new TreeNodeJObjectData(node);
}
@Override
public TreeNodeP serialize(TreeNodeJObjectData object) {
var builder = TreeNodeP.newBuilder().setId(object.getNode().getId()).putAllChildren(object.getNode().getChildren());
if (object.getNode().getParent() != null)
builder.setParent(object.getNode().getParent());
switch (object.getNode().getMeta()) {
case JTreeNodeMetaFile jTreeNodeMetaFile ->
builder.setFile((TreeNodeMetaFileP) protoSerializerService.serialize(jTreeNodeMetaFile));
case JTreeNodeMetaDirectory jTreeNodeMetaDirectory ->
builder.setDir((TreeNodeMetaDirectoryP) protoSerializerService.serialize(jTreeNodeMetaDirectory));
case null, default -> {
}
}
return builder.build();
}
}

View File

@@ -0,0 +1,13 @@
package com.usatiuk.dhfs.objects.jklepmanntree.structs;
import com.usatiuk.kleppmanntree.NodeMeta;
import lombok.Getter;
public abstract class JTreeNodeMeta implements NodeMeta<String> {
@Getter
private final String _name;
public JTreeNodeMeta(String name) {_name = name;}
public abstract JTreeNodeMeta withName(String name);
}

View File

@@ -0,0 +1,12 @@
package com.usatiuk.dhfs.objects.jklepmanntree.structs;
public class JTreeNodeMetaDirectory extends JTreeNodeMeta {
public JTreeNodeMetaDirectory(String name) {
super(name);
}
@Override
public JTreeNodeMeta withName(String name) {
return new JTreeNodeMetaDirectory(name);
}
}

View File

@@ -0,0 +1,18 @@
package com.usatiuk.dhfs.objects.jklepmanntree.structs;
import lombok.Getter;
public class JTreeNodeMetaFile extends JTreeNodeMeta {
@Getter
private final String _fileIno;
public JTreeNodeMetaFile(String name, String fileIno) {
super(name);
_fileIno = fileIno;
}
@Override
public JTreeNodeMeta withName(String name) {
return new JTreeNodeMetaFile(name, _fileIno);
}
}

View File

@@ -0,0 +1,39 @@
package com.usatiuk.dhfs.objects.jklepmanntree.structs;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.OnlyLocal;
import com.usatiuk.dhfs.objects.repository.ConflictResolver;
import com.usatiuk.kleppmanntree.TreeNode;
import lombok.Getter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
// FIXME: Ideally this is two classes?
@OnlyLocal
public class TreeNodeJObjectData extends JObjectData {
@Getter
final TreeNode<String, JTreeNodeMeta, String> _node;
public TreeNodeJObjectData(TreeNode<String, JTreeNodeMeta, String> node) {
_node = node;
}
@Override
public String getName() {
return _node.getId();
}
@Override
public Class<? extends ConflictResolver> getConflictResolver() {
return null;
}
@Override
public Collection<String> extractRefs() {
if (_node.getMeta() instanceof JTreeNodeMetaFile)
return List.of(((JTreeNodeMetaFile) _node.getMeta()).getFileIno());
return Collections.unmodifiableCollection(_node.getChildren().values());
}
}

View File

@@ -8,7 +8,10 @@ import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import org.apache.commons.lang3.NotImplementedException;
import java.util.*;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -26,6 +29,7 @@ public class JObject<T extends JObjectData> {
final int metaHash;
final int externalHash;
final boolean data;
boolean forceInvalidate = false;
final HashSet<String> oldRefs;
TransactionState() {
@@ -45,7 +49,7 @@ public class JObject<T extends JObjectData> {
}
}
void commit(boolean forceInvalidate) {
void commit() {
_resolver.updateDeletionState(JObject.this);
var newDataHash = _metaPart.dataHash();
@@ -62,10 +66,14 @@ public class JObject<T extends JObjectData> {
newDataHash != dataHash
|| newData != data
|| forceInvalidate
);
);
verifyRefs(oldRefs);
}
void forceInvalidate() {
forceInvalidate = true;
}
}
private TransactionState _transactionState = null;
@@ -100,6 +108,10 @@ public class JObject<T extends JObjectData> {
_metaPart.narrowClass(klass);
}
public boolean isOnlyLocal() {
return getKnownClass().isAnnotationPresent(OnlyLocal.class);
}
public String getName() {
return _metaPart.getName();
}
@@ -241,14 +253,15 @@ public class JObject<T extends JObjectData> {
_lock.readLock().unlock();
}
public void rwUnlock() {
rwUnlock(false);
protected void forceInvalidate() {
assertRWLock();
_transactionState.forceInvalidate();
}
public void rwUnlock(boolean forceInvalidate) {
public void rwUnlock() {
try {
if (_lock.writeLock().getHoldCount() == 1) {
_transactionState.commit(forceInvalidate);
_transactionState.commit();
_transactionState = null;
}
} catch (Exception ex) {
@@ -258,8 +271,12 @@ public class JObject<T extends JObjectData> {
}
}
public boolean haveRwLock() {
return _lock.isWriteLockedByCurrentThread();
}
public void assertRWLock() {
if (!_lock.isWriteLockedByCurrentThread())
if (!haveRwLock())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}
@@ -328,7 +345,7 @@ public class JObject<T extends JObjectData> {
throw new IllegalStateException("Expected to be deleted when discarding data");
_dataPart.set(null);
_metaPart.setHaveLocalCopy(false);
_metaPart.setSavedRefs(Collections.emptySet());
_metaPart.setSavedRefs(new HashSet<>());
}
public enum ResolutionStrategy {

View File

@@ -11,8 +11,10 @@ public interface JObjectManager {
// Put a new object
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
<T extends JObjectData> JObject<T> putLocked(T object, Optional<String> parent);
// Get an object with a name if it exists, otherwise create new one based on metadata
// Should be used when working with objects referenced from the outside
JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent);
JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent);
}

View File

@@ -121,7 +121,7 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
public <D extends JObjectData> JObject<D> putLocked(D object, Optional<String> parent) {
while (true) {
JObject<?> ret;
JObject<?> newObj = null;
@@ -161,20 +161,30 @@ public class JObjectManagerImpl implements JObjectManager {
});
} finally {
if (newObj != null)
newObj.rwUnlock(true);
newObj.forceInvalidate();
}
if (newObj == null)
if (newObj == null) {
jObjectLRU.notifyAccess(ret);
ret.rwLock();
}
return (JObject<D>) ret;
}
}
@Override
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
var ret = putLocked(object, parent);
ret.rwUnlock();
return ret;
}
@Override
public JObject<?> getOrPutLocked(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
while (true) {
var got = get(name);
if (got.isPresent()) {
got.get().rwLock();
got.get().narrowClass(klass);
got.get().markSeen();
parent.ifPresent(s -> got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
@@ -190,26 +200,29 @@ public class JObjectManagerImpl implements JObjectManager {
JObject<?> ret = null;
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false, klass));
created.rwLock();
try {
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
} finally {
created.rwUnlock();
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
if (ret != created) continue;
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
parent.ifPresent(m::addRef);
m.markSeen();
return null;
});
return created;
}
}
@Override
public JObject<?> getOrPut(String name, Class<? extends JObjectData> klass, Optional<String> parent) {
var obj = getOrPutLocked(name, klass, parent);
obj.rwUnlock();
return obj;
}
private static class NamedWeakReference extends WeakReference<JObject<?>> {
@Getter
final String _key;

View File

@@ -86,7 +86,7 @@ public class JObjectRefProcessor {
_movableProcessorExecutorService.submit(() -> {
boolean delay = false;
try {
var knownHosts = persistentRemoteHostsService.getHostsUuid();
var knownHosts = persistentRemoteHostsService.getHostUuids();
List<UUID> missing = new ArrayList<>();
var ourReferrers = obj.runReadLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d) -> {
@@ -133,7 +133,7 @@ public class JObjectRefProcessor {
private boolean processMovable(JObject<?> obj) {
obj.assertRWLock();
var knownHosts = persistentRemoteHostsService.getHostsUuid();
var knownHosts = persistentRemoteHostsService.getHostUuids();
boolean missing = false;
for (var x : knownHosts)
if (!obj.getMeta().getConfirmedDeletes().contains(x)) {

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.objects.jrepository;
import com.google.common.collect.Streams;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
@@ -121,7 +121,7 @@ public class JObjectResolver {
refs.forEach(r -> {
Log.trace("Hydrating ref after undelete " + r + " for " + self.getName());
jObjectManager.getOrPut(r, self.getData().getRefType(), Optional.of(self.getName()));
jObjectManager.getOrPut(r, self.getData() != null ? self.getData().getRefType() : JObjectData.class, Optional.of(self.getName()));
});
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects.jrepository;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface OnlyLocal {
}

View File

@@ -19,6 +19,7 @@ public class JObjectDataDeserializer implements ProtoDeserializer<JObjectDataP,
case CHUNKDATA -> protoSerializerService.deserialize(message.getChunkData());
case PEERDIRECTORY -> protoSerializerService.deserialize(message.getPeerDirectory());
case PERSISTENTPEERINFO -> protoSerializerService.deserialize(message.getPersistentPeerInfo());
case TREENODE -> protoSerializerService.deserialize(message.getTreeNode());
case OBJ_NOT_SET -> throw new IllegalStateException("Type not set when deserializing");
};
}

View File

@@ -40,11 +40,11 @@ public class ProtoSerializerService {
void init() {
for (var s : _protoSerializers) {
var args = ((ParameterizedType) Arrays.stream(ClientProxy.unwrap(s).getClass().getGenericInterfaces())
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoSerializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoSerializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
@@ -57,11 +57,11 @@ public class ProtoSerializerService {
for (var s : _protoDeserializers) {
var args = ((ParameterizedType) Arrays.stream(ClientProxy.unwrap(s).getClass().getGenericInterfaces())
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoDeserializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoDeserializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
@@ -94,6 +94,8 @@ public class ProtoSerializerService {
return Optional.of(JObjectDataP.newBuilder().setPeerDirectory((PeerDirectoryP) ser).build());
} else if (ser instanceof PersistentPeerInfoP) {
return Optional.of(JObjectDataP.newBuilder().setPersistentPeerInfo((PersistentPeerInfoP) ser).build());
} else if (ser instanceof TreeNodeP) {
return Optional.of(JObjectDataP.newBuilder().setTreeNode((TreeNodeP) ser).build());
} else {
return Optional.empty();
}

View File

@@ -1,4 +0,0 @@
package com.usatiuk.dhfs.objects.repository;
public class InvalidationQueue {
}

View File

@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectResolver;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory;
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
import com.usatiuk.dhfs.objects.repository.peertrust.PeerTrustManager;
@@ -193,7 +194,7 @@ public class PersistentRemoteHostsService {
return getPeersSnapshot().stream().filter(i -> !i.getUuid().equals(_selfUuid)).toList();
}
public List<UUID> getHostsUuid() {
public List<UUID> getHostUuids() {
return getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().filter(i -> !i.equals(_selfUuid)).toList());
}

View File

@@ -48,7 +48,7 @@ public class RemoteHostManager {
// Note: newly added hosts aren't in _transientPeersState
// but that's ok as they don't have initialSyncDone set
for (var h : persistentRemoteHostsService.getHostsUuid())
for (var h : persistentRemoteHostsService.getHostUuids())
_transientPeersState.runWriteLocked(d -> d.get(h));
_initialized = true;
@@ -62,8 +62,8 @@ public class RemoteHostManager {
public void tryConnectAll() {
if (!_initialized) return;
try {
_heartbeatExecutor.invokeAll(persistentRemoteHostsService.getHostsUuid().stream()
.<Callable<Void>>map(host -> () -> {
_heartbeatExecutor.invokeAll(persistentRemoteHostsService.getHostUuids().stream()
.<Callable<Void>>map(host -> () -> {
try {
if (isReachable(host))
Log.trace("Heartbeat: " + host);

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.PushResolution;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -54,7 +55,7 @@ public class RemoteObjectServiceClient {
.filter(entry -> entry.getValue().equals(ourVersion))
.map(Map.Entry::getKey).toList();
else
return persistentRemoteHostsService.getHostsUuid();
return persistentRemoteHostsService.getHostUuids();
});
if (targets.isEmpty())

View File

@@ -6,6 +6,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;

View File

@@ -4,6 +4,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectData;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
@@ -51,7 +52,7 @@ public class SyncHandler {
JObject<?> found = jObjectManager.getOrPut(header.getName(), JObjectData.class, Optional.empty());
var receivedTotalVer = header.getChangelog().getEntriesList()
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
var receivedMap = new HashMap<UUID, Long>();
for (var e : header.getChangelog().getEntriesList()) {
@@ -61,7 +62,7 @@ public class SyncHandler {
boolean conflict = found.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (md, data, bump, invalidate) -> {
if (md.getRemoteCopies().getOrDefault(from, 0L) > receivedTotalVer) {
Log.error("Received older index update than was known for host: "
+ from + " " + header.getName());
+ from + " " + header.getName());
throw new OutdatedUpdateException();
}
@@ -96,7 +97,7 @@ public class SyncHandler {
if (hasLower) {
Log.info("Received older index update than known: "
+ from + " " + header.getName());
+ from + " " + header.getName());
throw new OutdatedUpdateException();
}
@@ -159,5 +160,17 @@ public class SyncHandler {
}
protected static class OutdatedUpdateException extends RuntimeException {
OutdatedUpdateException() {
super();
}
OutdatedUpdateException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
}

View File

@@ -1,4 +1,4 @@
package com.usatiuk.dhfs.objects.repository;
package com.usatiuk.dhfs.objects.repository.invalidation;
import lombok.Getter;
import org.apache.commons.collections4.MultiValuedMap;

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
public class InvalidationQueue {
}

View File

@@ -1,7 +1,11 @@
package com.usatiuk.dhfs.objects.repository;
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.jrepository.DeletedObjectAccessException;
import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -145,7 +149,8 @@ public class InvalidationQueueService {
Log.info("Invalidation sender exited");
}
public void pushInvalidationToAll(String name) {
public void pushInvalidationToAll(JObject<?> obj) {
if (obj.isOnlyLocal()) return;
while (true) {
var queue = _toAllQueue.get();
if (queue == null) {
@@ -154,19 +159,32 @@ public class InvalidationQueueService {
queue = nq;
}
queue.add(name);
queue.add(obj.getName());
if (_toAllQueue.get() == queue) break;
}
}
public void pushInvalidationToOne(UUID host, String name) {
public void pushInvalidationToOne(UUID host, JObject<?> obj) {
if (obj.isOnlyLocal()) return;
if (remoteHostManager.isReachable(host))
_queue.add(Pair.of(host, name));
_queue.add(Pair.of(host, obj.getName()));
else
deferredInvalidationQueueService.returnForHost(host);
}
public void pushInvalidationToAll(String name) {
pushInvalidationToAll(jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
}
public void pushInvalidationToOne(UUID host, String name) {
pushInvalidationToOne(host, jObjectManager.get(name).orElseThrow(() -> new IllegalArgumentException("Object " + name + " not found")));
}
public void pushInvalidationToAll(OpQueue queue) {
}
protected void pushDeferredInvalidations(UUID host, String name) {
_queue.add(Pair.of(host, name));
}

View File

@@ -0,0 +1,8 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import java.util.Collection;
import java.util.UUID;
public interface OpQueue {
Object getForHost(UUID host);
}

View File

@@ -30,8 +30,7 @@ message FsNodeP {
message FileP {
FsNodeP fsNode = 1;
map<int64, string> chunks = 2;
string parent = 3;
bool symlink = 4;
bool symlink = 3;
}
message DirectoryP {
@@ -58,6 +57,25 @@ message PersistentPeerInfoP {
bytes cert = 2;
}
message TreeNodeMetaFileP {
string name = 1;
string fileMirror = 2;
}
message TreeNodeMetaDirectoryP {
string name = 1;
}
message TreeNodeP {
optional string parent = 1;
string id = 2;
map<string, string> children = 3;
oneof meta {
TreeNodeMetaFileP file = 4;
TreeNodeMetaDirectoryP dir = 5;
}
}
message JObjectDataP {
oneof obj {
FileP file = 2;
@@ -66,5 +84,6 @@ message JObjectDataP {
ChunkDataP chunkData = 5;
PeerDirectoryP peerDirectory = 6;
PersistentPeerInfoP persistentPeerInfo = 7;
TreeNodeP treeNode = 8;
}
}

View File

@@ -64,7 +64,7 @@ public class DhfsFileServiceSimpleTestImpl {
ChunkInfo c2i = new ChunkInfo(c2.getHash(), c2.getBytes().size());
ChunkData c3 = new ChunkData(ByteString.copyFrom("91011".getBytes()));
ChunkInfo c3i = new ChunkInfo(c3.getHash(), c3.getBytes().size());
File f = new File(fuuid, 777, new UUID(0, 0), false);
File f = new File(fuuid, 777, false);
f.getChunks().put(0L, c1.getHash());
f.getChunks().put((long) c1.getBytes().size(), c2.getHash());
f.getChunks().put((long) c1.getBytes().size() + c2.getBytes().size(), c3.getHash());
@@ -192,7 +192,7 @@ public class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/movedTest").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
}
@Test
@@ -245,16 +245,17 @@ public class DhfsFileServiceSimpleTestImpl {
Assertions.assertTrue(fileService.open("/movedTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray());
var newfile = fileService.open("/movedTest2").get();
Thread.sleep(1000);
chunkObj.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
Assertions.assertTrue(m.getReferrers().contains(newfile));
Assertions.assertFalse(m.getReferrers().contains(uuid));
return null;
});
// FIXME: No gc!
// Thread.sleep(1000);
//
// chunkObj.runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
// Assertions.assertTrue(m.getReferrers().contains(newfile));
// Assertions.assertFalse(m.getReferrers().contains(uuid));
// return null;
// });
}
}