12 Commits

Author SHA1 Message Date
7902d9e486 proper pages 2025-05-12 16:10:43 +02:00
7c056b9674 fix 2025-05-12 16:03:42 +02:00
8cc040b234 fix 2025-05-12 16:01:26 +02:00
f8375c9cd8 javadocs github pages 2025-05-12 15:56:07 +02:00
0c3524851e Some javadocs + CI 2025-05-12 12:49:10 +02:00
3eb7164c0f Dhfs-fuse: fix LazyFsIT tests 2025-05-10 16:49:58 +02:00
f544a67fb5 Objects: cleanup AutoCloseableNoThrow 2025-05-10 13:49:42 +02:00
964b3da951 Objects: remove getUsableSpace 2025-05-10 11:20:34 +02:00
cb33472dc5 Utils: remove VoidFn 2025-05-10 11:07:40 +02:00
de211bb2d2 Objects: remove prepareTx 2025-05-07 16:12:47 +02:00
56ab3bad4c Objects: remove TransactionPrivate 2025-05-07 15:00:15 +02:00
9403556220 Objects: remove TransactionFactory 2025-05-07 14:39:55 +02:00
36 changed files with 559 additions and 504 deletions

View File

@@ -43,11 +43,11 @@ jobs:
distribution: "zulu" distribution: "zulu"
cache: maven cache: maven
- name: Build LazyFS # - name: Build LazyFS
run: cd thirdparty/lazyfs/ && ./build.sh # run: cd thirdparty/lazyfs/ && ./build.sh
- name: Test with Maven - name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots javadoc:aggregate
# - name: Build with Maven # - name: Build with Maven
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG # run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
@@ -57,6 +57,11 @@ jobs:
name: DHFS Server Package name: DHFS Server Package
path: dhfs-parent/dhfs-fuse/target/quarkus-app path: dhfs-parent/dhfs-fuse/target/quarkus-app
- uses: actions/upload-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-parent/target/reports/apidocs/
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v4
if: ${{ always() }} if: ${{ always() }}
with: with:
@@ -231,3 +236,37 @@ jobs:
with: with:
name: Run wrapper name: Run wrapper
path: ~/run-wrapper.tar.gz path: ~/run-wrapper.tar.gz
publish-javadoc:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
permissions:
contents: read
pages: write
id-token: write
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/download-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-javadocs-downloaded
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: 'dhfs-javadocs-downloaded'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@@ -41,3 +41,5 @@ nb-configuration.xml
# Plugin directory # Plugin directory
/.quarkus/cli/plugins/ /.quarkus/cli/plugins/
.jqwik-database

View File

@@ -5,6 +5,11 @@ import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto; import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
/**
* ChunkData is a data structure that represents an immutable binary blob
* @param key unique key
* @param data binary data
*/
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto { public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
@Override @Override
public int estimateSize() { public int estimateSize() {

View File

@@ -1,26 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.ChunkDataP;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton;
@Singleton
public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(
JObjectKey.of(message.getKey().getName()),
message.getData()
);
}
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
.setData(object.data())
.build();
}
}

View File

