mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
12 Commits
469a6b9011
...
javadoc-pu
| Author | SHA1 | Date | |
|---|---|---|---|
| 7902d9e486 | |||
| 7c056b9674 | |||
| 8cc040b234 | |||
| f8375c9cd8 | |||
| 0c3524851e | |||
| 3eb7164c0f | |||
| f544a67fb5 | |||
| 964b3da951 | |||
| cb33472dc5 | |||
| de211bb2d2 | |||
| 56ab3bad4c | |||
| 9403556220 |
45
.github/workflows/server.yml
vendored
45
.github/workflows/server.yml
vendored
@@ -43,11 +43,11 @@ jobs:
|
|||||||
distribution: "zulu"
|
distribution: "zulu"
|
||||||
cache: maven
|
cache: maven
|
||||||
|
|
||||||
- name: Build LazyFS
|
# - name: Build LazyFS
|
||||||
run: cd thirdparty/lazyfs/ && ./build.sh
|
# run: cd thirdparty/lazyfs/ && ./build.sh
|
||||||
|
|
||||||
- name: Test with Maven
|
- name: Test with Maven
|
||||||
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify
|
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots javadoc:aggregate
|
||||||
|
|
||||||
# - name: Build with Maven
|
# - name: Build with Maven
|
||||||
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
|
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
|
||||||
@@ -57,6 +57,11 @@ jobs:
|
|||||||
name: DHFS Server Package
|
name: DHFS Server Package
|
||||||
path: dhfs-parent/dhfs-fuse/target/quarkus-app
|
path: dhfs-parent/dhfs-fuse/target/quarkus-app
|
||||||
|
|
||||||
|
- uses: actions/upload-artifact@v4
|
||||||
|
with:
|
||||||
|
name: DHFS Javadocs
|
||||||
|
path: dhfs-parent/target/reports/apidocs/
|
||||||
|
|
||||||
- uses: actions/upload-artifact@v4
|
- uses: actions/upload-artifact@v4
|
||||||
if: ${{ always() }}
|
if: ${{ always() }}
|
||||||
with:
|
with:
|
||||||
@@ -231,3 +236,37 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
name: Run wrapper
|
name: Run wrapper
|
||||||
path: ~/run-wrapper.tar.gz
|
path: ~/run-wrapper.tar.gz
|
||||||
|
|
||||||
|
publish-javadoc:
|
||||||
|
environment:
|
||||||
|
name: github-pages
|
||||||
|
url: ${{ steps.deployment.outputs.page_url }}
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
permissions:
|
||||||
|
contents: read
|
||||||
|
pages: write
|
||||||
|
id-token: write
|
||||||
|
|
||||||
|
needs: [build-webui, build-dhfs]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- uses: actions/download-artifact@v4
|
||||||
|
with:
|
||||||
|
name: DHFS Javadocs
|
||||||
|
path: dhfs-javadocs-downloaded
|
||||||
|
|
||||||
|
- name: Setup Pages
|
||||||
|
uses: actions/configure-pages@v5
|
||||||
|
- name: Upload artifact
|
||||||
|
uses: actions/upload-pages-artifact@v3
|
||||||
|
with:
|
||||||
|
# Upload entire repository
|
||||||
|
path: 'dhfs-javadocs-downloaded'
|
||||||
|
- name: Deploy to GitHub Pages
|
||||||
|
id: deployment
|
||||||
|
uses: actions/deploy-pages@v4
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
2
dhfs-parent/.gitignore
vendored
2
dhfs-parent/.gitignore
vendored
@@ -41,3 +41,5 @@ nb-configuration.xml
|
|||||||
|
|
||||||
# Plugin directory
|
# Plugin directory
|
||||||
/.quarkus/cli/plugins/
|
/.quarkus/cli/plugins/
|
||||||
|
|
||||||
|
.jqwik-database
|
||||||
@@ -5,6 +5,11 @@ import com.usatiuk.dhfs.remoteobj.JDataRemote;
|
|||||||
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
|
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ChunkData is a data structure that represents an immutable binary blob
|
||||||
|
* @param key unique key
|
||||||
|
* @param data binary data
|
||||||
|
*/
|
||||||
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
|
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
|
||||||
@Override
|
@Override
|
||||||
public int estimateSize() {
|
public int estimateSize() {
|
||||||
|
|||||||
@@ -1,26 +0,0 @@
|
|||||||
package com.usatiuk.dhfsfs.objects;
|
|
||||||
|
|
||||||
import com.usatiuk.dhfs.ProtoSerializer;
|
|
||||||
import com.usatiuk.dhfs.persistence.ChunkDataP;
|
|
||||||
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
|
|
||||||
@Override
|
|
||||||
public ChunkData deserialize(ChunkDataP message) {
|
|
||||||
return new ChunkData(
|
|
||||||
JObjectKey.of(message.getKey().getName()),
|
|
||||||
message.getData()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ChunkDataP serialize(ChunkData object) {
|
|
||||||
return ChunkDataP.newBuilder()
|
|
||||||
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
|
|
||||||
.setData(object.data())
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -9,6 +9,14 @@ import com.usatiuk.objects.JObjectKey;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* File is a data structure that represents a file in the file system
|
||||||
|
* @param key unique key
|
||||||
|
* @param mode file mode
|
||||||
|
* @param cTime creation time
|
||||||
|
* @param mTime modification time
|
||||||
|
* @param symlink true if the file is a symlink, false otherwise
|
||||||
|
*/
|
||||||
public record File(JObjectKey key, long mode, long cTime, long mTime,
|
public record File(JObjectKey key, long mode, long cTime, long mTime,
|
||||||
boolean symlink
|
boolean symlink
|
||||||
) implements JDataRemote, JMapHolder<JMapLongKey> {
|
) implements JDataRemote, JMapHolder<JMapLongKey> {
|
||||||
|
|||||||
@@ -7,6 +7,11 @@ import org.apache.commons.lang3.tuple.Pair;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FileDto is a data transfer object that contains a file and its chunks.
|
||||||
|
* @param file the file
|
||||||
|
* @param chunks the list of chunks, each represented as a pair of a long and a JObjectKey
|
||||||
|
*/
|
||||||
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
|
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
|
||||||
@Override
|
@Override
|
||||||
public Class<? extends JDataRemote> objClass() {
|
public Class<? extends JDataRemote> objClass() {
|
||||||
|
|||||||
@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.syncmap.DtoMapper;
|
|||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps a {@link File} object to a {@link FileDto} object and vice versa.
|
||||||
|
*/
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class FileDtoMapper implements DtoMapper<File, FileDto> {
|
public class FileDtoMapper implements DtoMapper<File, FileDto> {
|
||||||
@Inject
|
@Inject
|
||||||
|
|||||||
@@ -10,11 +10,20 @@ import org.apache.commons.lang3.tuple.Pair;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class for working with files.
|
||||||
|
*/
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class FileHelper {
|
public class FileHelper {
|
||||||
@Inject
|
@Inject
|
||||||
JMapHelper jMapHelper;
|
JMapHelper jMapHelper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the chunks of a file.
|
||||||
|
* Transaction is expected to be already started.
|
||||||
|
* @param file the file to get chunks from
|
||||||
|
* @return a list of pairs of chunk offset and chunk key
|
||||||
|
*/
|
||||||
public List<Pair<Long, JObjectKey>> getChunks(File file) {
|
public List<Pair<Long, JObjectKey>> getChunks(File file) {
|
||||||
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
|
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
|
||||||
try (var it = jMapHelper.getIterator(file)) {
|
try (var it = jMapHelper.getIterator(file)) {
|
||||||
@@ -26,6 +35,13 @@ public class FileHelper {
|
|||||||
return List.copyOf(chunks);
|
return List.copyOf(chunks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replace the chunks of a file.
|
||||||
|
* All previous chunks will be deleted.
|
||||||
|
* Transaction is expected to be already started.
|
||||||
|
* @param file the file to replace chunks in
|
||||||
|
* @param chunks the list of pairs of chunk offset and chunk key
|
||||||
|
*/
|
||||||
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
|
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
|
||||||
jMapHelper.deleteAll(file);
|
jMapHelper.deleteAll(file);
|
||||||
|
|
||||||
|
|||||||
@@ -1,25 +0,0 @@
|
|||||||
package com.usatiuk.dhfsfs.objects;
|
|
||||||
|
|
||||||
import com.usatiuk.dhfs.ProtoSerializer;
|
|
||||||
import com.usatiuk.dhfs.persistence.FileDtoP;
|
|
||||||
import com.usatiuk.utils.SerializationHelper;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
public class FileProtoSerializer implements ProtoSerializer<FileDtoP, FileDto> {
|
|
||||||
@Override
|
|
||||||
public FileDto deserialize(FileDtoP message) {
|
|
||||||
try (var is = message.getSerializedData().newInput()) {
|
|
||||||
return SerializationHelper.deserialize(is);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public FileDtoP serialize(FileDto object) {
|
|
||||||
return FileDtoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -23,6 +23,9 @@ import javax.annotation.Nullable;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles synchronization of file objects.
|
||||||
|
*/
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
||||||
@Inject
|
@Inject
|
||||||
@@ -45,6 +48,14 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
|||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
|
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve conflict between two file versions, update the file in storage and create a conflict file.
|
||||||
|
*
|
||||||
|
* @param from the peer that sent the update
|
||||||
|
* @param key the key of the file
|
||||||
|
* @param receivedChangelog the changelog of the received file
|
||||||
|
* @param receivedData the received file data
|
||||||
|
*/
|
||||||
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
||||||
@Nullable FileDto receivedData) {
|
@Nullable FileDto receivedData) {
|
||||||
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
||||||
|
|||||||
@@ -6,6 +6,10 @@ import com.usatiuk.objects.JObjectKey;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JKleppmannTreeNodeMetaDirectory is a record that represents a directory in the JKleppmann tree.
|
||||||
|
* @param name the name of the directory
|
||||||
|
*/
|
||||||
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
|
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
|
||||||
public JKleppmannTreeNodeMeta withName(String name) {
|
public JKleppmannTreeNodeMeta withName(String name) {
|
||||||
return new JKleppmannTreeNodeMetaDirectory(name);
|
return new JKleppmannTreeNodeMetaDirectory(name);
|
||||||
|
|||||||
@@ -6,6 +6,11 @@ import com.usatiuk.objects.JObjectKey;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JKleppmannTreeNodeMetaFile is a record that represents a file in the JKleppmann tree.
|
||||||
|
* @param name the name of the file
|
||||||
|
* @param fileIno a reference to the `File` object
|
||||||
|
*/
|
||||||
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
|
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
|
||||||
@Override
|
@Override
|
||||||
public JKleppmannTreeNodeMeta withName(String name) {
|
public JKleppmannTreeNodeMeta withName(String name) {
|
||||||
|
|||||||
@@ -39,6 +39,9 @@ import java.nio.file.Path;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Actual filesystem implementation.
|
||||||
|
*/
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class DhfsFileService {
|
public class DhfsFileService {
|
||||||
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
|
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
|
||||||
@@ -71,6 +74,12 @@ public class DhfsFileService {
|
|||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
|
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new chunk with the given data and a new unique ID.
|
||||||
|
*
|
||||||
|
* @param bytes the data to store in the chunk
|
||||||
|
* @return the created chunk
|
||||||
|
*/
|
||||||
private ChunkData createChunk(ByteString bytes) {
|
private ChunkData createChunk(ByteString bytes) {
|
||||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||||
remoteTx.putDataNew(newChunk);
|
remoteTx.putDataNew(newChunk);
|
||||||
@@ -82,14 +91,7 @@ public class DhfsFileService {
|
|||||||
getTree();
|
getTree();
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeNode getDirEntryW(String name) {
|
private JKleppmannTreeNode getDirEntry(String name) {
|
||||||
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
|
||||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
|
||||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
private JKleppmannTreeNode getDirEntryR(String name) {
|
|
||||||
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||||
@@ -103,6 +105,11 @@ public class DhfsFileService {
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the attributes of a file or directory.
|
||||||
|
* @param uuid the UUID of the file or directory
|
||||||
|
* @return the attributes of the file or directory
|
||||||
|
*/
|
||||||
public Optional<GetattrRes> getattr(JObjectKey uuid) {
|
public Optional<GetattrRes> getattr(JObjectKey uuid) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var ref = curTx.get(JData.class, uuid).orElse(null);
|
var ref = curTx.get(JData.class, uuid).orElse(null);
|
||||||
@@ -124,10 +131,15 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to resolve a path to a file or directory.
|
||||||
|
* @param name the path to resolve
|
||||||
|
* @return the key of the file or directory, or an empty optional if it does not exist
|
||||||
|
*/
|
||||||
public Optional<JObjectKey> open(String name) {
|
public Optional<JObjectKey> open(String name) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
try {
|
try {
|
||||||
var ret = getDirEntryR(name);
|
var ret = getDirEntry(name);
|
||||||
return switch (ret.meta()) {
|
return switch (ret.meta()) {
|
||||||
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
|
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
|
||||||
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
|
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
|
||||||
@@ -147,10 +159,16 @@ public class DhfsFileService {
|
|||||||
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
|
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new file with the given name and mode.
|
||||||
|
* @param name the name of the file
|
||||||
|
* @param mode the mode of the file
|
||||||
|
* @return the key of the created file
|
||||||
|
*/
|
||||||
public Optional<JObjectKey> create(String name, long mode) {
|
public Optional<JObjectKey> create(String name, long mode) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
Path path = Path.of(name);
|
Path path = Path.of(name);
|
||||||
var parent = getDirEntryW(path.getParent().toString());
|
var parent = getDirEntry(path.getParent().toString());
|
||||||
|
|
||||||
ensureDir(parent);
|
ensureDir(parent);
|
||||||
|
|
||||||
@@ -171,9 +189,14 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
//FIXME: Slow..
|
/**
|
||||||
|
* Get the parent directory of a file or directory.
|
||||||
|
* @param ino the key of the file or directory
|
||||||
|
* @return the parent directory
|
||||||
|
*/
|
||||||
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
|
// FIXME: Slow
|
||||||
return getTree().findParent(w -> {
|
return getTree().findParent(w -> {
|
||||||
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
||||||
return f.fileIno().equals(ino);
|
return f.fileIno().equals(ino);
|
||||||
@@ -182,20 +205,31 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new directory with the given name and mode.
|
||||||
|
* @param name the name of the directory
|
||||||
|
* @param mode the mode of the directory
|
||||||
|
*/
|
||||||
public void mkdir(String name, long mode) {
|
public void mkdir(String name, long mode) {
|
||||||
jObjectTxManager.executeTx(() -> {
|
jObjectTxManager.executeTx(() -> {
|
||||||
Path path = Path.of(name);
|
Path path = Path.of(name);
|
||||||
var parent = getDirEntryW(path.getParent().toString());
|
var parent = getDirEntry(path.getParent().toString());
|
||||||
ensureDir(parent);
|
ensureDir(parent);
|
||||||
|
|
||||||
String dname = path.getFileName().toString();
|
String dname = path.getFileName().toString();
|
||||||
|
|
||||||
Log.debug("Creating directory " + name);
|
Log.debug("Creating directory " + name);
|
||||||
|
|
||||||
|
// TODO: No modes for directories yet
|
||||||
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
|
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unlink a file or directory.
|
||||||
|
* @param name the name of the file or directory
|
||||||
|
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
|
||||||
|
*/
|
||||||
public void unlink(String name) {
|
public void unlink(String name) {
|
||||||
jObjectTxManager.executeTx(() -> {
|
jObjectTxManager.executeTx(() -> {
|
||||||
var node = getDirEntryOpt(name).orElse(null);
|
var node = getDirEntryOpt(name).orElse(null);
|
||||||
@@ -209,13 +243,19 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rename a file or directory.
|
||||||
|
* @param from the old name
|
||||||
|
* @param to the new name
|
||||||
|
* @return true if the rename was successful, false otherwise
|
||||||
|
*/
|
||||||
public Boolean rename(String from, String to) {
|
public Boolean rename(String from, String to) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var node = getDirEntryW(from);
|
var node = getDirEntry(from);
|
||||||
JKleppmannTreeNodeMeta meta = node.meta();
|
JKleppmannTreeNodeMeta meta = node.meta();
|
||||||
|
|
||||||
var toPath = Path.of(to);
|
var toPath = Path.of(to);
|
||||||
var toDentry = getDirEntryW(toPath.getParent().toString());
|
var toDentry = getDirEntry(toPath.getParent().toString());
|
||||||
ensureDir(toDentry);
|
ensureDir(toDentry);
|
||||||
|
|
||||||
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
||||||
@@ -223,6 +263,12 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Change the mode of a file or directory.
|
||||||
|
* @param uuid the ID of the file or directory
|
||||||
|
* @param mode the new mode
|
||||||
|
* @return true if the mode was changed successfully, false otherwise
|
||||||
|
*/
|
||||||
public Boolean chmod(JObjectKey uuid, long mode) {
|
public Boolean chmod(JObjectKey uuid, long mode) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||||
@@ -243,9 +289,14 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the contents of a directory.
|
||||||
|
* @param name the path of the directory
|
||||||
|
* @return an iterable of the names of the files in the directory
|
||||||
|
*/
|
||||||
public Iterable<String> readDir(String name) {
|
public Iterable<String> readDir(String name) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var found = getDirEntryW(name);
|
var found = getDirEntry(name);
|
||||||
|
|
||||||
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
|
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
|
||||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||||
@@ -254,6 +305,13 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the contents of a file.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @param offset the offset to start reading from
|
||||||
|
* @param length the number of bytes to read
|
||||||
|
* @return the contents of the file as a ByteString
|
||||||
|
*/
|
||||||
public ByteString read(JObjectKey fileUuid, long offset, int length) {
|
public ByteString read(JObjectKey fileUuid, long offset, int length) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
if (length < 0)
|
if (length < 0)
|
||||||
@@ -315,6 +373,11 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the size of a file.
|
||||||
|
* @param uuid the ID of the file
|
||||||
|
* @return the size of the file
|
||||||
|
*/
|
||||||
private ByteString readChunk(JObjectKey uuid) {
|
private ByteString readChunk(JObjectKey uuid) {
|
||||||
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
|
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
|
||||||
|
|
||||||
@@ -326,6 +389,11 @@ public class DhfsFileService {
|
|||||||
return chunkRead.data();
|
return chunkRead.data();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the size of a chunk.
|
||||||
|
* @param uuid the ID of the chunk
|
||||||
|
* @return the size of the chunk
|
||||||
|
*/
|
||||||
private int getChunkSize(JObjectKey uuid) {
|
private int getChunkSize(JObjectKey uuid) {
|
||||||
return readChunk(uuid).size();
|
return readChunk(uuid).size();
|
||||||
}
|
}
|
||||||
@@ -334,6 +402,13 @@ public class DhfsFileService {
|
|||||||
return num & -(1L << n);
|
return num & -(1L << n);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write data to a file.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @param offset the offset to write to
|
||||||
|
* @param data the data to write
|
||||||
|
* @return the number of bytes written
|
||||||
|
*/
|
||||||
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
|
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
if (offset < 0)
|
if (offset < 0)
|
||||||
@@ -436,6 +511,12 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncate a file to the given length.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @param length the new length of the file
|
||||||
|
* @return true if the truncate was successful, false otherwise
|
||||||
|
*/
|
||||||
public Boolean truncate(JObjectKey fileUuid, long length) {
|
public Boolean truncate(JObjectKey fileUuid, long length) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
if (length < 0)
|
if (length < 0)
|
||||||
@@ -525,6 +606,12 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fill the given range with zeroes.
|
||||||
|
* @param fillStart the start of the range
|
||||||
|
* @param length the end of the range
|
||||||
|
* @param newChunks the map to store the new chunks in
|
||||||
|
*/
|
||||||
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
|
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
|
||||||
long combinedSize = (length - fillStart);
|
long combinedSize = (length - fillStart);
|
||||||
|
|
||||||
@@ -560,12 +647,22 @@ public class DhfsFileService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the contents of a symlink.
|
||||||
|
* @param uuid the ID of the symlink
|
||||||
|
* @return the contents of the symlink as a string
|
||||||
|
*/
|
||||||
public String readlink(JObjectKey uuid) {
|
public String readlink(JObjectKey uuid) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
return readlinkBS(uuid).toStringUtf8();
|
return readlinkBS(uuid).toStringUtf8();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the contents of a symlink as a ByteString.
|
||||||
|
* @param uuid the ID of the symlink
|
||||||
|
* @return the contents of the symlink as a ByteString
|
||||||
|
*/
|
||||||
public ByteString readlinkBS(JObjectKey uuid) {
|
public ByteString readlinkBS(JObjectKey uuid) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
|
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
|
||||||
@@ -573,10 +670,16 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a symlink.
|
||||||
|
* @param oldpath the target of the symlink
|
||||||
|
* @param newpath the path of the symlink
|
||||||
|
* @return the key of the created symlink
|
||||||
|
*/
|
||||||
public JObjectKey symlink(String oldpath, String newpath) {
|
public JObjectKey symlink(String oldpath, String newpath) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
Path path = Path.of(newpath);
|
Path path = Path.of(newpath);
|
||||||
var parent = getDirEntryW(path.getParent().toString());
|
var parent = getDirEntry(path.getParent().toString());
|
||||||
|
|
||||||
ensureDir(parent);
|
ensureDir(parent);
|
||||||
|
|
||||||
@@ -595,6 +698,13 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the access and modification times of a file.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @param atimeMs the access time in milliseconds
|
||||||
|
* @param mtimeMs the modification time in milliseconds
|
||||||
|
* @return true if the times were set successfully, false otherwise
|
||||||
|
*/
|
||||||
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
|
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||||
@@ -616,6 +726,11 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the size of a file.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @return the size of the file
|
||||||
|
*/
|
||||||
public long size(JObjectKey fileUuid) {
|
public long size(JObjectKey fileUuid) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
long realSize = 0;
|
long realSize = 0;
|
||||||
@@ -635,6 +750,13 @@ public class DhfsFileService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write data to a file.
|
||||||
|
* @param fileUuid the ID of the file
|
||||||
|
* @param offset the offset to write to
|
||||||
|
* @param data the data to write
|
||||||
|
* @return the number of bytes written
|
||||||
|
*/
|
||||||
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
|
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
|
||||||
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
|
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,10 @@
|
|||||||
package com.usatiuk.dhfsfs.service;
|
package com.usatiuk.dhfsfs.service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DirectoryNotEmptyException is thrown when a directory is not empty.
|
||||||
|
* This exception is used to indicate that a directory cannot be deleted
|
||||||
|
* because it contains files or subdirectories.
|
||||||
|
*/
|
||||||
public class DirectoryNotEmptyException extends RuntimeException {
|
public class DirectoryNotEmptyException extends RuntimeException {
|
||||||
@Override
|
@Override
|
||||||
public synchronized Throwable fillInStackTrace() {
|
public synchronized Throwable fillInStackTrace() {
|
||||||
|
|||||||
@@ -1,4 +1,11 @@
|
|||||||
package com.usatiuk.dhfsfs.service;
|
package com.usatiuk.dhfsfs.service;
|
||||||
|
|
||||||
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
|
/**
|
||||||
|
* GetattrRes is a record that represents the result of a getattr operation.
|
||||||
|
* @param mtime File modification time
|
||||||
|
* @param ctime File creation time
|
||||||
|
* @param mode File mode
|
||||||
|
* @param type File type
|
||||||
|
*/
|
||||||
|
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
@@ -195,7 +195,7 @@ public class LazyFsIT {
|
|||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// Sometimes crash doesn't work
|
// Sometimes crash doesn't work
|
||||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||||
@@ -237,7 +237,7 @@ public class LazyFsIT {
|
|||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container1.getContainerId()).exec();
|
client.killContainerCmd(container1.getContainerId()).exec();
|
||||||
container1.stop();
|
container1.stop();
|
||||||
@@ -279,7 +279,7 @@ public class LazyFsIT {
|
|||||||
lazyFs1.crash();
|
lazyFs1.crash();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// Sometimes crash doesn't work
|
// Sometimes crash doesn't work
|
||||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||||
@@ -322,7 +322,7 @@ public class LazyFsIT {
|
|||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
@@ -366,7 +366,7 @@ public class LazyFsIT {
|
|||||||
}
|
}
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||||
try {
|
try {
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// Sometimes crash doesn't work
|
// Sometimes crash doesn't work
|
||||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||||
@@ -409,7 +409,7 @@ public class LazyFsIT {
|
|||||||
Log.info("Killing");
|
Log.info("Killing");
|
||||||
lazyFs2.crash();
|
lazyFs2.crash();
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
var client = DockerClientFactory.instance().client();
|
var client = DockerClientFactory.instance().client();
|
||||||
client.killContainerCmd(container2.getContainerId()).exec();
|
client.killContainerCmd(container2.getContainerId()).exec();
|
||||||
container2.stop();
|
container2.stop();
|
||||||
@@ -453,7 +453,7 @@ public class LazyFsIT {
|
|||||||
}
|
}
|
||||||
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
|
||||||
try {
|
try {
|
||||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
|
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
// Sometimes crash doesn't work
|
// Sometimes crash doesn't work
|
||||||
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
Log.info("Failed to crash: " + testInfo.getDisplayName());
|
||||||
|
|||||||
@@ -1,32 +0,0 @@
|
|||||||
package com.usatiuk.kleppmanntree;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
public class AtomicClock implements Clock<Long>, Serializable {
|
|
||||||
private long _max = 0;
|
|
||||||
|
|
||||||
public AtomicClock(long counter) {
|
|
||||||
_max = counter;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long getTimestamp() {
|
|
||||||
return ++_max;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTimestamp(Long timestamp) {
|
|
||||||
_max = timestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long peekTimestamp() {
|
|
||||||
return _max;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Long updateTimestamp(Long receivedTimestamp) {
|
|
||||||
var old = _max;
|
|
||||||
_max = Math.max(_max, receivedTimestamp) + 1;
|
|
||||||
return old;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,11 +1,10 @@
|
|||||||
package com.usatiuk.objects.iterators;
|
package com.usatiuk.objects.iterators;
|
||||||
|
|
||||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
|
||||||
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
|
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
|
||||||
K peekNextKey();
|
K peekNextKey();
|
||||||
|
|
||||||
void skip();
|
void skip();
|
||||||
@@ -21,4 +20,7 @@ public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends
|
|||||||
default CloseableKvIterator<K, V> reversed() {
|
default CloseableKvIterator<K, V> reversed() {
|
||||||
return new ReversedKvIterator<K, V>(this);
|
return new ReversedKvIterator<K, V>(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void close();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,19 +3,20 @@ package com.usatiuk.objects.snapshot;
|
|||||||
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
import com.usatiuk.objects.iterators.CloseableKvIterator;
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
import com.usatiuk.objects.iterators.IteratorStart;
|
||||||
import com.usatiuk.objects.iterators.MaybeTombstone;
|
import com.usatiuk.objects.iterators.MaybeTombstone;
|
||||||
import com.usatiuk.objects.iterators.Tombstone;
|
|
||||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
|
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseable {
|
||||||
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
|
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
Optional<V> readObject(K name);
|
Optional<V> readObject(K name);
|
||||||
|
|
||||||
long id();
|
long id();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
|
|||||||
SerializingObjectPersistentStore delegate;
|
SerializingObjectPersistentStore delegate;
|
||||||
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
|
||||||
boolean printStats;
|
boolean printStats;
|
||||||
private ExecutorService _commitExecutor;
|
|
||||||
private ExecutorService _statusExecutor;
|
private ExecutorService _statusExecutor;
|
||||||
private AtomicLong _cached = new AtomicLong();
|
private AtomicLong _cached = new AtomicLong();
|
||||||
private AtomicLong _cacheTries = new AtomicLong();
|
private AtomicLong _cacheTries = new AtomicLong();
|
||||||
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
|
|||||||
_cache.set(_cache.get().withVersion(s.id()));
|
_cache.set(_cache.get().withVersion(s.id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
_commitExecutor = Executors.newSingleThreadExecutor();
|
|
||||||
if (printStats) {
|
if (printStats) {
|
||||||
_statusExecutor = Executors.newSingleThreadExecutor();
|
_statusExecutor = Executors.newSingleThreadExecutor();
|
||||||
_statusExecutor.submit(() -> {
|
_statusExecutor.submit(() -> {
|
||||||
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
|
|||||||
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||||
|
|
||||||
var cache = _cache.get();
|
var cache = _cache.get();
|
||||||
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
|
|
||||||
for (var write : objs.written()) {
|
for (var write : objs.written()) {
|
||||||
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
|
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
|
||||||
}
|
}
|
||||||
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
|
|||||||
cache = cache.withPut(del, Optional.empty());
|
cache = cache.withPut(del, Optional.empty());
|
||||||
}
|
}
|
||||||
cache = cache.withVersion(txId);
|
cache = cache.withVersion(txId);
|
||||||
try {
|
delegate.commitTx(objs, txId);
|
||||||
commitFuture.get();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
_cache.set(cache);
|
_cache.set(cache);
|
||||||
|
|
||||||
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
|
||||||
|
|||||||
@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
public void commitTx(TxManifestRaw names, long txId) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var txn = _env.txnWrite();
|
try (var txn = _env.txnWrite()) {
|
||||||
try {
|
|
||||||
for (var written : names.written()) {
|
for (var written : names.written()) {
|
||||||
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
|
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
|
||||||
written.getValue().copyTo(putBb);
|
written.getValue().copyTo(putBb);
|
||||||
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
bbData.putLong(txId);
|
bbData.putLong(txId);
|
||||||
bbData.flip();
|
bbData.flip();
|
||||||
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
|
||||||
} catch (Throwable t) {
|
txn.commit();
|
||||||
txn.close();
|
|
||||||
throw t;
|
|
||||||
}
|
}
|
||||||
return () -> {
|
|
||||||
try {
|
|
||||||
txn.commit();
|
|
||||||
} finally {
|
|
||||||
txn.close();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -188,12 +178,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
return _root.toFile().getFreeSpace();
|
return _root.toFile().getFreeSpace();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getUsableSpace() {
|
|
||||||
verifyReady();
|
|
||||||
return _root.toFile().getUsableSpace();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
|
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
|
||||||
private static final Cleaner CLEANER = Cleaner.create();
|
private static final Cleaner CLEANER = Cleaner.create();
|
||||||
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
|
private final Txn<ByteBuffer> _txn; // Managed by the snapshot
|
||||||
|
|||||||
@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Runnable prepareTx(TxManifestRaw names, long txId) {
|
@Override
|
||||||
return () -> {
|
public void commitTx(TxManifestRaw names, long txId) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
for (var written : names.written()) {
|
for (var written : names.written()) {
|
||||||
_objects = _objects.plus(written.getKey(), written.getValue());
|
_objects = _objects.plus(written.getKey(), written.getValue());
|
||||||
}
|
|
||||||
for (JObjectKey key : names.deleted()) {
|
|
||||||
_objects = _objects.minus(key);
|
|
||||||
}
|
|
||||||
assert txId > _lastCommitId;
|
|
||||||
_lastCommitId = txId;
|
|
||||||
}
|
}
|
||||||
};
|
for (JObjectKey key : names.deleted()) {
|
||||||
|
_objects = _objects.minus(key);
|
||||||
|
}
|
||||||
|
assert txId > _lastCommitId;
|
||||||
|
_lastCommitId = txId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -77,9 +76,4 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
|||||||
public long getFreeSpace() {
|
public long getFreeSpace() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getUsableSpace() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,11 +13,9 @@ import java.util.Optional;
|
|||||||
public interface ObjectPersistentStore {
|
public interface ObjectPersistentStore {
|
||||||
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
|
||||||
|
|
||||||
Runnable prepareTx(TxManifestRaw names, long txId);
|
void commitTx(TxManifestRaw names, long txId);
|
||||||
|
|
||||||
long getTotalSpace();
|
long getTotalSpace();
|
||||||
|
|
||||||
long getFreeSpace();
|
long getFreeSpace();
|
||||||
|
|
||||||
long getUsableSpace();
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
|
|||||||
, objs.deleted());
|
, objs.deleted());
|
||||||
}
|
}
|
||||||
|
|
||||||
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
|
||||||
return delegateStore.prepareTx(prepareManifest(objects), txId);
|
delegateStore.commitTx(prepareManifest(objects), txId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
|
|||||||
Log.info("Writeback thread exiting");
|
Log.info("Writeback thread exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
_pendingBundleLock.lock();
|
_pendingBundleLock.lock();
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
public interface TransactionFactory {
|
|
||||||
TransactionPrivate createTransaction();
|
|
||||||
}
|
|
||||||
@@ -1,262 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import com.usatiuk.objects.JData;
|
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.iterators.*;
|
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
|
||||||
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
|
|
||||||
import com.usatiuk.utils.ListUtils;
|
|
||||||
import io.quarkus.logging.Log;
|
|
||||||
import jakarta.inject.Inject;
|
|
||||||
import jakarta.inject.Singleton;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
@Singleton
|
|
||||||
public class TransactionFactoryImpl implements TransactionFactory {
|
|
||||||
@Inject
|
|
||||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TransactionPrivate createTransaction() {
|
|
||||||
return new TransactionImpl();
|
|
||||||
}
|
|
||||||
|
|
||||||
private interface ReadTrackingInternalCrap {
|
|
||||||
boolean fromSource();
|
|
||||||
|
|
||||||
JData obj();
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME:
|
|
||||||
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
|
||||||
@Override
|
|
||||||
public boolean fromSource() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JData obj() {
|
|
||||||
return wrapped.data();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
|
||||||
@Override
|
|
||||||
public boolean fromSource() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class TransactionImpl implements TransactionPrivate {
|
|
||||||
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
|
||||||
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
|
||||||
private final List<Runnable> _onCommit = new LinkedList<>();
|
|
||||||
private final List<Runnable> _onFlush = new LinkedList<>();
|
|
||||||
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
|
||||||
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
|
||||||
private boolean _closed = false;
|
|
||||||
|
|
||||||
private boolean _writeTrack = false;
|
|
||||||
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
|
||||||
|
|
||||||
private TransactionImpl() {
|
|
||||||
_snapshot = writebackObjectPersistentStore.getSnapshot();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCommit(Runnable runnable) {
|
|
||||||
_onCommit.add(runnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFlush(Runnable runnable) {
|
|
||||||
_onFlush.add(runnable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<Runnable> getOnCommit() {
|
|
||||||
return Collections.unmodifiableCollection(_onCommit);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
|
||||||
return _snapshot;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<Runnable> getOnFlush() {
|
|
||||||
return Collections.unmodifiableCollection(_onFlush);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
|
||||||
if (_knownNew.contains(key)) {
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
|
||||||
.map(JDataVersionedWrapper::data)
|
|
||||||
.map(type::cast);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
|
||||||
return switch (_writes.get(key)) {
|
|
||||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
|
||||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
|
||||||
case null -> getFromSource(type, key);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void delete(JObjectKey key) {
|
|
||||||
var record = new TxRecord.TxObjectRecordDeleted(key);
|
|
||||||
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
|
||||||
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
|
||||||
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
|
||||||
ListUtils.prependAndMap(
|
|
||||||
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
|
||||||
t -> switch (t) {
|
|
||||||
case TxRecord.TxObjectRecordWrite<?> write ->
|
|
||||||
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
|
||||||
case TxRecord.TxObjectRecordDeleted deleted ->
|
|
||||||
new TombstoneImpl<ReadTrackingInternalCrap>();
|
|
||||||
case null, default -> null;
|
|
||||||
}),
|
|
||||||
_snapshot.getIterator(start, key),
|
|
||||||
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
|
||||||
d -> switch (d) {
|
|
||||||
case Data<JDataVersionedWrapper> w ->
|
|
||||||
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
|
||||||
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
|
||||||
case null, default -> null;
|
|
||||||
}))));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void put(JData obj) {
|
|
||||||
var key = obj.key();
|
|
||||||
var read = _readSet.get(key);
|
|
||||||
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
|
||||||
_writes.put(key, record);
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void putNew(JData obj) {
|
|
||||||
var key = obj.key();
|
|
||||||
_knownNew.add(key);
|
|
||||||
|
|
||||||
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
|
||||||
_writes.put(key, record);
|
|
||||||
if (_writeTrack)
|
|
||||||
_newWrites.put(key, record);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
|
||||||
if (!_writeTrack) {
|
|
||||||
_writeTrack = true;
|
|
||||||
return Collections.unmodifiableCollection(_writes.values());
|
|
||||||
}
|
|
||||||
var ret = _newWrites;
|
|
||||||
_newWrites = new HashMap<>();
|
|
||||||
return ret.values();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
|
||||||
return _readSet;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<JObjectKey> knownNew() {
|
|
||||||
return _knownNew;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (_closed) return;
|
|
||||||
_closed = true;
|
|
||||||
_snapshot.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
|
||||||
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
|
||||||
|
|
||||||
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
|
||||||
_backing = backing;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JObjectKey peekNextKey() {
|
|
||||||
return _backing.peekNextKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void skip() {
|
|
||||||
_backing.skip();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public JObjectKey peekPrevKey() {
|
|
||||||
return _backing.peekPrevKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pair<JObjectKey, JData> prev() {
|
|
||||||
var got = _backing.prev();
|
|
||||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
|
||||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
|
||||||
}
|
|
||||||
return Pair.of(got.getKey(), got.getValue().obj());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasPrev() {
|
|
||||||
return _backing.hasPrev();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void skipPrev() {
|
|
||||||
_backing.skipPrev();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
_backing.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean hasNext() {
|
|
||||||
return _backing.hasNext();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pair<JObjectKey, JData> next() {
|
|
||||||
var got = _backing.next();
|
|
||||||
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
|
||||||
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
|
||||||
}
|
|
||||||
return Pair.of(got.getKey(), got.getValue().obj());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
public interface TransactionHandlePrivate extends TransactionHandle {
|
|
||||||
Collection<Runnable> getOnFlush();
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,240 @@
|
|||||||
|
package com.usatiuk.objects.transaction;
|
||||||
|
|
||||||
|
import com.usatiuk.objects.JData;
|
||||||
|
import com.usatiuk.objects.JDataVersionedWrapper;
|
||||||
|
import com.usatiuk.objects.JObjectKey;
|
||||||
|
import com.usatiuk.objects.iterators.*;
|
||||||
|
import com.usatiuk.objects.snapshot.Snapshot;
|
||||||
|
import com.usatiuk.utils.ListUtils;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
class TransactionImpl implements Transaction, AutoCloseable {
|
||||||
|
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
|
||||||
|
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
|
||||||
|
private final List<Runnable> _onCommit = new LinkedList<>();
|
||||||
|
private final List<Runnable> _onFlush = new LinkedList<>();
|
||||||
|
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
|
||||||
|
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
|
||||||
|
private boolean _closed = false;
|
||||||
|
|
||||||
|
private boolean _writeTrack = false;
|
||||||
|
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
|
||||||
|
|
||||||
|
private interface ReadTrackingInternalCrap {
|
||||||
|
boolean fromSource();
|
||||||
|
|
||||||
|
JData obj();
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME:
|
||||||
|
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
|
||||||
|
@Override
|
||||||
|
public boolean fromSource() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JData obj() {
|
||||||
|
return wrapped.data();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
|
||||||
|
@Override
|
||||||
|
public boolean fromSource() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
|
||||||
|
_snapshot = snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCommit(Runnable runnable) {
|
||||||
|
_onCommit.add(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFlush(Runnable runnable) {
|
||||||
|
_onFlush.add(runnable);
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<Runnable> getOnCommit() {
|
||||||
|
return Collections.unmodifiableCollection(_onCommit);
|
||||||
|
}
|
||||||
|
|
||||||
|
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
|
||||||
|
return _snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<Runnable> getOnFlush() {
|
||||||
|
return Collections.unmodifiableCollection(_onFlush);
|
||||||
|
}
|
||||||
|
|
||||||
|
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
|
||||||
|
if (_knownNew.contains(key)) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
return _readSet.computeIfAbsent(key, _snapshot::readObject)
|
||||||
|
.map(JDataVersionedWrapper::data)
|
||||||
|
.map(type::cast);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||||
|
return switch (_writes.get(key)) {
|
||||||
|
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||||
|
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||||
|
case null -> getFromSource(type, key);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(JObjectKey key) {
|
||||||
|
var record = new TxRecord.TxObjectRecordDeleted(key);
|
||||||
|
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
|
||||||
|
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
|
||||||
|
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
|
||||||
|
ListUtils.prependAndMap(
|
||||||
|
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
|
||||||
|
t -> switch (t) {
|
||||||
|
case TxRecord.TxObjectRecordWrite<?> write ->
|
||||||
|
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
|
||||||
|
case TxRecord.TxObjectRecordDeleted deleted ->
|
||||||
|
new TombstoneImpl<ReadTrackingInternalCrap>();
|
||||||
|
case null, default -> null;
|
||||||
|
}),
|
||||||
|
_snapshot.getIterator(start, key),
|
||||||
|
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
|
||||||
|
d -> switch (d) {
|
||||||
|
case Data<JDataVersionedWrapper> w ->
|
||||||
|
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
|
||||||
|
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
|
||||||
|
case null, default -> null;
|
||||||
|
}))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(JData obj) {
|
||||||
|
var key = obj.key();
|
||||||
|
var read = _readSet.get(key);
|
||||||
|
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||||
|
_writes.put(key, record);
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putNew(JData obj) {
|
||||||
|
var key = obj.key();
|
||||||
|
_knownNew.add(key);
|
||||||
|
|
||||||
|
var record = new TxRecord.TxObjectRecordWrite<>(obj);
|
||||||
|
_writes.put(key, record);
|
||||||
|
if (_writeTrack)
|
||||||
|
_newWrites.put(key, record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
|
||||||
|
if (!_writeTrack) {
|
||||||
|
_writeTrack = true;
|
||||||
|
return Collections.unmodifiableCollection(_writes.values());
|
||||||
|
}
|
||||||
|
var ret = _newWrites;
|
||||||
|
_newWrites = new HashMap<>();
|
||||||
|
return ret.values();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
|
||||||
|
return _readSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<JObjectKey> knownNew() {
|
||||||
|
return _knownNew;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (_closed) return;
|
||||||
|
_closed = true;
|
||||||
|
_snapshot.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
|
||||||
|
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
|
||||||
|
|
||||||
|
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
|
||||||
|
_backing = backing;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JObjectKey peekNextKey() {
|
||||||
|
return _backing.peekNextKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skip() {
|
||||||
|
_backing.skip();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JObjectKey peekPrevKey() {
|
||||||
|
return _backing.peekPrevKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Pair<JObjectKey, JData> prev() {
|
||||||
|
var got = _backing.prev();
|
||||||
|
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||||
|
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||||
|
}
|
||||||
|
return Pair.of(got.getKey(), got.getValue().obj());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasPrev() {
|
||||||
|
return _backing.hasPrev();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void skipPrev() {
|
||||||
|
_backing.skipPrev();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
_backing.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return _backing.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Pair<JObjectKey, JData> next() {
|
||||||
|
var got = _backing.next();
|
||||||
|
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
|
||||||
|
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
|
||||||
|
}
|
||||||
|
return Pair.of(got.getKey(), got.getValue().obj());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
package com.usatiuk.objects.transaction;
|
||||||
|
|
||||||
import com.usatiuk.utils.VoidFn;
|
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
|
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
@@ -41,9 +40,9 @@ public interface TransactionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
|
default TransactionHandle runTries(Runnable fn, int tries, boolean nest) {
|
||||||
if (!nest && current() != null) {
|
if (!nest && current() != null) {
|
||||||
fn.apply();
|
fn.run();
|
||||||
return new TransactionHandle() {
|
return new TransactionHandle() {
|
||||||
@Override
|
@Override
|
||||||
public void onFlush(Runnable runnable) {
|
public void onFlush(Runnable runnable) {
|
||||||
@@ -56,7 +55,7 @@ public interface TransactionManager {
|
|||||||
begin();
|
begin();
|
||||||
boolean commit = false;
|
boolean commit = false;
|
||||||
try {
|
try {
|
||||||
fn.apply();
|
fn.run();
|
||||||
commit = true;
|
commit = true;
|
||||||
var ret = commit();
|
var ret = commit();
|
||||||
return ret;
|
return ret;
|
||||||
@@ -80,11 +79,11 @@ public interface TransactionManager {
|
|||||||
return runTries(supplier, tries, false);
|
return runTries(supplier, tries, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
default TransactionHandle runTries(VoidFn fn, int tries) {
|
default TransactionHandle runTries(Runnable fn, int tries) {
|
||||||
return runTries(fn, tries, false);
|
return runTries(fn, tries, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
default TransactionHandle run(VoidFn fn, boolean nest) {
|
default TransactionHandle run(Runnable fn, boolean nest) {
|
||||||
return runTries(fn, 10, nest);
|
return runTries(fn, 10, nest);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,7 +91,7 @@ public interface TransactionManager {
|
|||||||
return runTries(supplier, 10, nest);
|
return runTries(supplier, 10, nest);
|
||||||
}
|
}
|
||||||
|
|
||||||
default TransactionHandle run(VoidFn fn) {
|
default TransactionHandle run(Runnable fn) {
|
||||||
return run(fn, false);
|
return run(fn, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -100,7 +99,7 @@ public interface TransactionManager {
|
|||||||
return run(supplier, false);
|
return run(supplier, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
default void executeTx(VoidFn fn) {
|
default void executeTx(Runnable fn) {
|
||||||
run(fn, false);
|
run(fn, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,14 +11,14 @@ import java.util.Stack;
|
|||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class TransactionManagerImpl implements TransactionManager {
|
public class TransactionManagerImpl implements TransactionManager {
|
||||||
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
|
||||||
@Inject
|
@Inject
|
||||||
JObjectManager jObjectManager;
|
TransactionService transactionService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void begin() {
|
public void begin() {
|
||||||
Log.trace("Starting transaction");
|
Log.trace("Starting transaction");
|
||||||
var tx = jObjectManager.createTransaction();
|
var tx = transactionService.createTransaction();
|
||||||
_currentTransaction.get().push(tx);
|
_currentTransaction.get().push(tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
|||||||
|
|
||||||
Pair<Collection<Runnable>, TransactionHandle> ret;
|
Pair<Collection<Runnable>, TransactionHandle> ret;
|
||||||
try {
|
try {
|
||||||
ret = jObjectManager.commit(peeked);
|
ret = transactionService.commit(peeked);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
Log.trace("Transaction commit failed", e);
|
Log.trace("Transaction commit failed", e);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
|
|||||||
var peeked = stack.peek();
|
var peeked = stack.peek();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jObjectManager.rollback(peeked);
|
transactionService.rollback(peeked);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
Log.error("Transaction rollback failed", e);
|
Log.error("Transaction rollback failed", e);
|
||||||
throw e;
|
throw e;
|
||||||
|
|||||||
@@ -1,27 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
import com.usatiuk.objects.JData;
|
|
||||||
import com.usatiuk.objects.JDataVersionedWrapper;
|
|
||||||
import com.usatiuk.objects.JObjectKey;
|
|
||||||
import com.usatiuk.objects.snapshot.Snapshot;
|
|
||||||
import com.usatiuk.utils.AutoCloseableNoThrow;
|
|
||||||
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
// The transaction interface actually used by user code to retrieve objects
|
|
||||||
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
|
|
||||||
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
|
|
||||||
|
|
||||||
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
|
|
||||||
|
|
||||||
Set<JObjectKey> knownNew();
|
|
||||||
|
|
||||||
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
|
|
||||||
|
|
||||||
Collection<Runnable> getOnCommit();
|
|
||||||
|
|
||||||
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
|
|
||||||
}
|
|
||||||
@@ -22,12 +22,10 @@ import java.util.function.Consumer;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class JObjectManager {
|
public class TransactionService {
|
||||||
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
private static final List<PreCommitTxHook> _preCommitTxHooks;
|
||||||
@Inject
|
@Inject
|
||||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||||
@Inject
|
|
||||||
TransactionFactory transactionFactory;
|
|
||||||
|
|
||||||
private boolean _ready = false;
|
private boolean _ready = false;
|
||||||
private final DataLocker _objLocker = new DataLocker();
|
private final DataLocker _objLocker = new DataLocker();
|
||||||
@@ -36,7 +34,7 @@ public class JObjectManager {
|
|||||||
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||||
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -48,14 +46,14 @@ public class JObjectManager {
|
|||||||
_ready = true;
|
_ready = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionPrivate createTransaction() {
|
public TransactionImpl createTransaction() {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var tx = transactionFactory.createTransaction();
|
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
|
||||||
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
|
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
|
||||||
return tx;
|
return tx;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
|
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||||
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
|
||||||
@@ -259,7 +257,7 @@ public class JObjectManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void rollback(TransactionPrivate tx) {
|
public void rollback(TransactionImpl tx) {
|
||||||
verifyReady();
|
verifyReady();
|
||||||
tx.close();
|
tx.close();
|
||||||
}
|
}
|
||||||
@@ -22,7 +22,6 @@
|
|||||||
<properties>
|
<properties>
|
||||||
<compiler-plugin.version>3.12.1</compiler-plugin.version>
|
<compiler-plugin.version>3.12.1</compiler-plugin.version>
|
||||||
<!--FIXME-->
|
<!--FIXME-->
|
||||||
<maven.compiler.release></maven.compiler.release>
|
|
||||||
<maven.compiler.source>21</maven.compiler.source>
|
<maven.compiler.source>21</maven.compiler.source>
|
||||||
<maven.compiler.target>21</maven.compiler.target>
|
<maven.compiler.target>21</maven.compiler.target>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
@@ -87,6 +86,19 @@
|
|||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-javadoc-plugin</artifactId>
|
||||||
|
<version>3.11.2</version>
|
||||||
|
<configuration>
|
||||||
|
<additionalOptions>
|
||||||
|
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
|
||||||
|
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
|
||||||
|
--add-opens=java.base/java.nio=ALL-UNNAMED
|
||||||
|
--enable-preview
|
||||||
|
</additionalOptions>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>${quarkus.platform.group-id}</groupId>
|
<groupId>${quarkus.platform.group-id}</groupId>
|
||||||
<artifactId>quarkus-maven-plugin</artifactId>
|
<artifactId>quarkus-maven-plugin</artifactId>
|
||||||
|
|||||||
@@ -1,7 +0,0 @@
|
|||||||
package com.usatiuk.utils;
|
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface VoidFn {
|
|
||||||
void apply();
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,7 +0,0 @@
|
|||||||
package com.usatiuk.utils;
|
|
||||||
|
|
||||||
@FunctionalInterface
|
|
||||||
public interface VoidFnThrows {
|
|
||||||
void apply() throws Throwable;
|
|
||||||
}
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user