16 Commits

Author SHA1 Message Date
7902d9e486 proper pages 2025-05-12 16:10:43 +02:00
7c056b9674 fix 2025-05-12 16:03:42 +02:00
8cc040b234 fix 2025-05-12 16:01:26 +02:00
f8375c9cd8 javadocs github pages 2025-05-12 15:56:07 +02:00
0c3524851e Some javadocs + CI 2025-05-12 12:49:10 +02:00
3eb7164c0f Dhfs-fuse: fix LazyFsIT tests 2025-05-10 16:49:58 +02:00
f544a67fb5 Objects: cleanup AutoCloseableNoThrow 2025-05-10 13:49:42 +02:00
964b3da951 Objects: remove getUsableSpace 2025-05-10 11:20:34 +02:00
cb33472dc5 Utils: remove VoidFn 2025-05-10 11:07:40 +02:00
de211bb2d2 Objects: remove prepareTx 2025-05-07 16:12:47 +02:00
56ab3bad4c Objects: remove TransactionPrivate 2025-05-07 15:00:15 +02:00
9403556220 Objects: remove TransactionFactory 2025-05-07 14:39:55 +02:00
469a6b9011 Objects: remove lockmanager 2025-05-07 11:21:48 +02:00
52ccbb99bc Sync-base: rename ConnectedPeerManager to ReachablePeerManager
tests check for "connected" in logs
2025-05-06 20:28:21 +02:00
d972cd1562 Objects: remove LockingStrategy 2025-05-06 20:21:29 +02:00
80151bcca5 Dhfs-fuse: less parallel e2e tests 2025-05-06 20:07:03 +02:00
56 changed files with 645 additions and 669 deletions

View File

@@ -43,11 +43,11 @@ jobs:
distribution: "zulu"
cache: maven
- name: Build LazyFS
run: cd thirdparty/lazyfs/ && ./build.sh
# - name: Build LazyFS
# run: cd thirdparty/lazyfs/ && ./build.sh
- name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots javadoc:aggregate
# - name: Build with Maven
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
@@ -57,6 +57,11 @@ jobs:
name: DHFS Server Package
path: dhfs-parent/dhfs-fuse/target/quarkus-app
- uses: actions/upload-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-parent/target/reports/apidocs/
- uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
@@ -231,3 +236,37 @@ jobs:
with:
name: Run wrapper
path: ~/run-wrapper.tar.gz
publish-javadoc:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
permissions:
contents: read
pages: write
id-token: write
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/download-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-javadocs-downloaded
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: 'dhfs-javadocs-downloaded'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@@ -41,3 +41,5 @@ nb-configuration.xml
# Plugin directory
/.quarkus/cli/plugins/
.jqwik-database

View File

@@ -5,6 +5,11 @@ import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
/**
* ChunkData is a data structure that represents an immutable binary blob
* @param key unique key
* @param data binary data
*/
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
@Override
public int estimateSize() {

View File

@@ -1,26 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.ChunkDataP;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton;
@Singleton
public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(
JObjectKey.of(message.getKey().getName()),
message.getData()
);
}
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
.setData(object.data())
.build();
}
}

View File