@@ -9,6 +9,14 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection; import java.util.Collection;
import java.util.Set; import java.util.Set;
/**
* File is a data structure that represents a file in the file system
* @param key unique key
* @param mode file mode
* @param cTime creation time
* @param mTime modification time
* @param symlink true if the file is a symlink, false otherwise
*/
public record File(JObjectKey key, long mode, long cTime, long mTime, public record File(JObjectKey key, long mode, long cTime, long mTime,
boolean symlink boolean symlink
) implements JDataRemote, JMapHolder<JMapLongKey> { ) implements JDataRemote, JMapHolder<JMapLongKey> {

View File

@@ -7,6 +7,11 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List; import java.util.List;
/**
* FileDto is a data transfer object that contains a file and its chunks.
* @param file the file
* @param chunks the list of chunks, each represented as a pair of a long and a JObjectKey
*/
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto { public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
@Override @Override
public Class<? extends JDataRemote> objClass() { public Class<? extends JDataRemote> objClass() {

View File

@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.syncmap.DtoMapper;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
/**
* Maps a {@link File} object to a {@link FileDto} object and vice versa.
*/
@ApplicationScoped @ApplicationScoped
public class FileDtoMapper implements DtoMapper<File, FileDto> { public class FileDtoMapper implements DtoMapper<File, FileDto> {
@Inject @Inject

View File

@@ -10,11 +10,20 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/**
* Helper class for working with files.
*/
@ApplicationScoped @ApplicationScoped
public class FileHelper { public class FileHelper {
@Inject @Inject
JMapHelper jMapHelper; JMapHelper jMapHelper;
/**
* Get the chunks of a file.
* Transaction is expected to be already started.
* @param file the file to get chunks from
* @return a list of pairs of chunk offset and chunk key
*/
public List<Pair<Long, JObjectKey>> getChunks(File file) { public List<Pair<Long, JObjectKey>> getChunks(File file) {
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>(); ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
try (var it = jMapHelper.getIterator(file)) { try (var it = jMapHelper.getIterator(file)) {
@@ -26,6 +35,13 @@ public class FileHelper {
return List.copyOf(chunks); return List.copyOf(chunks);
} }
/**
* Replace the chunks of a file.
* All previous chunks will be deleted.
* Transaction is expected to be already started.
* @param file the file to replace chunks in
* @param chunks the list of pairs of chunk offset and chunk key
*/
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) { public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
jMapHelper.deleteAll(file); jMapHelper.deleteAll(file);

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.FileDtoP;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class FileProtoSerializer implements ProtoSerializer<FileDtoP, FileDto> {
@Override
public FileDto deserialize(FileDtoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public FileDtoP serialize(FileDto object) {
return FileDtoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -23,6 +23,9 @@ import javax.annotation.Nullable;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
/**
* Handles synchronization of file objects.
*/
@ApplicationScoped @ApplicationScoped
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> { public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject @Inject
@@ -45,6 +48,14 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow(); return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
} }
/**
* Resolve conflict between two file versions, update the file in storage and create a conflict file.
*
* @param from the peer that sent the update
* @param key the key of the file
* @param receivedChangelog the changelog of the received file
* @param receivedData the received file data
*/
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog, private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) { @Nullable FileDto receivedData) {
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null); var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);

View File

@@ -6,6 +6,10 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
/**
* JKleppmannTreeNodeMetaDirectory is a record that represents a directory in the JKleppmann tree.
* @param name the name of the directory
*/
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta { public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMeta withName(String name) { public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name); return new JKleppmannTreeNodeMetaDirectory(name);

View File

@@ -6,6 +6,11 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
/**
* JKleppmannTreeNodeMetaFile is a record that represents a file in the JKleppmann tree.
* @param name the name of the file
* @param fileIno a reference to the `File` object
*/
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta { public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
@Override @Override
public JKleppmannTreeNodeMeta withName(String name) { public JKleppmannTreeNodeMeta withName(String name) {

View File

@@ -39,6 +39,9 @@ import java.nio.file.Path;
import java.util.*; import java.util.*;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
/**
* Actual filesystem implementation.
*/
@ApplicationScoped @ApplicationScoped
public class DhfsFileService { public class DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_alignment") @ConfigProperty(name = "dhfs.files.target_chunk_alignment")
@@ -71,6 +74,12 @@ public class DhfsFileService {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory("")); return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
} }
/**
* Create a new chunk with the given data and a new unique ID.
*
* @param bytes the data to store in the chunk
* @return the created chunk
*/
private ChunkData createChunk(ByteString bytes) { private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes); var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putDataNew(newChunk); remoteTx.putDataNew(newChunk);
@@ -82,14 +91,7 @@ public class DhfsFileService {
getTree(); getTree();
} }
private JKleppmannTreeNode getDirEntryW(String name) { private JKleppmannTreeNode getDirEntry(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList()); var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND); if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name))); var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
@@ -103,6 +105,11 @@ public class DhfsFileService {
return ret; return ret;
} }
/**
* Get the attributes of a file or directory.
* @param uuid the UUID of the file or directory
* @return the attributes of the file or directory
*/
public Optional<GetattrRes> getattr(JObjectKey uuid) { public Optional<GetattrRes> getattr(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var ref = curTx.get(JData.class, uuid).orElse(null); var ref = curTx.get(JData.class, uuid).orElse(null);
@@ -124,10 +131,15 @@ public class DhfsFileService {
}); });
} }
/**
* Try to resolve a path to a file or directory.
* @param name the path to resolve
* @return the key of the file or directory, or an empty optional if it does not exist
*/
public Optional<JObjectKey> open(String name) { public Optional<JObjectKey> open(String name) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
try { try {
var ret = getDirEntryR(name); var ret = getDirEntry(name);
return switch (ret.meta()) { return switch (ret.meta()) {
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno()); case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key()); case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
@@ -147,10 +159,16 @@ public class DhfsFileService {
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key())); throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
} }
/**
* Create a new file with the given name and mode.
* @param name the name of the file
* @param mode the mode of the file
* @return the key of the created file
*/
public Optional<JObjectKey> create(String name, long mode) { public Optional<JObjectKey> create(String name, long mode) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
Path path = Path.of(name); Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString()); var parent = getDirEntry(path.getParent().toString());
ensureDir(parent); ensureDir(parent);
@@ -171,9 +189,14 @@ public class DhfsFileService {
}); });
} }
//FIXME: Slow.. /**
* Get the parent directory of a file or directory.
* @param ino the key of the file or directory
* @return the parent directory
*/
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) { public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
// FIXME: Slow
return getTree().findParent(w -> { return getTree().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f) if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.fileIno().equals(ino); return f.fileIno().equals(ino);
@@ -182,20 +205,31 @@ public class DhfsFileService {
}); });
} }
/**
* Create a new directory with the given name and mode.
* @param name the name of the directory
* @param mode the mode of the directory
*/
public void mkdir(String name, long mode) { public void mkdir(String name, long mode) {
jObjectTxManager.executeTx(() -> { jObjectTxManager.executeTx(() -> {
Path path = Path.of(name); Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString()); var parent = getDirEntry(path.getParent().toString());
ensureDir(parent); ensureDir(parent);
String dname = path.getFileName().toString(); String dname = path.getFileName().toString();
Log.debug("Creating directory " + name); Log.debug("Creating directory " + name);
// TODO: No modes for directories yet
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId()); getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
}); });
} }
/**
* Unlink a file or directory.
* @param name the name of the file or directory
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
*/
public void unlink(String name) { public void unlink(String name) {
jObjectTxManager.executeTx(() -> { jObjectTxManager.executeTx(() -> {
var node = getDirEntryOpt(name).orElse(null); var node = getDirEntryOpt(name).orElse(null);
@@ -209,13 +243,19 @@ public class DhfsFileService {
}); });
} }
/**
* Rename a file or directory.
* @param from the old name
* @param to the new name
* @return true if the rename was successful, false otherwise
*/
public Boolean rename(String from, String to) { public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var node = getDirEntryW(from); var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.meta(); JKleppmannTreeNodeMeta meta = node.meta();
var toPath = Path.of(to); var toPath = Path.of(to);
var toDentry = getDirEntryW(toPath.getParent().toString()); var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry); ensureDir(toDentry);
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key()); getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
@@ -223,6 +263,12 @@ public class DhfsFileService {
}); });
} }
/**
* Change the mode of a file or directory.
* @param uuid the ID of the file or directory
* @param mode the new mode
* @return true if the mode was changed successfully, false otherwise
*/
public Boolean chmod(JObjectKey uuid, long mode) { public Boolean chmod(JObjectKey uuid, long mode) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND)); var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -243,9 +289,14 @@ public class DhfsFileService {
}); });
} }
/**
* Read the contents of a directory.
* @param name the path of the directory
* @return an iterable of the names of the files in the directory
*/
public Iterable<String> readDir(String name) { public Iterable<String> readDir(String name) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var found = getDirEntryW(name); var found = getDirEntry(name);
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md)) if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT); throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
@@ -254,6 +305,13 @@ public class DhfsFileService {
}); });
} }
/**
* Read the contents of a file.
* @param fileUuid the ID of the file
* @param offset the offset to start reading from
* @param length the number of bytes to read
* @return the contents of the file as a ByteString
*/
public ByteString read(JObjectKey fileUuid, long offset, int length) { public ByteString read(JObjectKey fileUuid, long offset, int length) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
if (length < 0) if (length < 0)
@@ -315,6 +373,11 @@ public class DhfsFileService {
}); });
} }
/**
* Get the size of a file.
* @param uuid the ID of the file
* @return the size of the file
*/
private ByteString readChunk(JObjectKey uuid) { private ByteString readChunk(JObjectKey uuid) {
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null); var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
@@ -326,6 +389,11 @@ public class DhfsFileService {
return chunkRead.data(); return chunkRead.data();
} }
/**
* Get the size of a chunk.
* @param uuid the ID of the chunk
* @return the size of the chunk
*/
private int getChunkSize(JObjectKey uuid) { private int getChunkSize(JObjectKey uuid) {
return readChunk(uuid).size(); return readChunk(uuid).size();
} }
@@ -334,6 +402,13 @@ public class DhfsFileService {
return num & -(1L << n); return num & -(1L << n);
} }
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, ByteString data) { public Long write(JObjectKey fileUuid, long offset, ByteString data) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
if (offset < 0) if (offset < 0)
@@ -436,6 +511,12 @@ public class DhfsFileService {
}); });
} }
/**
* Truncate a file to the given length.
* @param fileUuid the ID of the file
* @param length the new length of the file
* @return true if the truncate was successful, false otherwise
*/
public Boolean truncate(JObjectKey fileUuid, long length) { public Boolean truncate(JObjectKey fileUuid, long length) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
if (length < 0) if (length < 0)
@@ -525,6 +606,12 @@ public class DhfsFileService {
}); });
} }
/**
* Fill the given range with zeroes.
* @param fillStart the start of the range
* @param length the end of the range
* @param newChunks the map to store the new chunks in
*/
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) { private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart); long combinedSize = (length - fillStart);
@@ -560,12 +647,22 @@ public class DhfsFileService {
} }
} }
/**
* Read the contents of a symlink.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a string
*/
public String readlink(JObjectKey uuid) { public String readlink(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
return readlinkBS(uuid).toStringUtf8(); return readlinkBS(uuid).toStringUtf8();
}); });
} }
/**
* Read the contents of a symlink as a ByteString.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a ByteString
*/
public ByteString readlinkBS(JObjectKey uuid) { public ByteString readlinkBS(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid))); var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
@@ -573,10 +670,16 @@ public class DhfsFileService {
}); });
} }
/**
* Create a symlink.
* @param oldpath the target of the symlink
* @param newpath the path of the symlink
* @return the key of the created symlink
*/
public JObjectKey symlink(String oldpath, String newpath) { public JObjectKey symlink(String oldpath, String newpath) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
Path path = Path.of(newpath); Path path = Path.of(newpath);
var parent = getDirEntryW(path.getParent().toString()); var parent = getDirEntry(path.getParent().toString());
ensureDir(parent); ensureDir(parent);
@@ -595,6 +698,13 @@ public class DhfsFileService {
}); });
} }
/**
* Set the access and modification times of a file.
* @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) { public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND)); var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -616,6 +726,11 @@ public class DhfsFileService {
}); });
} }
/**
* Get the size of a file.
* @param fileUuid the ID of the file
* @return the size of the file
*/
public long size(JObjectKey fileUuid) { public long size(JObjectKey fileUuid) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
long realSize = 0; long realSize = 0;
@@ -635,6 +750,13 @@ public class DhfsFileService {
}); });
} }
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, byte[] data) { public Long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data)); return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
} }

