12 Commits

Author SHA1 Message Date
7902d9e486 proper pages 2025-05-12 16:10:43 +02:00
7c056b9674 fix 2025-05-12 16:03:42 +02:00
8cc040b234 fix 2025-05-12 16:01:26 +02:00
f8375c9cd8 javadocs github pages 2025-05-12 15:56:07 +02:00
0c3524851e Some javadocs + CI 2025-05-12 12:49:10 +02:00
3eb7164c0f Dhfs-fuse: fix LazyFsIT tests 2025-05-10 16:49:58 +02:00
f544a67fb5 Objects: cleanup AutoCloseableNoThrow 2025-05-10 13:49:42 +02:00
964b3da951 Objects: remove getUsableSpace 2025-05-10 11:20:34 +02:00
cb33472dc5 Utils: remove VoidFn 2025-05-10 11:07:40 +02:00
de211bb2d2 Objects: remove prepareTx 2025-05-07 16:12:47 +02:00
56ab3bad4c Objects: remove TransactionPrivate 2025-05-07 15:00:15 +02:00
9403556220 Objects: remove TransactionFactory 2025-05-07 14:39:55 +02:00
36 changed files with 559 additions and 504 deletions

View File

@@ -43,11 +43,11 @@ jobs:
distribution: "zulu"
cache: maven
- name: Build LazyFS
run: cd thirdparty/lazyfs/ && ./build.sh
# - name: Build LazyFS
# run: cd thirdparty/lazyfs/ && ./build.sh
- name: Test with Maven
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots package verify
run: cd dhfs-parent && mvn -T $(nproc) --batch-mode --update-snapshots javadoc:aggregate
# - name: Build with Maven
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
@@ -57,6 +57,11 @@ jobs:
name: DHFS Server Package
path: dhfs-parent/dhfs-fuse/target/quarkus-app
- uses: actions/upload-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-parent/target/reports/apidocs/
- uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
@@ -231,3 +236,37 @@ jobs:
with:
name: Run wrapper
path: ~/run-wrapper.tar.gz
publish-javadoc:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
permissions:
contents: read
pages: write
id-token: write
needs: [build-webui, build-dhfs]
steps:
- name: Checkout repository
uses: actions/checkout@v4
- uses: actions/download-artifact@v4
with:
name: DHFS Javadocs
path: dhfs-javadocs-downloaded
- name: Setup Pages
uses: actions/configure-pages@v5
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
# Upload entire repository
path: 'dhfs-javadocs-downloaded'
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@@ -41,3 +41,5 @@ nb-configuration.xml
# Plugin directory
/.quarkus/cli/plugins/
.jqwik-database

View File