@@ -9,6 +9,14 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.Set;
/**
* File is a data structure that represents a file in the file system
* @param key unique key
* @param mode file mode
* @param cTime creation time
* @param mTime modification time
* @param symlink true if the file is a symlink, false otherwise
*/
public record File(JObjectKey key, long mode, long cTime, long mTime,
boolean symlink
) implements JDataRemote, JMapHolder<JMapLongKey> {

View File

@@ -7,6 +7,11 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
/**
* FileDto is a data transfer object that contains a file and its chunks.
* @param file the file
* @param chunks the list of chunks, each represented as a pair of a long and a JObjectKey
*/
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
@Override
public Class<? extends JDataRemote> objClass() {

View File

@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.syncmap.DtoMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
/**
* Maps a {@link File} object to a {@link FileDto} object and vice versa.
*/
@ApplicationScoped
public class FileDtoMapper implements DtoMapper<File, FileDto> {
@Inject

View File

@@ -10,11 +10,20 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.List;
/**
* Helper class for working with files.
*/
@ApplicationScoped
public class FileHelper {
@Inject
JMapHelper jMapHelper;
/**
* Get the chunks of a file.
* Transaction is expected to be already started.
* @param file the file to get chunks from
* @return a list of pairs of chunk offset and chunk key
*/
public List<Pair<Long, JObjectKey>> getChunks(File file) {
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
try (var it = jMapHelper.getIterator(file)) {
@@ -26,6 +35,13 @@ public class FileHelper {
return List.copyOf(chunks);
}
/**
* Replace the chunks of a file.
* All previous chunks will be deleted.
* Transaction is expected to be already started.
* @param file the file to replace chunks in
* @param chunks the list of pairs of chunk offset and chunk key
*/
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
jMapHelper.deleteAll(file);

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.FileDtoP;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class FileProtoSerializer implements ProtoSerializer<FileDtoP, FileDto> {
@Override
public FileDto deserialize(FileDtoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public FileDtoP serialize(FileDto object) {
return FileDtoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -24,6 +23,9 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
/**
* Handles synchronization of file objects.
*/
@ApplicationScoped
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
@@ -42,14 +44,18 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
DhfsFileService fileService;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
}
/**
* Resolve conflict between two file versions, update the file in storage and create a conflict file.
*
* @param from the peer that sent the update
* @param key the key of the file
* @param receivedChangelog the changelog of the received file
* @param receivedData the received file data
*/
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) {
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
@@ -131,12 +137,12 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
do {
try {
getTreeW().move(parent.getRight(),
getTree().move(parent.getRight(),
new JKleppmannTreeNodeMetaFile(
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
newFile.key()
),
getTreeW().getNewNodeId()
getTree().getNewNodeId()
);
} catch (AlreadyExistsException aex) {
i++;

View File

@@ -6,6 +6,10 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
/**
* JKleppmannTreeNodeMetaDirectory is a record that represents a directory in the JKleppmann tree.
* @param name the name of the directory
*/
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name);

View File

@@ -6,6 +6,11 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
/**
* JKleppmannTreeNodeMetaFile is a record that represents a file in the JKleppmann tree.
* @param name the name of the file
* @param fileIno a reference to the `File` object
*/
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
@Override
public JKleppmannTreeNodeMeta withName(String name) {

View File

@@ -19,7 +19,6 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -40,6 +39,9 @@ import java.nio.file.Path;
import java.util.*;
import java.util.stream.StreamSupport;
/**
* Actual filesystem implementation.
*/
@ApplicationScoped
public class DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
@@ -68,14 +70,16 @@ public class DhfsFileService {
@Inject
JMapHelper jMapHelper;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
}
/**
* Create a new chunk with the given data and a new unique ID.
*
* @param bytes the data to store in the chunk
* @return the created chunk
*/
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putDataNew(newChunk);
@@ -84,30 +88,28 @@ public class DhfsFileService {
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
getTreeW();
getTree();
}
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
private JKleppmannTreeNode getDirEntry(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
return ret;
}
/**
* Get the attributes of a file or directory.
* @param uuid the UUID of the file or directory
* @return the attributes of the file or directory
*/
public Optional<GetattrRes> getattr(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var ref = curTx.get(JData.class, uuid).orElse(null);
@@ -129,10 +131,15 @@ public class DhfsFileService {
});
}
/**
* Try to resolve a path to a file or directory.
* @param name the path to resolve
* @return the key of the file or directory, or an empty optional if it does not exist
*/
public Optional<JObjectKey> open(String name) {
return jObjectTxManager.executeTx(() -> {
try {
var ret = getDirEntryR(name);
var ret = getDirEntry(name);
return switch (ret.meta()) {
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
@@ -152,10 +159,16 @@ public class DhfsFileService {
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
}
/**
* Create a new file with the given name and mode.
* @param name the name of the file
* @param mode the mode of the file
* @return the key of the created file
*/
public Optional<JObjectKey> create(String name, long mode) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
@@ -167,7 +180,7 @@ public class DhfsFileService {
remoteTx.putData(f);
try {
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
} catch (Exception e) {
// fobj.getMeta().removeRef(newNodeId);
throw e;
@@ -176,10 +189,15 @@ public class DhfsFileService {
});
}
//FIXME: Slow..
/**
* Get the parent directory of a file or directory.
* @param ino the key of the file or directory
* @return the parent directory
*/
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
return jObjectTxManager.executeTx(() -> {
return getTreeW().findParent(w -> {
// FIXME: Slow
return getTree().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.fileIno().equals(ino);
return false;
@@ -187,20 +205,31 @@ public class DhfsFileService {
});
}
/**
* Create a new directory with the given name and mode.
* @param name the name of the directory
* @param mode the mode of the directory
*/
public void mkdir(String name, long mode) {
jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String dname = path.getFileName().toString();
Log.debug("Creating directory " + name);
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
// TODO: No modes for directories yet
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
});
}
/**
* Unlink a file or directory.
* @param name the name of the file or directory
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
*/
public void unlink(String name) {
jObjectTxManager.executeTx(() -> {
var node = getDirEntryOpt(name).orElse(null);
@@ -210,24 +239,36 @@ public class DhfsFileService {
if (!allowRecursiveDelete && !node.children().isEmpty())
throw new DirectoryNotEmptyException();
}
getTreeW().trash(node.meta(), node.key());
getTree().trash(node.meta(), node.key());
});
}
/**
* Rename a file or directory.
* @param from the old name
* @param to the new name
* @return true if the rename was successful, false otherwise
*/
public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntryW(from);
var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.meta();
var toPath = Path.of(to);
var toDentry = getDirEntryW(toPath.getParent().toString());
var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry);
getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
return true;
});
}
/**
* Change the mode of a file or directory.
* @param uuid the ID of the file or directory
* @param mode the new mode
* @return true if the mode was changed successfully, false otherwise
*/
public Boolean chmod(JObjectKey uuid, long mode) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -248,9 +289,14 @@ public class DhfsFileService {
});
}
/**
* Read the contents of a directory.
* @param name the path of the directory
* @return an iterable of the names of the files in the directory
*/
public Iterable<String> readDir(String name) {
return jObjectTxManager.executeTx(() -> {
var found = getDirEntryW(name);
var found = getDirEntry(name);
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
@@ -259,6 +305,13 @@ public class DhfsFileService {
});
}
/**
* Read the contents of a file.
* @param fileUuid the ID of the file
* @param offset the offset to start reading from
* @param length the number of bytes to read
* @return the contents of the file as a ByteString
*/
public ByteString read(JObjectKey fileUuid, long offset, int length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
@@ -320,6 +373,11 @@ public class DhfsFileService {
});
}
/**
* Get the size of a file.
* @param uuid the ID of the file
* @return the size of the file
*/
private ByteString readChunk(JObjectKey uuid) {
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
@@ -331,6 +389,11 @@ public class DhfsFileService {
return chunkRead.data();
}
/**
* Get the size of a chunk.
* @param uuid the ID of the chunk
* @return the size of the chunk
*/
private int getChunkSize(JObjectKey uuid) {
return readChunk(uuid).size();
}
@@ -339,12 +402,19 @@ public class DhfsFileService {
return num & -(1L << n);
}
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
return jObjectTxManager.executeTx(() -> {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
if (file == null) {
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
}
@@ -441,6 +511,12 @@ public class DhfsFileService {
});
}
/**
* Truncate a file to the given length.
* @param fileUuid the ID of the file
* @param length the new length of the file
* @return true if the truncate was successful, false otherwise
*/
public Boolean truncate(JObjectKey fileUuid, long length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
@@ -530,6 +606,12 @@ public class DhfsFileService {
});
}
/**
* Fill the given range with zeroes.
* @param fillStart the start of the range
* @param length the end of the range
* @param newChunks the map to store the new chunks in
*/
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart);
@@ -565,12 +647,22 @@ public class DhfsFileService {
}
}
/**
* Read the contents of a symlink.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a string
*/
public String readlink(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
return readlinkBS(uuid).toStringUtf8();
});
}
/**
* Read the contents of a symlink as a ByteString.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a ByteString
*/
public ByteString readlinkBS(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
@@ -578,10 +670,16 @@ public class DhfsFileService {
});
}
/**
* Create a symlink.
* @param oldpath the target of the symlink
* @param newpath the path of the symlink
* @return the key of the created symlink
*/
public JObjectKey symlink(String oldpath, String newpath) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(newpath);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
@@ -595,11 +693,18 @@ public class DhfsFileService {
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
remoteTx.putData(f);
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
return f.key();
});
}
/**
* Set the access and modification times of a file.
* @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -621,6 +726,11 @@ public class DhfsFileService {
});
}
/**
* Get the size of a file.
* @param fileUuid the ID of the file
* @return the size of the file
*/
public long size(JObjectKey fileUuid) {
return jObjectTxManager.executeTx(() -> {
long realSize = 0;
@@ -640,6 +750,13 @@ public class DhfsFileService {
});
}
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
}

View File

@@ -1,5 +1,10 @@
package com.usatiuk.dhfsfs.service;
/**
* DirectoryNotEmptyException is thrown when a directory is not empty.
* This exception is used to indicate that a directory cannot be deleted
* because it contains files or subdirectories.
*/
public class DirectoryNotEmptyException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {

View File

@@ -1,4 +1,11 @@
package com.usatiuk.dhfsfs.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
/**
* GetattrRes is a record that represents the result of a getattr operation.
* @param mtime File modification time
* @param ctime File creation time
* @param mode File mode
* @param type File type
*/
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
}

View File

@@ -139,7 +139,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<forkCount>1C</forkCount>
<forkCount>0.5C</forkCount>
<reuseForks>false</reuseForks>
<parallel>classes</parallel>
<systemPropertyVariables>

View File

@@ -154,7 +154,7 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
@@ -195,7 +195,7 @@ public class LazyFsIT {
lazyFs1.crash();
}
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -237,7 +237,7 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
@@ -279,7 +279,7 @@ public class LazyFsIT {
lazyFs1.crash();
}
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -322,7 +322,7 @@ public class LazyFsIT {
Log.info("Killing");
lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
@@ -366,7 +366,7 @@ public class LazyFsIT {
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -409,7 +409,7 @@ public class LazyFsIT {
Log.info("Killing");
lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
@@ -453,7 +453,7 @@ public class LazyFsIT {
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());

View File

@@ -1,32 +0,0 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public class AtomicClock implements Clock<Long>, Serializable {
private long _max = 0;
public AtomicClock(long counter) {
_max = counter;
}
@Override
public Long getTimestamp() {
return ++_max;
}
public void setTimestamp(Long timestamp) {
_max = timestamp;
}
@Override
public Long peekTimestamp() {
return _max;
}
@Override
public Long updateTimestamp(Long receivedTimestamp) {
var old = _max;
_max = Math.max(_max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -1,11 +1,10 @@
package com.usatiuk.objects.iterators;
import com.usatiuk.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
K peekNextKey();
void skip();
@@ -21,4 +20,7 @@ public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends
default CloseableKvIterator<K, V> reversed() {
return new ReversedKvIterator<K, V>(this);
}
@Override
void close();
}

View File

@@ -3,19 +3,20 @@ package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseable {
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull
Optional<V> readObject(K name);
long id();
@Override
void close();
}

View File

@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
_cache.set(_cache.get().withVersion(s.id()));
}
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
try {
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());

View File

@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
public Runnable prepareTx(TxManifestRaw names, long txId) {
public void commitTx(TxManifestRaw names, long txId) {
verifyReady();
var txn = _env.txnWrite();
try {
try (var txn = _env.txnWrite()) {
for (var written : names.written()) {
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb);
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
bbData.putLong(txId);
bbData.flip();
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) {
txn.close();
throw t;
txn.commit();
}
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
}
@Override
@@ -188,12 +178,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getFreeSpace();
}
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot

View File

@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
public Runnable prepareTx(TxManifestRaw names, long txId) {
return () -> {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
@Override
public void commitTx(TxManifestRaw names, long txId) {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
};
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
}
@Override
@@ -77,9 +76,4 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
public long getFreeSpace() {
return 0;
}
@Override
public long getUsableSpace() {
return 0;
}
}

View File

@@ -13,11 +13,9 @@ import java.util.Optional;
public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId);
void commitTx(TxManifestRaw names, long txId);
long getTotalSpace();
long getFreeSpace();
long getUsableSpace();
}

View File

@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
, objs.deleted());
}
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId);
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
delegateStore.commitTx(prepareManifest(objects), txId);
}
}