View File

@@ -1,5 +1,10 @@
package com.usatiuk.dhfsfs.service; package com.usatiuk.dhfsfs.service;
/**
* DirectoryNotEmptyException is thrown when a directory is not empty.
* This exception is used to indicate that a directory cannot be deleted
* because it contains files or subdirectories.
*/
public class DirectoryNotEmptyException extends RuntimeException { public class DirectoryNotEmptyException extends RuntimeException {
@Override @Override
public synchronized Throwable fillInStackTrace() { public synchronized Throwable fillInStackTrace() {

View File

@@ -1,4 +1,11 @@
package com.usatiuk.dhfsfs.service; package com.usatiuk.dhfsfs.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) { /**
* GetattrRes is a record that represents the result of a getattr operation.
* @param mtime File modification time
* @param ctime File creation time
* @param mode File mode
* @param type File type
*/
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
} }

View File

@@ -154,7 +154,7 @@ public class LazyFsIT {
Thread.sleep(3000); Thread.sleep(3000);
Log.info("Killing"); Log.info("Killing");
lazyFs1.crash(); lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client(); var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec(); client.killContainerCmd(container1.getContainerId()).exec();
container1.stop(); container1.stop();
@@ -195,7 +195,7 @@ public class LazyFsIT {
lazyFs1.crash(); lazyFs1.crash();
} }
try { try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
// Sometimes crash doesn't work // Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName()); Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -237,7 +237,7 @@ public class LazyFsIT {
Thread.sleep(3000); Thread.sleep(3000);
Log.info("Killing"); Log.info("Killing");
lazyFs1.crash(); lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client(); var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec(); client.killContainerCmd(container1.getContainerId()).exec();
container1.stop(); container1.stop();
@@ -279,7 +279,7 @@ public class LazyFsIT {
lazyFs1.crash(); lazyFs1.crash();
} }
try { try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
// Sometimes crash doesn't work // Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName()); Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -322,7 +322,7 @@ public class LazyFsIT {
Log.info("Killing"); Log.info("Killing");
lazyFs2.crash(); lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1"); container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client(); var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec(); client.killContainerCmd(container2.getContainerId()).exec();
container2.stop(); container2.stop();
@@ -366,7 +366,7 @@ public class LazyFsIT {
} }
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2"); container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try { try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
// Sometimes crash doesn't work // Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName()); Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -409,7 +409,7 @@ public class LazyFsIT {
Log.info("Killing"); Log.info("Killing");
lazyFs2.crash(); lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1"); container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client(); var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec(); client.killContainerCmd(container2.getContainerId()).exec();
container2.stop(); container2.stop();
@@ -453,7 +453,7 @@ public class LazyFsIT {
} }
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2"); container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try { try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS); waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) { } catch (TimeoutException e) {
// Sometimes crash doesn't work // Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName()); Log.info("Failed to crash: " + testInfo.getDisplayName());

View File

@@ -1,32 +0,0 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public class AtomicClock implements Clock<Long>, Serializable {
private long _max = 0;
public AtomicClock(long counter) {
_max = counter;
}
@Override
public Long getTimestamp() {
return ++_max;
}
public void setTimestamp(Long timestamp) {
_max = timestamp;
}
@Override
public Long peekTimestamp() {
return _max;
}
@Override
public Long updateTimestamp(Long receivedTimestamp) {
var old = _max;
_max = Math.max(_max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -1,11 +1,10 @@
package com.usatiuk.objects.iterators; package com.usatiuk.objects.iterators;
import com.usatiuk.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator; import java.util.Iterator;
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow { public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
K peekNextKey(); K peekNextKey();
void skip(); void skip();
@@ -21,4 +20,7 @@ public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends
default CloseableKvIterator<K, V> reversed() { default CloseableKvIterator<K, V> reversed() {
return new ReversedKvIterator<K, V>(this); return new ReversedKvIterator<K, V>(this);
} }
@Override
void close();
} }

View File

@@ -3,19 +3,20 @@ package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.iterators.CloseableKvIterator; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone; import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow { public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseable {
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key); List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull @Nonnull
Optional<V> readObject(K name); Optional<V> readObject(K name);
long id(); long id();
@Override
void close();
} }

View File

@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
SerializingObjectPersistentStore delegate; SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats") @ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats; boolean printStats;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor; private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong(); private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong(); private AtomicLong _cacheTries = new AtomicLong();
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
_cache.set(_cache.get().withVersion(s.id())); _cache.set(_cache.get().withVersion(s.id()));
} }
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) { if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor(); _statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> { _statusExecutor.submit(() -> {
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get(); var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) { for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight())); cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
} }
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
cache = cache.withPut(del, Optional.empty()); cache = cache.withPut(del, Optional.empty());
} }
cache = cache.withVersion(txId); cache = cache.withVersion(txId);
try { delegate.commitTx(objs, txId);
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
_cache.set(cache); _cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size()); Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());

