Mirror of https://github.com/usatiuk/dhfs.git (synced 2025-10-28 20:47:49 +01:00)

Compare commits: 8 commits, 83ceefa041...1025e6b246
| SHA1 |
|---|
| 1025e6b246 |
| 7274131052 |
| 930eb38b9b |
| afb6f0c270 |
| e7f5be689f |
| 84b1d57125 |
| 2206c13466 |
| d1df6b705f |
README.md (31 lines changed)
@@ -16,7 +16,36 @@ the DHFS server in the background, and update itself (hopefully!)

## How to use it?

### General prerequisites

Unpack the run-wrapper and run the `run` script. The filesystem should be mounted to the `fuse` folder in the run-wrapper root directory.

Java should be available as `java` in the path, and Java 21+ is required.

The FUSE 2 userspace library should also be available:

- On Ubuntu, the `libfuse2` package can be installed.
- On Windows, [WinFsp](https://winfsp.dev/) should be installed.
- On macOS, [macFUSE](https://macfuse.github.io/).

In the run-wrapper, 3 scripts are available:

- `run` starts the filesystem
- `stop` stops it
- `update` updates the filesystem to the newest available CI build

On Windows, the PowerShell alternatives should be used. For them to work, it might be required to allow execution of unsigned scripts using `set-executionpolicy remotesigned`.

Additional options for the filesystem can be specified in the `extra-opts` file in the same directory as the run scripts. One line in the `extra-opts` file corresponds to one option passed to the JVM when starting the filesystem, as in the example below.

Some extra possible configuration options are:

- `-Ddhfs.fuse.root=` specifies the root where the filesystem should be mounted. By default it is the `fuse` path under the run-wrapper root. On Windows, it should be a disk letter, by default `Z:\`.
- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for garbage collection and resynchronized after reconnecting. The default is 43200 (12 hours); if set to `-1`, this feature is disabled.
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer: `true` or `false`, the default is `false`.

On Windows, the entire space for the filesystem should also be preallocated; the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes). On Windows the default is 100GB.
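For example, an `extra-opts` file enabling full replication and a custom mount point might look like this (the values here are illustrative, not defaults):

```
-Ddhfs.fuse.root=/mnt/dhfs
-Ddhfs.objects.autosync.download-all=true
-Ddhfs.objects.last-seen.timeout=86400
```

Each line is passed verbatim to the JVM as a single argument.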
Then, a web interface will be available at `localhost:8080`, which can be used to connect with other peers.
@@ -11,9 +11,10 @@ import java.util.Set;

 /**
  * File is a data structure that represents a file in the file system
  *
  * @param key unique key
  * @param mode file mode
- * @param cTime creation time
+ * @param cTime inode modification time
  * @param mTime modification time
+ * @param symlink true if the file is a symlink, false otherwise
  */
@@ -40,6 +41,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
         return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
     }

+    public File withCurrentCTime() {
+        return new File(key, mode, System.currentTimeMillis(), mTime, symlink);
+    }
+
     @Override
     public Collection<JObjectKey> collectRefsTo() {
         return Set.of();
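A note on the pattern above: `File` is an immutable record, so each `with*` method returns a fresh copy with one field replaced. A minimal self-contained sketch of this copy-on-write style (simplified: `String` stands in for `JObjectKey`, and the `JData` machinery is omitted):

    // Sketch only: mirrors File's with* copy-on-write updates.
    record FileSketch(String key, long mode, long cTime, long mTime, boolean symlink) {
        FileSketch withCurrentMTime() {
            return new FileSketch(key, mode, cTime, System.currentTimeMillis(), symlink);
        }

        FileSketch withCurrentCTime() {
            // cTime is the inode modification time, per the javadoc change above.
            return new FileSketch(key, mode, System.currentTimeMillis(), mTime, symlink);
        }
    }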
@@ -19,6 +19,7 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
 import com.usatiuk.objects.JData;
 import com.usatiuk.objects.JObjectKey;
 import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.stores.ObjectPersistentStore;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
 import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -69,6 +70,8 @@ public class DhfsFileService {
     JKleppmannTreeManager jKleppmannTreeManager;
     @Inject
     JMapHelper jMapHelper;
+    @Inject
+    ObjectPersistentStore objectPersistentStore;

     private JKleppmannTreeManager.JKleppmannTree getTree() {
         return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
@@ -286,7 +289,7 @@ public class DhfsFileService {
         } else if (dent instanceof RemoteObjectMeta) {
             var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
             if (remote instanceof File f) {
-                remoteTx.putData(f.withMode(mode).withCurrentMTime());
+                remoteTx.putData(f.withMode(mode).withCurrentCTime());
                 return true;
             } else {
                 throw new IllegalArgumentException(uuid + " is not a file");
@@ -720,11 +723,10 @@ public class DhfsFileService {
     * Set the access and modification times of a file.
     *
     * @param fileUuid the ID of the file
-    * @param atimeMs the access time in milliseconds
     * @param mtimeMs the modification time in milliseconds
     * @return true if the times were set successfully, false otherwise
     */
-    public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
+    public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
        return jObjectTxManager.executeTx(() -> {
            var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -734,7 +736,7 @@ public class DhfsFileService {
        } else if (dent instanceof RemoteObjectMeta) {
            var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
            if (remote instanceof File f) {
-                remoteTx.putData(f.withCTime(atimeMs).withMTime(mtimeMs));
+                remoteTx.putData(f.withCTime(System.currentTimeMillis()).withMTime(mtimeMs));
                return true;
            } else {
                throw new IllegalArgumentException(fileUuid + " is not a file");
@@ -781,4 +783,22 @@ public class DhfsFileService {
    public Long write(JObjectKey fileUuid, long offset, byte[] data) {
        return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
    }
+
+    /**
+     * Get the free space on the filesystem.
+     *
+     * @return the free space in bytes
+     */
+    public long getFreeSpace() {
+        return objectPersistentStore.getFreeSpace();
+    }
+
+    /**
+     * Get the total space on the filesystem.
+     *
+     * @return the total space in bytes
+     */
+    public long getTotalSpace() {
+        return objectPersistentStore.getTotalSpace();
+    }
 }
@@ -54,13 +54,12 @@ public class DhfsFuse extends FuseStubFS {
    boolean enabled;
    @ConfigProperty(name = "dhfs.fuse.debug")
    Boolean debug;
    @ConfigProperty(name = "dhfs.files.target_chunk_size")
    int targetChunkSize;
    @Inject
    DhfsFileService fileService;

    /**
     * Allocate a handle for the given key.
     *
     * @param key the key to allocate a handle for
     * @return the allocated handle, not 0
     */
@@ -76,6 +75,7 @@ public class DhfsFuse extends FuseStubFS {

    /**
     * Get the key from the handle.
     *
     * @param handle the handle to get the key from
     * @return the key, or null if not found
     */
@@ -112,7 +112,6 @@ public class DhfsFuse extends FuseStubFS {
            opts.add("-o");
            opts.add("iosize=" + iosize);
        } else if (SystemUtils.IS_OS_LINUX) {
            // FIXME: There's something else missing: the writes still seem to be 32k max
            // opts.add("-o");
            // opts.add("large_read");
            opts.add("-o");
@@ -144,13 +143,12 @@ public class DhfsFuse extends FuseStubFS {
        try {
            stbuf.f_frsize.set(blksize);
            stbuf.f_bsize.set(blksize);
-            // FIXME:
-            stbuf.f_blocks.set(1024 * 1024 * 1024 / blksize); // total data blocks in file system
-            stbuf.f_bfree.set(1024 * 1024 * 1024 / blksize); // free blocks in fs
-            stbuf.f_bavail.set(1024 * 1024 * 1024 / blksize); // avail blocks in fs
-            stbuf.f_files.set(1000); //FIXME:
-            stbuf.f_ffree.set(Integer.MAX_VALUE - 2000); //FIXME:
-            stbuf.f_favail.set(Integer.MAX_VALUE - 2000); //FIXME:
+            stbuf.f_blocks.set(fileService.getTotalSpace() / blksize); // total data blocks in file system
+            stbuf.f_bfree.set(fileService.getFreeSpace() / blksize); // free blocks in fs
+            stbuf.f_bavail.set(fileService.getFreeSpace() / blksize); // avail blocks in fs
+            stbuf.f_files.set(1000); // TODO: Calculate real file counts?
+            stbuf.f_ffree.set(Integer.MAX_VALUE - 1000);
+            stbuf.f_favail.set(Integer.MAX_VALUE - 1000);
            stbuf.f_namemax.set(2048);
            return super.statfs(path, stbuf);
        } catch (Throwable e) {
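A quick sanity check of the new arithmetic: if `getTotalSpace()` reported the Windows default LMDB size of 100 GB (107,374,182,400 bytes) and `blksize` were 4096 (an assumed value, not stated in this hunk), `f_blocks` would come out to 26,214,400 blocks, and `f_bfree`/`f_bavail` would track the actual free space instead of the old hardcoded 1 GB figure.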
@@ -186,13 +184,13 @@ public class DhfsFuse extends FuseStubFS {
                }
            }

-            // FIXME: Race?
            stat.st_ctim.tv_sec.set(found.get().ctime() / 1000);
            stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1000);
            stat.st_mtim.tv_sec.set(found.get().mtime() / 1000);
            stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
-            stat.st_atim.tv_sec.set(found.get().mtime() / 1000);
-            stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
+            var atime = Math.max(found.get().ctime(), found.get().mtime());
+            stat.st_atim.tv_sec.set(atime / 1000);
+            stat.st_atim.tv_nsec.set((atime % 1000) * 1000000L);
            stat.st_blksize.set(blksize);
        } catch (Throwable e) {
            Log.error("When getattr " + path, e);
@@ -208,8 +206,7 @@ public class DhfsFuse extends FuseStubFS {
            if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
            var file = fileOpt.get();
            var res = fileService.setTimes(file,
-                    timespec[0].tv_sec.get() * 1000,
-                    timespec[1].tv_sec.get() * 1000);
+                    timespec[1].tv_sec.get() * 1000L + timespec[1].tv_nsec.longValue() / 1000000L);
            if (!res) return -ErrorCodes.EINVAL();
            else return 0;
        } catch (Throwable e) {
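The new single argument keeps sub-second precision: for example, `tv_sec = 1700000000` with `tv_nsec = 123456789` yields `1700000000 * 1000 + 123456789 / 1000000 = 1700000000123` milliseconds, whereas the old code truncated to whole seconds and passed a now-removed atime argument.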
@@ -40,11 +40,11 @@ public class DhfsFuseIT {
        container1 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);
        container2 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);

        Stream.of(container1, container2).parallel().forEach(GenericContainer::start);

@@ -55,6 +55,9 @@ public class DhfsFuseIT {
        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));

+        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+
        c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
        c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
@@ -48,39 +48,42 @@ public class DhfsFusex3IT {
        container1 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);
        container2 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);
        container3 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);

        Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);

+        waitingConsumer1 = new WaitingConsumer();
+        var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
+                .withPrefix(1 + "-" + testInfo.getDisplayName());
+        container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
+        waitingConsumer2 = new WaitingConsumer();
+        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
+                .withPrefix(2 + "-" + testInfo.getDisplayName());
+        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
+        waitingConsumer3 = new WaitingConsumer();
+        var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
+                .withPrefix(3 + "-" + testInfo.getDisplayName());
+        container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
+
+        waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);

        c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
        c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
        c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

-        Log.info(container1.getContainerId() + "=" + c1uuid);
-        Log.info(container2.getContainerId() + "=" + c2uuid);
-        Log.info(container3.getContainerId() + "=" + c3uuid);
-
-        waitingConsumer1 = new WaitingConsumer();
-        var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
-                .withPrefix(c1uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
-        container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
-        waitingConsumer2 = new WaitingConsumer();
-        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
-                .withPrefix(c2uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
-        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
-        waitingConsumer3 = new WaitingConsumer();
-        var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
-                .withPrefix(c3uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
-        container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
+        Log.info(container1.getContainerId() + "=" + c1uuid + " = 1");
+        Log.info(container2.getContainerId() + "=" + c2uuid + " = 2");
+        Log.info(container3.getContainerId() + "=" + c3uuid + " = 3");

        Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
        Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -52,12 +52,12 @@ public class KillIT {
        container1 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
+                .withNetwork(network)
                .withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
        container2 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
+                .withNetwork(network)
                .withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");

        Stream.of(container1, container2).parallel().forEach(GenericContainer::start);

@@ -69,6 +69,9 @@ public class KillIT {
        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));

+        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+
        c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
        c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
@@ -64,12 +64,12 @@ public class LazyFsIT {
        container1 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
+                .withNetwork(network)
                .withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
        container2 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
+                .withNetwork(network)
                .withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");

        Stream.of(container1, container2).parallel().forEach(GenericContainer::start);

@@ -81,6 +81,9 @@ public class LazyFsIT {
        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));

+        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+
        c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
        c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
@@ -38,11 +38,11 @@ public class ResyncIT {
        container1 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);
        container2 = new GenericContainer<>(DhfsImage.getInstance())
                .withPrivilegedMode(true)
                .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
-                .waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
+                .withNetwork(network);

        Stream.of(container1, container2).parallel().forEach(GenericContainer::start);

@@ -52,6 +52,9 @@ public class ResyncIT {
        waitingConsumer2 = new WaitingConsumer();
        var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
        container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));

+        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
    }

    @AfterEach
@@ -67,12 +67,15 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
            Log.info("Initializing with root " + _root);
            _root.toFile().mkdirs();
        }
+        Log.info("Opening LMDB with root " + _root);
        _env = create()
                .setMapSize(lmdbSize)
                .setMaxDbs(1)
                .open(_root.toFile(), EnvFlags.MDB_NOTLS);
        _db = _env.openDbi(DB_NAME, MDB_CREATE);

+        Log.info("Opened LMDB with root " + _root);
+
        try (Txn<ByteBuffer> txn = _env.txnWrite()) {
            var read = readTxId(txn);
            if (read.isPresent()) {
@@ -87,6 +90,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
        }

        _ready = true;
+        Log.info("LMDB storage ready");
    }

    private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
@@ -95,6 +99,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
    }

    void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
+        if (!_ready) {
+            return;
+        }
        _ready = false;
        _db.close();
        _env.close();
@@ -112,6 +119,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
     */
    @Override
    public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
+        verifyReady();
        var txn = _env.txnRead();
        try {
            long commitId = readTxId(txn).orElseThrow();
@@ -23,14 +23,16 @@ class TransactionImpl implements Transaction, AutoCloseable {
    private boolean _writeTrack = false;
    private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();

-    private interface ReadTrackingInternalCrap {
+    /**
+     * Identifies the source of the read: whether it's from the source or written from the transaction.
+     */
+    private interface ReadTrackingSourceWrapper {
        boolean fromSource();

        JData obj();
    }

-    // FIXME:
-    private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
+    private record ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped) implements ReadTrackingSourceWrapper {
        @Override
        public boolean fromSource() {
            return true;
@@ -42,7 +44,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
        }
    }

-    private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
+    private record ReadTrackingSourceWrapperTx(JData obj) implements ReadTrackingSourceWrapper {
        @Override
        public boolean fromSource() {
            return false;
@@ -107,21 +109,21 @@ class TransactionImpl implements Transaction, AutoCloseable {
    @Override
    public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
        Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
-        return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
+        return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingSourceWrapper>(start, key,
                ListUtils.prependAndMap(
                        new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
                                t -> switch (t) {
                                    case TxRecord.TxObjectRecordWrite<?> write ->
-                                            new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
+                                            new DataWrapper<ReadTrackingSourceWrapper>(new ReadTrackingSourceWrapperTx(write.data()));
                                    case TxRecord.TxObjectRecordDeleted deleted ->
-                                            new TombstoneImpl<ReadTrackingInternalCrap>();
+                                            new TombstoneImpl<ReadTrackingSourceWrapper>();
                                    case null, default -> null;
                                }),
                        _snapshot.getIterator(start, key),
-                        itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
+                        itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingSourceWrapper>>(itin,
                                d -> switch (d) {
                                    case Data<JDataVersionedWrapper> w ->
-                                            new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
+                                            new DataWrapper<>(new ReadTrackingSourceWrapperSource(w.value()));
                                    case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
                                    case null, default -> null;
                                }))));
@@ -178,9 +180,9 @@ class TransactionImpl implements Transaction, AutoCloseable {
    }

    private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
-        private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
+        private final CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> _backing;

-        public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
+        public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> backing) {
            _backing = backing;
        }
@@ -202,7 +204,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
    @Override
    public Pair<JObjectKey, JData> prev() {
        var got = _backing.prev();
-        if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
+        if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
            _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
        }
        return Pair.of(got.getKey(), got.getValue().obj());
@@ -231,7 +233,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
    @Override
    public Pair<JObjectKey, JData> next() {
        var got = _backing.next();
-        if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
+        if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
            _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
        }
        return Pair.of(got.getKey(), got.getValue().obj());
@@ -67,7 +67,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
    /**
     * Periodically returns deferred invalidations to the invalidation queue for all reachable hosts.
     */
-    @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    @Blocking
    void periodicReturn() {
        for (var reachable : reachablePeerManager.getAvailableHosts())
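The `skipExecutionIf = Scheduled.ApplicationNotRunning.class` attribute added to this and the following schedulers is Quarkus's built-in skip predicate for suppressing scheduled runs while the application is starting up or shutting down. A minimal standalone sketch of the pattern (the bean and method names here are invented for illustration):

    import io.quarkus.scheduler.Scheduled;
    import jakarta.enterprise.context.ApplicationScoped;

    @ApplicationScoped
    public class HeartbeatTicker { // hypothetical example bean
        @Scheduled(every = "15s",
                concurrentExecution = Scheduled.ConcurrentExecution.SKIP,
                skipExecutionIf = Scheduled.ApplicationNotRunning.class)
        void tick() {
            // Fires at most every 15s, never concurrently with itself,
            // and is skipped unless the application is fully running.
        }
    }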
@@ -15,7 +15,7 @@ public class PersistentStaticPeerDiscovery {
    @Inject
    PersistentPeerDataService persistentPeerDataService;

-    @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    public void discoverPeers() {
        var addrs = persistentPeerDataService.getPersistentPeerAddresses();
        for (var addr : addrs) {

@@ -25,7 +25,7 @@ public class StaticPeerDiscovery {
    ).toList();
    }

-    @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    public void discoverPeers() {
        for (var peer : _peers) {
            peerDiscoveryDirectory.notifyAddr(peer);
@@ -54,7 +54,7 @@ public class LocalPeerDiscoveryBroadcaster {
        _socket.close();
    }

-    @Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    public void broadcast() throws Exception {
        if (!enabled) {
            return;
@@ -1,25 +0,0 @@
-package com.usatiuk.dhfs.peersync;
-
-import com.usatiuk.dhfs.ProtoSerializer;
-import com.usatiuk.dhfs.persistence.PeerInfoP;
-import com.usatiuk.utils.SerializationHelper;
-import jakarta.inject.Singleton;
-
-import java.io.IOException;
-
-@Singleton
-public class PeerInfoProtoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
-    @Override
-    public PeerInfo deserialize(PeerInfoP message) {
-        try (var is = message.getSerializedData().newInput()) {
-            return SerializationHelper.deserialize(is);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public PeerInfoP serialize(PeerInfo object) {
-        return PeerInfoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
-    }
-}
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
    @Inject
    PersistentPeerDataService persistentPeerDataService;

-    @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    @Blocking
    void update() {
        var snapshot = reachablePeerManager.getHostStateSnapshot();

@@ -52,11 +52,13 @@ public class PeerLastSeenUpdater {
            var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
            if (curInfo == null) return;

+            if (lastSeenTimeout != -1) {
                var lastSeen = curInfo.lastSeenTimestamp();
                if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
                    var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
                    remoteTransaction.putData(kicked);
                }
+            }
        });
    }
}
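Note how this ties back to the README option above: `lastSeenTimeout` is in seconds (hence the `* 1000` conversion to milliseconds), so with the default of 43200 a peer unseen for more than 12 hours gets its kick counter incremented, and the new `!= -1` guard makes `-1` actually disable the kick logic.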
@@ -68,27 +68,28 @@ public class PersistentPeerDataService {
    private KeyPair _selfKeyPair;

    void init(@Observes @Priority(300) StartupEvent event) throws IOException {
-        jObjectTxManager.run(() -> {
-            var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
+        var selfData = jObjectTxManager.run(() -> {
+            return curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
+        });
        if (selfData != null) {
            _selfUuid = selfData.selfUuid();
            _selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
            _selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
            return;
        } else {
-            try {
-                _selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString()));
            Log.info("Generating a key pair, please wait");
            _selfKeyPair = CertificateTools.generateKeyPair();
+            _selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString()));
            _selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());

+            jObjectTxManager.run(() -> {
                try {
                    curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
                    peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
                } catch (CertificateEncodingException e) {
                    throw new RuntimeException(e);
                }
            });
        }
        peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers());
        Log.info("Self uuid is: " + _selfUuid.toString());
        new File(stuffRoot).mkdirs();
@@ -66,12 +66,11 @@ public class ReachablePeerManager {
        _disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
    }

-    // Note: keep priority updated with below
    void init(@Observes @Priority(600) StartupEvent event) throws IOException {
        _heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
    }

-    @Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
    @Blocking
    public void tryConnectAll() {
        if (_heartbeatExecutor == null) return;
@@ -158,6 +157,7 @@ public class ReachablePeerManager {

    /**
     * Checks if the given host is reachable.
     *
     * @param host the host to check
     * @return true if the host is reachable, false otherwise
     */
@@ -167,6 +167,7 @@ public class ReachablePeerManager {

    /**
     * Checks if the given host is reachable.
     *
     * @param host the host to check
     * @return true if the host is reachable, false otherwise
     */
@@ -176,6 +177,7 @@ public class ReachablePeerManager {

    /**
     * Gets the address of the given host.
     *
     * @param host the host to get the address for
     * @return the address of the host, or null if not reachable
     */
@@ -185,6 +187,7 @@ public class ReachablePeerManager {

    /**
     * Gets the ids of all reachable hosts.
     *
     * @return a list of ids of all reachable hosts
     */
    public List<PeerId> getAvailableHosts() {
@@ -193,6 +196,7 @@ public class ReachablePeerManager {

    /**
     * Gets a snapshot of current state of the connected (and not connected) peers
     *
     * @return information about all connected/disconnected peers
     */
    public HostStateSnapshot getHostStateSnapshot() {
@@ -205,6 +209,7 @@ public class ReachablePeerManager {

    /**
     * Removes the given host from the cluster
     *
     * @param peerId the id of the host to remove
     */
    public void removeRemoteHost(PeerId peerId) {
@@ -216,6 +221,7 @@ public class ReachablePeerManager {
    /**
     * Selects the best address for the given host.
     * The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN)
     *
     * @param host the host to select the address for
     * @return the best address for the host, or null if not reachable
     */
@@ -225,6 +231,7 @@ public class ReachablePeerManager {

    /**
     * Call the given peer and get its information.
     *
     * @param host the peer to get the information for
     * @return the information about the peer
     */
@@ -243,6 +250,7 @@ public class ReachablePeerManager {
    /**
     * Adds the given peer to the cluster.
     * The certificate provided is verified against the one peer is using right now.
     *
     * @param host the peer to add
     * @param cert the certificate of the peer
     */
@@ -264,6 +272,7 @@ public class ReachablePeerManager {

    /**
     * Gets the information about all reachable peers that are not added to the cluster.
     *
     * @return a collection of pairs of peer id and peer info
     */
    public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() {
@@ -22,20 +22,6 @@ message ObjectChangelog {
    repeated entries_Entry entries = 1;
}

-// TODO: Serialization
-message FileDtoP {
-    bytes serializedData = 1;
-}
-
-message ChunkDataP {
-    JObjectKeyP key = 1;
-    bytes data = 2;
-}
-
-message PeerInfoP {
-    bytes serializedData = 1;
-}
-
message JDataRemoteDtoP {
    bytes serializedData = 1;
}
@@ -1,3 +1,5 @@
+# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
+
$ErrorActionPreference = 'Stop'

$PIDFILE = Join-Path $PSScriptRoot ".pid"

@@ -1,3 +1,5 @@
+# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
+
$ErrorActionPreference = 'Stop'

$PIDFILE = Join-Path $PSScriptRoot ".pid"

@@ -1,3 +1,5 @@
+# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
+
$ErrorActionPreference = 'Stop'

$PIDFILE = Join-Path $PSScriptRoot ".pid"