View File

@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting");
}
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
_pendingBundleLock.lock();
try {

View File

@@ -25,8 +25,8 @@ public class CurrentTransaction implements Transaction {
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return transactionManager.current().get(type, key, strategy);
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return transactionManager.current().get(type, key);
}
@Override

View File

@@ -1,23 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
@Singleton
public class LockManager {
private final DataLocker _objLocker = new DataLocker();
@Nonnull
public AutoCloseableNoThrow lockObject(JObjectKey key) {
return _objLocker.lock(key);
}
@Nullable
public AutoCloseableNoThrow tryLockObject(JObjectKey key) {
return _objLocker.tryLock(key);
}
}

View File

@@ -1,6 +0,0 @@
package com.usatiuk.objects.transaction;
public enum LockingStrategy {
OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
WRITE, // Write lock, blocks all other writers
}

View File

@@ -11,17 +11,13 @@ import java.util.Optional;
public interface Transaction extends TransactionHandle {
void onCommit(Runnable runnable);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
<T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key);
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
}
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {

View File

@@ -1,5 +0,0 @@
package com.usatiuk.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction();
}

View File

@@ -1,266 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
LockManager lockManager;
@ConfigProperty(name = "dhfs.objects.transaction.never-lock")
boolean neverLock;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.objects.transaction;
import java.util.Collection;
public interface TransactionHandlePrivate extends TransactionHandle {
Collection<Runnable> getOnFlush();
}