View File

@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
@Override @Override
public Runnable prepareTx(TxManifestRaw names, long txId) { public void commitTx(TxManifestRaw names, long txId) {
verifyReady(); verifyReady();
var txn = _env.txnWrite(); try (var txn = _env.txnWrite()) {
try {
for (var written : names.written()) { for (var written : names.written()) {
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size()); var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb); written.getValue().copyTo(putBb);
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
bbData.putLong(txId); bbData.putLong(txId);
bbData.flip(); bbData.flip();
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData); _db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) { txn.commit();
txn.close();
throw t;
} }
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
} }
@Override @Override
@@ -188,12 +178,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getFreeSpace(); return _root.toFile().getFreeSpace();
} }
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> { private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
private static final Cleaner CLEANER = Cleaner.create(); private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot private final Txn<ByteBuffer> _txn; // Managed by the snapshot

View File

@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
} }
} }
public Runnable prepareTx(TxManifestRaw names, long txId) { @Override
return () -> { public void commitTx(TxManifestRaw names, long txId) {
synchronized (this) { synchronized (this) {
for (var written : names.written()) { for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue()); _objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
} }
}; for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
} }
@Override @Override
@@ -77,9 +76,4 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
public long getFreeSpace() { public long getFreeSpace() {
return 0; return 0;
} }
@Override
public long getUsableSpace() {
return 0;
}
} }