@@ -5,6 +5,11 @@ import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
/**
* ChunkData is a data structure that represents an immutable binary blob
* @param key unique key
* @param data binary data
*/
public record ChunkData(JObjectKey key, ByteString data) implements JDataRemote, JDataRemoteDto {
@Override
public int estimateSize() {

View File

@@ -1,26 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.ChunkDataP;
import com.usatiuk.dhfs.persistence.JObjectKeyP;
import com.usatiuk.objects.JObjectKey;
import jakarta.inject.Singleton;
@Singleton
public class ChunkDataProtoSerializer implements ProtoSerializer<ChunkDataP, ChunkData> {
@Override
public ChunkData deserialize(ChunkDataP message) {
return new ChunkData(
JObjectKey.of(message.getKey().getName()),
message.getData()
);
}
@Override
public ChunkDataP serialize(ChunkData object) {
return ChunkDataP.newBuilder()
.setKey(JObjectKeyP.newBuilder().setName(object.key().value()).build())
.setData(object.data())
.build();
}
}

View File

@@ -9,6 +9,14 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.Set;
/**
* File is a data structure that represents a file in the file system
* @param key unique key
* @param mode file mode
* @param cTime creation time
* @param mTime modification time
* @param symlink true if the file is a symlink, false otherwise
*/
public record File(JObjectKey key, long mode, long cTime, long mTime,
boolean symlink
) implements JDataRemote, JMapHolder<JMapLongKey> {

View File

@@ -7,6 +7,11 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
/**
* FileDto is a data transfer object that contains a file and its chunks.
* @param file the file
* @param chunks the list of chunks, each represented as a pair of a long and a JObjectKey
*/
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
@Override
public Class<? extends JDataRemote> objClass() {

View File

@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.syncmap.DtoMapper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
/**
* Maps a {@link File} object to a {@link FileDto} object and vice versa.
*/
@ApplicationScoped
public class FileDtoMapper implements DtoMapper<File, FileDto> {
@Inject

View File

@@ -10,11 +10,20 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.List;
/**
* Helper class for working with files.
*/
@ApplicationScoped
public class FileHelper {
@Inject
JMapHelper jMapHelper;
/**
* Get the chunks of a file.
* Transaction is expected to be already started.
* @param file the file to get chunks from
* @return a list of pairs of chunk offset and chunk key
*/
public List<Pair<Long, JObjectKey>> getChunks(File file) {
ArrayList<Pair<Long, JObjectKey>> chunks = new ArrayList<>();
try (var it = jMapHelper.getIterator(file)) {
@@ -26,6 +35,13 @@ public class FileHelper {
return List.copyOf(chunks);
}
/**
* Replace the chunks of a file.
* All previous chunks will be deleted.
* Transaction is expected to be already started.
* @param file the file to replace chunks in
* @param chunks the list of pairs of chunk offset and chunk key
*/
public void replaceChunks(File file, List<Pair<Long, JObjectKey>> chunks) {
jMapHelper.deleteAll(file);

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfsfs.objects;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.FileDtoP;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class FileProtoSerializer implements ProtoSerializer<FileDtoP, FileDto> {
@Override
public FileDto deserialize(FileDtoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public FileDtoP serialize(FileDto object) {
return FileDtoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -23,6 +23,9 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
/**
* Handles synchronization of file objects.
*/
@ApplicationScoped
public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
@Inject
@@ -45,6 +48,14 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
}
/**
* Resolve conflict between two file versions, update the file in storage and create a conflict file.
*
* @param from the peer that sent the update
* @param key the key of the file
* @param receivedChangelog the changelog of the received file
* @param receivedData the received file data
*/
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
@Nullable FileDto receivedData) {
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);

View File

@@ -6,6 +6,10 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
/**
* JKleppmannTreeNodeMetaDirectory is a record that represents a directory in the JKleppmann tree.
* @param name the name of the directory
*/
public record JKleppmannTreeNodeMetaDirectory(String name) implements JKleppmannTreeNodeMeta {
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaDirectory(name);

View File

@@ -6,6 +6,11 @@ import com.usatiuk.objects.JObjectKey;
import java.util.Collection;
import java.util.List;
/**
* JKleppmannTreeNodeMetaFile is a record that represents a file in the JKleppmann tree.
* @param name the name of the file
* @param fileIno a reference to the `File` object
*/
public record JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) implements JKleppmannTreeNodeMeta {
@Override
public JKleppmannTreeNodeMeta withName(String name) {

View File

@@ -39,6 +39,9 @@ import java.nio.file.Path;
import java.util.*;
import java.util.stream.StreamSupport;
/**
* Actual filesystem implementation.
*/
@ApplicationScoped
public class DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
@@ -71,6 +74,12 @@ public class DhfsFileService {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
}
/**
* Create a new chunk with the given data and a new unique ID.
*
* @param bytes the data to store in the chunk
* @return the created chunk
*/
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putDataNew(newChunk);
@@ -82,14 +91,7 @@ public class DhfsFileService {
getTree();
}
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
private JKleppmannTreeNode getDirEntry(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
@@ -103,6 +105,11 @@ public class DhfsFileService {
return ret;
}
/**
* Get the attributes of a file or directory.
* @param uuid the UUID of the file or directory
* @return the attributes of the file or directory
*/
public Optional<GetattrRes> getattr(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var ref = curTx.get(JData.class, uuid).orElse(null);
@@ -124,10 +131,15 @@ public class DhfsFileService {
});
}
/**
* Try to resolve a path to a file or directory.
* @param name the path to resolve
* @return the key of the file or directory, or an empty optional if it does not exist
*/
public Optional<JObjectKey> open(String name) {
return jObjectTxManager.executeTx(() -> {
try {
var ret = getDirEntryR(name);
var ret = getDirEntry(name);
return switch (ret.meta()) {
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.fileIno());
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
@@ -147,10 +159,16 @@ public class DhfsFileService {
throw new StatusRuntimeExceptionNoStacktrace(Status.INVALID_ARGUMENT.withDescription("Not a directory: " + entry.key()));
}
/**
* Create a new file with the given name and mode.
* @param name the name of the file
* @param mode the mode of the file
* @return the key of the created file
*/
public Optional<JObjectKey> create(String name, long mode) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
@@ -171,9 +189,14 @@ public class DhfsFileService {
});
}
//FIXME: Slow..
/**
* Get the parent directory of a file or directory.
* @param ino the key of the file or directory
* @return the parent directory
*/
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
return jObjectTxManager.executeTx(() -> {
// FIXME: Slow
return getTree().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.fileIno().equals(ino);
@@ -182,20 +205,31 @@ public class DhfsFileService {
});
}
/**
* Create a new directory with the given name and mode.
* @param name the name of the directory
* @param mode the mode of the directory
*/
public void mkdir(String name, long mode) {
jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
String dname = path.getFileName().toString();
Log.debug("Creating directory " + name);
// TODO: No modes for directories yet
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
});
}
/**
* Unlink a file or directory.
* @param name the name of the file or directory
* @throws DirectoryNotEmptyException if the directory is not empty and recursive delete is not allowed
*/
public void unlink(String name) {
jObjectTxManager.executeTx(() -> {
var node = getDirEntryOpt(name).orElse(null);
@@ -209,13 +243,19 @@ public class DhfsFileService {
});
}
/**
* Rename a file or directory.
* @param from the old name
* @param to the new name
* @return true if the rename was successful, false otherwise
*/
public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntryW(from);
var node = getDirEntry(from);
JKleppmannTreeNodeMeta meta = node.meta();
var toPath = Path.of(to);
var toDentry = getDirEntryW(toPath.getParent().toString());
var toDentry = getDirEntry(toPath.getParent().toString());
ensureDir(toDentry);
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
@@ -223,6 +263,12 @@ public class DhfsFileService {
});
}
/**
* Change the mode of a file or directory.
* @param uuid the ID of the file or directory
* @param mode the new mode
* @return true if the mode was changed successfully, false otherwise
*/
public Boolean chmod(JObjectKey uuid, long mode) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, uuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -243,9 +289,14 @@ public class DhfsFileService {
});
}
/**
* Read the contents of a directory.
* @param name the path of the directory
* @return an iterable of the names of the files in the directory
*/
public Iterable<String> readDir(String name) {
return jObjectTxManager.executeTx(() -> {
var found = getDirEntryW(name);
var found = getDirEntry(name);
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
@@ -254,6 +305,13 @@ public class DhfsFileService {
});
}
/**
* Read the contents of a file.
* @param fileUuid the ID of the file
* @param offset the offset to start reading from
* @param length the number of bytes to read
* @return the contents of the file as a ByteString
*/
public ByteString read(JObjectKey fileUuid, long offset, int length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
@@ -315,6 +373,11 @@ public class DhfsFileService {
});
}
/**
* Get the size of a file.
* @param uuid the ID of the file
* @return the size of the file
*/
private ByteString readChunk(JObjectKey uuid) {
var chunkRead = remoteTx.getData(ChunkData.class, uuid).orElse(null);
@@ -326,6 +389,11 @@ public class DhfsFileService {
return chunkRead.data();
}
/**
* Get the size of a chunk.
* @param uuid the ID of the chunk
* @return the size of the chunk
*/
private int getChunkSize(JObjectKey uuid) {
return readChunk(uuid).size();
}
@@ -334,6 +402,13 @@ public class DhfsFileService {
return num & -(1L << n);
}
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, ByteString data) {
return jObjectTxManager.executeTx(() -> {
if (offset < 0)
@@ -436,6 +511,12 @@ public class DhfsFileService {
});
}
/**
* Truncate a file to the given length.
* @param fileUuid the ID of the file
* @param length the new length of the file
* @return true if the truncate was successful, false otherwise
*/
public Boolean truncate(JObjectKey fileUuid, long length) {
return jObjectTxManager.executeTx(() -> {
if (length < 0)
@@ -525,6 +606,12 @@ public class DhfsFileService {
});
}
/**
* Fill the given range with zeroes.
* @param fillStart the start of the range
* @param length the end of the range
* @param newChunks the map to store the new chunks in
*/
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart);
@@ -560,12 +647,22 @@ public class DhfsFileService {
}
}
/**
* Read the contents of a symlink.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a string
*/
public String readlink(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
return readlinkBS(uuid).toStringUtf8();
});
}
/**
* Read the contents of a symlink as a ByteString.
* @param uuid the ID of the symlink
* @return the contents of the symlink as a ByteString
*/
public ByteString readlinkBS(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var fileOpt = remoteTx.getData(File.class, uuid).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to readlink: " + uuid)));
@@ -573,10 +670,16 @@ public class DhfsFileService {
});
}
/**
* Create a symlink.
* @param oldpath the target of the symlink
* @param newpath the path of the symlink
* @return the key of the created symlink
*/
public JObjectKey symlink(String oldpath, String newpath) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(newpath);
var parent = getDirEntryW(path.getParent().toString());
var parent = getDirEntry(path.getParent().toString());
ensureDir(parent);
@@ -595,6 +698,13 @@ public class DhfsFileService {
});
}
/**
* Set the access and modification times of a file.
* @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -616,6 +726,11 @@ public class DhfsFileService {
});
}
/**
* Get the size of a file.
* @param fileUuid the ID of the file
* @return the size of the file
*/
public long size(JObjectKey fileUuid) {
return jObjectTxManager.executeTx(() -> {
long realSize = 0;
@@ -635,6 +750,13 @@ public class DhfsFileService {
});
}
/**
* Write data to a file.
* @param fileUuid the ID of the file
* @param offset the offset to write to
* @param data the data to write
* @return the number of bytes written
*/
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
}

View File

@@ -1,5 +1,10 @@
package com.usatiuk.dhfsfs.service;
/**
* DirectoryNotEmptyException is thrown when a directory is not empty.
* This exception is used to indicate that a directory cannot be deleted
* because it contains files or subdirectories.
*/
public class DirectoryNotEmptyException extends RuntimeException {
@Override
public synchronized Throwable fillInStackTrace() {

View File

@@ -1,4 +1,11 @@
package com.usatiuk.dhfsfs.service;
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
/**
* GetattrRes is a record that represents the result of a getattr operation.
* @param mtime File modification time
* @param ctime File creation time
* @param mode File mode
* @param type File type
*/
public record GetattrRes(long mtime, long ctime, long mode, GetattrType type) {
}

View File

@@ -154,7 +154,7 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
@@ -195,7 +195,7 @@ public class LazyFsIT {
lazyFs1.crash();
}
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -237,7 +237,7 @@ public class LazyFsIT {
Thread.sleep(3000);
Log.info("Killing");
lazyFs1.crash();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container1.getContainerId()).exec();
container1.stop();
@@ -279,7 +279,7 @@ public class LazyFsIT {
lazyFs1.crash();
}
try {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -322,7 +322,7 @@ public class LazyFsIT {
Log.info("Killing");
lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
@@ -366,7 +366,7 @@ public class LazyFsIT {
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());
@@ -409,7 +409,7 @@ public class LazyFsIT {
Log.info("Killing");
lazyFs2.crash();
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting1");
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
var client = DockerClientFactory.instance().client();
client.killContainerCmd(container2.getContainerId()).exec();
container2.stop();
@@ -453,7 +453,7 @@ public class LazyFsIT {
}
container1.execInContainer("/bin/sh", "-c", "touch /tmp/stopprinting2");
try {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Caused by: org.lmdbjava"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("org.lmdbjava.LmdbNativeException"), 60, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// Sometimes crash doesn't work
Log.info("Failed to crash: " + testInfo.getDisplayName());

View File

@@ -1,32 +0,0 @@
package com.usatiuk.kleppmanntree;
import java.io.Serializable;
public class AtomicClock implements Clock<Long>, Serializable {
private long _max = 0;
public AtomicClock(long counter) {
_max = counter;
}
@Override
public Long getTimestamp() {
return ++_max;
}
public void setTimestamp(Long timestamp) {
_max = timestamp;
}
@Override
public Long peekTimestamp() {
return _max;
}
@Override
public Long updateTimestamp(Long receivedTimestamp) {
var old = _max;
_max = Math.max(_max, receivedTimestamp) + 1;
return old;
}
}

View File

@@ -1,11 +1,10 @@
package com.usatiuk.objects.iterators;
import com.usatiuk.utils.AutoCloseableNoThrow;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Iterator;
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends Iterator<Pair<K, V>>, AutoCloseable {
K peekNextKey();
void skip();
@@ -21,4 +20,7 @@ public interface CloseableKvIterator<K extends Comparable<? super K>, V> extends
default CloseableKvIterator<K, V> reversed() {
return new ReversedKvIterator<K, V>(this);
}
@Override
void close();
}

View File

@@ -3,19 +3,20 @@ package com.usatiuk.objects.snapshot;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.Tombstone;
import com.usatiuk.utils.AutoCloseableNoThrow;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseableNoThrow {
public interface Snapshot<K extends Comparable<K>, V> extends AutoCloseable {
List<CloseableKvIterator<K, MaybeTombstone<V>>> getIterator(IteratorStart start, K key);
@Nonnull
Optional<V> readObject(K name);
long id();
@Override
void close();
}

View File

@@ -31,7 +31,6 @@ public class CachingObjectPersistentStore {
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private AtomicLong _cached = new AtomicLong();
private AtomicLong _cacheTries = new AtomicLong();
@@ -47,7 +46,6 @@ public class CachingObjectPersistentStore {
_cache.set(_cache.get().withVersion(s.id()));
}
_commitExecutor = Executors.newSingleThreadExecutor();
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
@@ -68,7 +66,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committing: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());
var cache = _cache.get();
var commitFuture = _commitExecutor.submit(() -> delegate.prepareTx(objs, txId).run());
for (var write : objs.written()) {
cache = cache.withPut(write.getLeft(), Optional.of(write.getRight()));
}
@@ -76,11 +73,7 @@ public class CachingObjectPersistentStore {
cache = cache.withPut(del, Optional.empty());
}
cache = cache.withVersion(txId);
try {
commitFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
delegate.commitTx(objs, txId);
_cache.set(cache);
Log.tracev("Committed: {0} writes, {1} deletes", objs.written().size(), objs.deleted().size());

View File

@@ -145,10 +145,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
@Override
public Runnable prepareTx(TxManifestRaw names, long txId) {
public void commitTx(TxManifestRaw names, long txId) {
verifyReady();
var txn = _env.txnWrite();
try {
try (var txn = _env.txnWrite()) {
for (var written : names.written()) {
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb);
@@ -163,17 +162,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
bbData.putLong(txId);
bbData.flip();
_db.put(txn, DB_VER_OBJ_NAME.asReadOnlyBuffer(), bbData);
} catch (Throwable t) {
txn.close();
throw t;
txn.commit();
}
return () -> {
try {
txn.commit();
} finally {
txn.close();
}
};
}
@Override
@@ -188,12 +178,6 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
return _root.toFile().getFreeSpace();
}
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, MaybeTombstone<ByteBuffer>> {
private static final Cleaner CLEANER = Cleaner.create();
private final Txn<ByteBuffer> _txn; // Managed by the snapshot

View File

@@ -53,19 +53,18 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
}
}
public Runnable prepareTx(TxManifestRaw names, long txId) {
return () -> {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
@Override
public void commitTx(TxManifestRaw names, long txId) {
synchronized (this) {
for (var written : names.written()) {
_objects = _objects.plus(written.getKey(), written.getValue());
}
};
for (JObjectKey key : names.deleted()) {
_objects = _objects.minus(key);
}
assert txId > _lastCommitId;
_lastCommitId = txId;
}
}
@Override
@@ -77,9 +76,4 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
public long getFreeSpace() {
return 0;
}
@Override
public long getUsableSpace() {
return 0;
}
}

View File

@@ -13,11 +13,9 @@ import java.util.Optional;
public interface ObjectPersistentStore {
Snapshot<JObjectKey, ByteBuffer> getSnapshot();
Runnable prepareTx(TxManifestRaw names, long txId);
void commitTx(TxManifestRaw names, long txId);
long getTotalSpace();
long getFreeSpace();
long getUsableSpace();
}

View File

@@ -62,7 +62,7 @@ public class SerializingObjectPersistentStore {
, objs.deleted());
}
Runnable prepareTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
return delegateStore.prepareTx(prepareManifest(objects), txId);
void commitTx(TxManifestObj<? extends JDataVersionedWrapper> objects, long txId) {
delegateStore.commitTx(prepareManifest(objects), txId);
}
}

View File

@@ -188,7 +188,7 @@ public class WritebackObjectPersistentStore {
Log.info("Writeback thread exiting");
}
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
private long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
_pendingBundleLock.lock();
try {

View File

@@ -1,5 +0,0 @@
package com.usatiuk.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction();
}

View File

@@ -1,262 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
@Singleton
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Override
public TransactionPrivate createTransaction() {
return new TransactionImpl();
}
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = writebackObjectPersistentStore.getSnapshot();
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
@Override
public Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
@Override
public Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
@Override
public Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
@Override
public <T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
@Override
public Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
@Override
public Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.objects.transaction;
import java.util.Collection;
public interface TransactionHandlePrivate extends TransactionHandle {
Collection<Runnable> getOnFlush();
}

View File

@@ -0,0 +1,240 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
class TransactionImpl implements Transaction, AutoCloseable {
private final Map<JObjectKey, Optional<JDataVersionedWrapper>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final List<Runnable> _onCommit = new LinkedList<>();
private final List<Runnable> _onFlush = new LinkedList<>();
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return true;
}
@Override
public JData obj() {
return wrapped.data();
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
@Override
public boolean fromSource() {
return false;
}
}
TransactionImpl(Snapshot<JObjectKey, JDataVersionedWrapper> snapshot) {
_snapshot = snapshot;
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
}
@Override
public void onFlush(Runnable runnable) {
_onFlush.add(runnable);
}
Collection<Runnable> getOnCommit() {
return Collections.unmodifiableCollection(_onCommit);
}
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot() {
return _snapshot;
}
Collection<Runnable> getOnFlush() {
return Collections.unmodifiableCollection(_onFlush);
}
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key) {
if (_knownNew.contains(key)) {
return Optional.empty();
}
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
}
@Override
public void put(JData obj) {
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
var key = obj.key();
_knownNew.add(key);
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();
}
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads() {
return _readSet;
}
Set<JObjectKey> knownNew() {
return _knownNew;
}
@Override
public void close() {
if (_closed) return;
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}

View File

@@ -1,6 +1,5 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.utils.VoidFn;
import io.quarkus.logging.Log;
import java.util.function.Supplier;
@@ -41,9 +40,9 @@ public interface TransactionManager {
}
}
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
default TransactionHandle runTries(Runnable fn, int tries, boolean nest) {
if (!nest && current() != null) {
fn.apply();
fn.run();
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -56,7 +55,7 @@ public interface TransactionManager {
begin();
boolean commit = false;
try {
fn.apply();
fn.run();
commit = true;
var ret = commit();
return ret;
@@ -80,11 +79,11 @@ public interface TransactionManager {
return runTries(supplier, tries, false);
}
default TransactionHandle runTries(VoidFn fn, int tries) {
default TransactionHandle runTries(Runnable fn, int tries) {
return runTries(fn, tries, false);
}
default TransactionHandle run(VoidFn fn, boolean nest) {
default TransactionHandle run(Runnable fn, boolean nest) {
return runTries(fn, 10, nest);
}
@@ -92,7 +91,7 @@ public interface TransactionManager {
return runTries(supplier, 10, nest);
}
default TransactionHandle run(VoidFn fn) {
default TransactionHandle run(Runnable fn) {
return run(fn, false);
}
@@ -100,7 +99,7 @@ public interface TransactionManager {
return run(supplier, false);
}
default void executeTx(VoidFn fn) {
default void executeTx(Runnable fn) {
run(fn, false);
}

View File

@@ -11,14 +11,14 @@ import java.util.Stack;
@Singleton
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
private static final ThreadLocal<Stack<TransactionImpl>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject
JObjectManager jObjectManager;
TransactionService transactionService;
@Override
public void begin() {
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
var tx = transactionService.createTransaction();
_currentTransaction.get().push(tx);
}
@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
ret = jObjectManager.commit(peeked);
ret = transactionService.commit(peeked);
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
@@ -64,7 +64,7 @@ public class TransactionManagerImpl implements TransactionManager {
var peeked = stack.peek();
try {
jObjectManager.rollback(peeked);
transactionService.rollback(peeked);
} catch (Throwable e) {
Log.error("Transaction rollback failed", e);
throw e;

View File

@@ -1,27 +0,0 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.utils.AutoCloseableNoThrow;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction, TransactionHandlePrivate, AutoCloseableNoThrow {
Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
Map<JObjectKey, Optional<JDataVersionedWrapper>> reads();
Set<JObjectKey> knownNew();
<T extends JData> Optional<T> getFromSource(Class<T> type, JObjectKey key);
Collection<Runnable> getOnCommit();
Snapshot<JObjectKey, JDataVersionedWrapper> snapshot();
}

View File

@@ -22,12 +22,10 @@ import java.util.function.Consumer;
import java.util.function.Function;
@ApplicationScoped
public class JObjectManager {
public class TransactionService {
private static final List<PreCommitTxHook> _preCommitTxHooks;
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
private boolean _ready = false;
private final DataLocker _objLocker = new DataLocker();
@@ -36,7 +34,7 @@ public class JObjectManager {
_preCommitTxHooks = List.copyOf(CDI.current().select(PreCommitTxHook.class).stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
}
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
TransactionService(Instance<PreCommitTxHook> preCommitTxHooks) {
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
@@ -48,14 +46,14 @@ public class JObjectManager {
_ready = true;
}
public TransactionPrivate createTransaction() {
public TransactionImpl createTransaction() {
verifyReady();
var tx = transactionFactory.createTransaction();
var tx = new TransactionImpl(writebackObjectPersistentStore.getSnapshot());
Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
return tx;
}
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionPrivate tx) {
public Pair<Collection<Runnable>, TransactionHandle> commit(TransactionImpl tx) {
verifyReady();
var writes = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Snapshot<JObjectKey, JDataVersionedWrapper> commitSnapshot = null;
@@ -259,7 +257,7 @@ public class JObjectManager {
}
}
public void rollback(TransactionPrivate tx) {
public void rollback(TransactionImpl tx) {
verifyReady();
tx.close();
}

View File

@@ -22,7 +22,6 @@
<properties>
<compiler-plugin.version>3.12.1</compiler-plugin.version>
<!--FIXME-->
<maven.compiler.release></maven.compiler.release>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -87,6 +86,19 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.11.2</version>
<configuration>
<additionalOptions>
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--enable-preview
</additionalOptions>
</configuration>
</plugin>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFn {
void apply();
}

View File

@@ -1,7 +0,0 @@
package com.usatiuk.utils;
@FunctionalInterface
public interface VoidFnThrows {
void apply() throws Throwable;
}