View File

@@ -0,0 +1,240 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
class TransactionImpl implements Transaction, AutoCloseable {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
_snapshot = snapshot;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.utils.VoidFn;
import io.quarkus.logging.Log;
import java.util.function.Supplier;
@@ -41,9 +40,9 @@ public interface TransactionManager {
}
}
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
default TransactionHandle runTries(Runnable fn, int tries, boolean nest) {
if (!nest && current() != null) {
fn.apply();
fn.run();
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -56,7 +55,7 @@ public interface TransactionManager {
begin();
boolean commit = false;
try {
fn.apply();
fn.run();
commit = true;
var ret = commit();
return ret;
@@ -80,11 +79,11 @@ public interface TransactionManager {
return runTries(supplier, tries, false);
}
default TransactionHandle runTries(VoidFn fn, int tries) {
default TransactionHandle runTries(Runnable fn, int tries) {
return runTries(fn, tries, false);
}
default TransactionHandle run(VoidFn fn, boolean nest) {
default TransactionHandle run(Runnable fn, boolean nest) {
return runTries(fn, 10, nest);
}
@@ -92,7 +91,7 @@ public interface TransactionManager {
return runTries(supplier, 10, nest);
}
default TransactionHandle run(VoidFn fn) {
default TransactionHandle run(Runnable fn) {
return run(fn, false);
}
@@ -100,7 +99,7 @@ public interface TransactionManager {
return run(supplier, false);
}
default void executeTx(VoidFn fn) {
default void executeTx(Runnable fn) {
run(fn, false);
}

View File

@@ -11,14 +11,14 @@ import java.util.Stack;
@Singleton
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject
JObjectManager jObjectManager;
TransactionService transactionService;
@Override
public void begin() {
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
var tx = transactionService.createTransaction();
_currentTransaction.get().push(tx);
}
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
ret = jObjectManager.commit(peeked);
ret = transactionService.commit(peeked);
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
var peeked = stack.peek();
try {
jObjectManager.rollback(peeked);
transactionService.rollback(peeked);
} catch (Throwable e) {
Log.error("Transaction rollback failed", e);
throw e;

View File

@@ -1,27 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
Collection<Runnable> getOnCommit();
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
}

View File

@@ -6,6 +6,7 @@ import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.AutoCloseableNoThrow;
import com.usatiuk.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -21,21 +22,19 @@ import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class JObjectManager {
public class TransactionService {
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
@Inject
LockManager lockManager;
private boolean _ready = false;
private final DataLocker _objLocker = new DataLocker();
static {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
@@ -47,14 +46,14 @@ public class JObjectManager {
_ready = true;
}
public TransactionPrivate createTransaction() {
public TransactionImpl createTransaction() {
verifyReady();
var tx = transactionFactory.createTransaction();
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
}
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
@@ -162,7 +161,7 @@ public class JObjectManager {
for (var key : toLock) {
if (tx.knownNew().contains(key))
continue;
var lock = lockManager.lockObject(key);
var lock = _objLocker.lock(key);
toUnlock.add(lock);
}
@@ -258,7 +257,7 @@ public class JObjectManager {
}
}
public void rollback(TransactionPrivate tx) {
public void rollback(TransactionImpl tx) {
verifyReady();
tx.close();
}

View File

@@ -5,7 +5,6 @@ dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.transaction.never-lock=true
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200
quarkus.log.category."com.usatiuk.objects.iterators".level=INFO

View File

@@ -2,14 +2,11 @@ package com.usatiuk.objects;
import com.usatiuk.objects.data.Parent;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.List;
import java.util.Map;
@@ -151,12 +148,12 @@ public abstract class ObjectsTestImpl {
});
txm.run(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.OPTIMISTIC).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
Assertions.assertEquals("John", parent.name());
curTx.put(parent.withName("John2"));
});
txm.run(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.WRITE).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
Assertions.assertEquals("John2", parent.name());
curTx.put(parent.withName("John3"));
});
@@ -236,10 +233,9 @@ public abstract class ObjectsTestImpl {
}
}
@ParameterizedTest
@EnumSource(LockingStrategy.class)
void editConflict(LockingStrategy strategy, TestInfo testInfo) {
String key = testInfo.getDisplayName() + "Parent4" + strategy.name();
@Test
void editConflict(TestInfo testInfo) {
String key = testInfo.getDisplayName() + "Parent4";
txm.run(() -> {
var newParent = new Parent(JObjectKey.of(key), "John3");
curTx.put(newParent);
@@ -260,7 +256,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John"));
Log.warn("Thread 1 commit");
}, 0);
@@ -276,7 +272,7 @@ public abstract class ObjectsTestImpl {
Log.warn("Thread 2");
barrier.await(); // Ensure thread 2 tx id is larger than thread 1
txm.runTries(() -> {
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John2"));
Log.warn("Thread 2 commit");
}, 0);
@@ -317,10 +313,9 @@ public abstract class ObjectsTestImpl {
}
}
@ParameterizedTest
@EnumSource(LockingStrategy.class)
void editConflict2(LockingStrategy strategy, TestInfo testInfo) {
String key = testInfo.getDisplayName() + "EditConflict2" + strategy.name();
@Test
void editConflict2(TestInfo testInfo) {
String key = testInfo.getDisplayName() + "EditConflict2";
txm.run(() -> {
var newParent = new Parent(JObjectKey.of(key), "John3");
curTx.put(newParent);
@@ -341,7 +336,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John"));
Log.warn("Thread 1 commit");
}, 0);
@@ -362,7 +357,7 @@ public abstract class ObjectsTestImpl {
} catch (Throwable e) {
throw new RuntimeException(e);
}
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
curTx.put(parent.withName("John2"));
Log.warn("Thread 2 commit");
}, 0);
@@ -922,10 +917,8 @@ public abstract class ObjectsTestImpl {
() -> createGetObject(testInfo),
() -> createDeleteObject(testInfo),
() -> createCreateObject(testInfo),
() -> editConflict(LockingStrategy.WRITE, testInfo),
() -> editConflict(LockingStrategy.OPTIMISTIC, testInfo),
() -> editConflict2(LockingStrategy.WRITE, testInfo),
() -> editConflict2(LockingStrategy.OPTIMISTIC, testInfo),
() -> editConflict(testInfo),
() -> editConflict2(testInfo),
() -> snapshotTest1(testInfo),
() -> snapshotTest2(testInfo),
() -> snapshotTest3(testInfo),

View File

@@ -22,7 +22,6 @@
<properties>
<compiler-plugin.version>3.12.1</compiler-plugin.version>
<!--FIXME-->
<maven.compiler.release></maven.compiler.release>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -87,6 +86,19 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.11.2</version>
<configuration>
<additionalOptions>
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--enable-preview
</additionalOptions>
</configuration>
</plugin>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.utils.SerializationHelper;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
private static final String dataFileName = "invqueue";
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
InvalidationQueueService invalidationQueueService;
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
@@ -63,7 +63,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicReturn() {
for (var reachable : remoteHostManager.getAvailableHosts())
for (var reachable : reachablePeerManager.getAvailableHosts())
returnForHost(reachable);
}

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JData;
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private final DataLocker _locker = new DataLocker();
@Inject
ConnectedPeerManager remoteHostManager;
ReachablePeerManager reachablePeerManager;
@Inject
DeferredInvalidationQueueService deferredInvalidationQueueService;
@Inject
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
}
if (toAllQueue != null) {
var hostInfo = remoteHostManager.getHostStateSnapshot();
var hostInfo = reachablePeerManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(new InvalidationQueueEntry(h, o));
@@ -129,7 +129,7 @@ public class InvalidationQueueService {
continue;
}
if (!remoteHostManager.isReachable(e.peer())) {
if (!reachablePeerManager.isReachable(e.peer())) {
deferredInvalidationQueueService.defer(e);
continue;
}
@@ -210,14 +210,14 @@ public class InvalidationQueueService {
}
void pushInvalidationToOne(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.add(entry);
else
deferredInvalidationQueueService.defer(entry);
}
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
if (reachablePeerManager.isReachable(entry.peer()))
_queue.addNoDelay(entry);
else
deferredInvalidationQueueService.defer(entry);

View File

@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -39,9 +38,9 @@ public class JKleppmannTreeManager {
@Inject
PersistentPeerDataService persistentPeerDataService;
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
if (data == null) {
data = new JKleppmannTreePersistentData(
name,
@@ -66,18 +65,11 @@ public class JKleppmannTreeManager {
}
public Optional<JKleppmannTree> getTree(JObjectKey name) {
return getTree(name, LockingStrategy.WRITE);
}
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
});
}
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
}
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;

View File

@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
@@ -29,14 +28,10 @@ public class PeerInfoService {
@Inject
RemoteTransaction remoteTx;
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
}
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
return jObjectTxManager.run(() -> {
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
@@ -49,7 +44,7 @@ public class PeerInfoService {
public boolean existsPeer(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return false;
}
@@ -59,7 +54,7 @@ public class PeerInfoService {
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
if (gotKey == null) {
return Optional.empty();
}
@@ -72,7 +67,7 @@ public class PeerInfoService {
public List<PeerInfo> getPeers() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
var gotKey = getTree().traverse(List.of());
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
@@ -113,16 +108,16 @@ public class PeerInfoService {
public void putPeer(PeerId id, byte[] cert) {
jObjectTxManager.run(() -> {
var parent = getTreeW().traverse(List.of());
var parent = getTree().traverse(List.of());
var newPeerInfo = new PeerInfo(id, cert);
remoteTx.putData(newPeerInfo);
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
});
}
public void removePeer(PeerId id) {
jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
if (gotKey == null) {
return;
}
@@ -131,7 +126,7 @@ public class PeerInfoService {
Log.warn("Peer " + id + " not found in the tree");
return;
}
getTreeW().trash(node.meta(), node.key());
getTree().trash(node.meta(), node.key());
curTx.onCommit(persistentPeerDataService::updateCerts);
});
}

View File

@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PeerInfoService peerInfoService;
@Inject
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void update() {
var snapshot = connectedPeerManager.getHostStateSnapshot();
var snapshot = reachablePeerManager.getHostStateSnapshot();
for (var a : snapshot.available()) {
txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);

View File

@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
curTx.onCommit(() -> reachablePeerManager.handleConnectionError(peerId));
return true;
});
}

View File

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
@ApplicationScoped
public class ConnectedPeerManager {
public class ReachablePeerManager {
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
private final Collection<PeerConnectedEventListener> _connectedListeners;
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
@@ -58,7 +58,7 @@ public class ConnectedPeerManager {
SyncHandler syncHandler;
private ExecutorService _heartbeatExecutor;
public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
public ReachablePeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.remoteobj;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
@@ -55,11 +54,11 @@ public class RemoteTransaction {
}
@SuppressWarnings("unchecked")
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, boolean tryRequest) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key))
.flatMap(obj -> {
if (obj.hasLocalData()) {
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key)).orElse(null);
if (realData == null)
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
if (!type.isInstance(realData.data()))
@@ -72,8 +71,8 @@ public class RemoteTransaction {
});
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
}
public <T extends JDataRemote> void putDataRaw(T obj) {
@@ -127,23 +126,12 @@ public class RemoteTransaction {
curTx.put(newData);
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return getMeta(key, LockingStrategy.OPTIMISTIC);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC, true);
return getData(type, key, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
return getData(type, key, LockingStrategy.OPTIMISTIC, false);
return getData(type, key, false);
}
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, true);
}
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return getData(type, key, strategy, false);
}
}

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -70,7 +70,7 @@ public class RemoteObjectServiceClient {
var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().isEmpty()
? connectedPeerManager.getAvailableHosts()
? reachablePeerManager.getAvailableHosts()
: objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList();

View File

@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
import com.usatiuk.dhfs.invalidation.Op;
import com.usatiuk.dhfs.invalidation.OpHandlerService;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.dhfs.remoteobj.*;
import com.usatiuk.dhfs.repository.*;
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
@Inject
TransactionManager txm;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
Transaction curTx;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
long syncTimeout;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
RpcChannelFactory rpcChannelFactory;
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
}
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
var hostinfo = connectedPeerManager.getAddress(target);
var hostinfo = reachablePeerManager.getAddress(target);
if (hostinfo == null)
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -14,7 +14,7 @@ public class PeerManagementApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;
@@ -23,27 +23,27 @@ public class PeerManagementApi {
public List<PeerInfo> knownPeers() {
return peerInfoService.getPeers().stream().map(
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
Optional.ofNullable(reachablePeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
}
@Path("known-peers/{peerId}")
@PUT
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
reachablePeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
}
@Path("known-peers/{peerId}")
@DELETE
public void deletePeer(@PathParam("peerId") String peerId) {
connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
reachablePeerManager.removeRemoteHost(PeerId.of(peerId));
}
@Path("available-peers")
@GET
public Collection<PeerInfo> availablePeers() {
return connectedPeerManager.getSeenButNotAddedHosts().stream()
return reachablePeerManager.getSeenButNotAddedHosts().stream()
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
reachablePeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
.toList();
}
}

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.peersync.PeerInfoService;
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
@Inject
PeerInfoService peerInfoService;
@Inject
ConnectedPeerManager connectedPeerManager;
ReachablePeerManager reachablePeerManager;
@Inject
PersistentPeerDataService persistentPeerDataService;

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFn {
void apply();
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFnThrows {
void apply() throws Throwable;
}