View File

@@ -13,11 +13,9 @@ import java.util.Optional;
public interface ObjectPersistentStore { public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteBuffer> getSnapshot(); Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId); void commitTx(TxManifestRaw names, long txId);
long getTotalSpace(); long getTotalSpace();
long getFreeSpace(); long getFreeSpace();
long getUsableSpace();
} }

View File

@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
, objs.deleted()); , objs.deleted());
} }
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) { void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId); delegateStore.commitTx(prepareManifest(objects), txId);
} }
} }

View File

@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting"); Log.info("Writeback thread exiting");
} }
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) { private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady(); verifyReady();
_pendingBundleLock.lock(); _pendingBundleLock.lock();
try { try {

View File

@@ -1,5 +0,0 @@
package com.usatiuk.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction();
}

View File

@@ -1,262 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.objects.transaction;
import java.util.Collection;
public interface TransactionHandlePrivate extends TransactionHandle {
Collection<Runnable> getOnFlush();
}

View File

@@ -0,0 +1,240 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
class TransactionImpl implements Transaction, AutoCloseable {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
_snapshot = snapshot;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import com.usatiuk.utils.VoidFn;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import java.util.function.Supplier; import java.util.function.Supplier;
@@ -41,9 +40,9 @@ public interface TransactionManager {
} }
} }
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) { default TransactionHandle runTries(Runnable fn, int tries, boolean nest) {
if (!nest && current() != null) { if (!nest && current() != null) {
fn.apply(); fn.run();
return new TransactionHandle() { return new TransactionHandle() {
@Override @Override
public void onFlush(Runnable runnable) { public void onFlush(Runnable runnable) {
@@ -56,7 +55,7 @@ public interface TransactionManager {
begin(); begin();
boolean commit = false; boolean commit = false;
try { try {
fn.apply(); fn.run();
commit = true; commit = true;
var ret = commit(); var ret = commit();
return ret; return ret;
@@ -80,11 +79,11 @@ public interface TransactionManager {
return runTries(supplier, tries, false); return runTries(supplier, tries, false);
} }
default TransactionHandle runTries(VoidFn fn, int tries) { default TransactionHandle runTries(Runnable fn, int tries) {
return runTries(fn, tries, false); return runTries(fn, tries, false);
} }
default TransactionHandle run(VoidFn fn, boolean nest) { default TransactionHandle run(Runnable fn, boolean nest) {
return runTries(fn, 10, nest); return runTries(fn, 10, nest);
} }
@@ -92,7 +91,7 @@ public interface TransactionManager {
return runTries(supplier, 10, nest); return runTries(supplier, 10, nest);
} }
default TransactionHandle run(VoidFn fn) { default TransactionHandle run(Runnable fn) {
return run(fn, false); return run(fn, false);
} }
@@ -100,7 +99,7 @@ public interface TransactionManager {
return run(supplier, false); return run(supplier, false);
} }
default void executeTx(VoidFn fn) { default void executeTx(Runnable fn) {
run(fn, false); run(fn, false);
} }

View File

@@ -11,14 +11,14 @@ import java.util.Stack;
@Singleton @Singleton
public class TransactionManagerImpl implements TransactionManager { public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new); private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject @Inject
JObjectManager jObjectManager; TransactionService transactionService;
@Override @Override
public void begin() { public void begin() {
Log.trace("Starting transaction"); Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction(); var tx = transactionService.createTransaction();
_currentTransaction.get().push(tx); _currentTransaction.get().push(tx);
} }
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
Pair<Collection<Runnable>, TransactionHandle> ret; Pair<Collection<Runnable>, TransactionHandle> ret;
try { try {
ret = jObjectManager.commit(peeked); ret = transactionService.commit(peeked);
} catch (Throwable e) { } catch (Throwable e) {
Log.trace("Transaction commit failed", e); Log.trace("Transaction commit failed", e);
throw e; throw e;
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
var peeked = stack.peek(); var peeked = stack.peek();
try { try {
jObjectManager.rollback(peeked); transactionService.rollback(peeked);
} catch (Throwable e) { } catch (Throwable e) {
Log.error("Transaction rollback failed", e); Log.error("Transaction rollback failed", e);
throw e; throw e;

View File

@@ -1,27 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
Collection<Runnable> getOnCommit();
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
}

View File

@@ -22,12 +22,10 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class TransactionService {
private static final List<PreCommitTxHook> _preCommitTxHooks; private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject @Inject
WritebackObjectPersistentStore writebackObjectPersistentStore; WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
private boolean _ready = false; private boolean _ready = false;
private final DataLocker _objLocker = new DataLocker(); private final DataLocker _objLocker = new DataLocker();
@@ -36,7 +34,7 @@ public class JObjectManager {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList()); _preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
} }
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) { TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList())); Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
} }
@@ -48,14 +46,14 @@ public class JObjectManager {
_ready = true; _ready = true;
} }
public TransactionPrivate createTransaction() { public TransactionImpl createTransaction() {
verifyReady(); verifyReady();
var tx = transactionFactory.createTransaction(); var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id()); Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx; return tx;
} }
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) { public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
verifyReady(); verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>(); var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null; Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
@@ -259,7 +257,7 @@ public class JObjectManager {
} }
} }
public void rollback(TransactionPrivate tx) { public void rollback(TransactionImpl tx) {
verifyReady(); verifyReady();
tx.close(); tx.close();
} }

View File

@@ -22,7 +22,6 @@
<properties> <properties>
<compiler-plugin.version>3.12.1</compiler-plugin.version> <compiler-plugin.version>3.12.1</compiler-plugin.version>
<!--FIXME--> <!--FIXME-->
<maven.compiler.release></maven.compiler.release>
<maven.compiler.source>21</maven.compiler.source> <maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target> <maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -87,6 +86,19 @@
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.11.2</version>
<configuration>
<additionalOptions>
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--enable-preview
</additionalOptions>
</configuration>
</plugin>
<plugin> <plugin>
<groupId>${quarkus.platform.group-id}</groupId> <groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId> <artifactId>quarkus-maven-plugin</artifactId>

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFn {
void apply();
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFnThrows {
void apply() throws Throwable;
}