8 Commits

Author SHA1 Message Date
1025e6b246 Sync-base: cleanup proto files 2025-05-14 12:14:38 +02:00
7274131052 update readme 2025-05-14 12:05:13 +02:00
930eb38b9b Objects: cleanup ReadTrackingSourceWrapper 2025-05-14 11:41:11 +02:00
afb6f0c270 Dhfs-fuse: fix ctime 2025-05-14 11:39:40 +02:00
e7f5be689f Dhfs-fuse: report real filesystem space 2025-05-14 11:32:00 +02:00
84b1d57125 Sync-base: skip scheduled execution when app is not running 2025-05-14 11:27:39 +02:00
2206c13466 More lmdb logs 2025-05-14 11:18:05 +02:00
d1df6b705f Show logs for test containers that failed starting
it just... doesn't start sometimes???
2025-05-14 10:38:55 +02:00
23 changed files with 186 additions and 131 deletions

View File

@@ -16,7 +16,36 @@ the DHFS server in the background, and update itself (hopefully!)
## How to use it? ## How to use it?
### General prerequisites
Unpack the run-wrapper and run the `run` script. The filesystem should be mounted to the `fuse` folder in the run-wrapper root directory. Java should be available as `java` in path, and Java 21+ is required.
FUSE 2 userspace library also should be available:
- On Ubuntu `libfuse2` package can be installed.
- On Windows, [WinFsp](https://winfsp.dev/) should be installed.
- On macOS, [macFUSE](https://macfuse.github.io/).
In the run-wrapper, 3 scripts are available.
- `run` script starts the filesystem
- `stop` script stops it
- `update` script will update the filesystem to the newest available CI build
On Windows, Powershell alternatives should be used. For them to work, it might be required to allow execution of unsigned scripts using `set-executionpolicy remotesigned`.
Additional options for the filesystem can be specified in the `extra-opts` file in the same directory with the run scripts.
One line in the `extra-opts` file corresponds to one option passed to the JVM when starting the filesystem.
Some extra possible configuration options are:
- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`.
- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for gabgage collection and resynchronized after being reconnected. The default is 43200 (30 days), if set to `-1`, this feature is disabled.
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`.
On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100GB.
Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers. Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers.

View File

@@ -11,10 +11,11 @@ import java.util.Set;
/** /**
* File is a data structure that represents a file in the file system * File is a data structure that represents a file in the file system
* @param key unique key *
* @param mode file mode * @param key unique key
* @param cTime creation time * @param mode file mode
* @param mTime modification time * @param cTime inode modification time
* @param mTime modification time
* @param symlink true if the file is a symlink, false otherwise * @param symlink true if the file is a symlink, false otherwise
*/ */
public record File(JObjectKey key, long mode, long cTime, long mTime, public record File(JObjectKey key, long mode, long cTime, long mTime,
@@ -40,6 +41,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
return new File(key, mode, cTime, System.currentTimeMillis(), symlink); return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
} }
public File withCurrentCTime() {
return new File(key, mode, System.currentTimeMillis(), mTime, symlink);
}
@Override @Override
public Collection<JObjectKey> collectRefsTo() { public Collection<JObjectKey> collectRefsTo() {
return Set.of(); return Set.of();

View File

@@ -19,6 +19,7 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData; import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart; import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.stores.ObjectPersistentStore;
import com.usatiuk.objects.transaction.Transaction; import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager; import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace; import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -69,6 +70,8 @@ public class DhfsFileService {
JKleppmannTreeManager jKleppmannTreeManager; JKleppmannTreeManager jKleppmannTreeManager;
@Inject @Inject
JMapHelper jMapHelper; JMapHelper jMapHelper;
@Inject
ObjectPersistentStore objectPersistentStore;
private JKleppmannTreeManager.JKleppmannTree getTree() { private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory("")); return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
@@ -286,7 +289,7 @@ public class DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) { } else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null); var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) { if (remote instanceof File f) {
remoteTx.putData(f.withMode(mode).withCurrentMTime()); remoteTx.putData(f.withMode(mode).withCurrentCTime());
return true; return true;
} else { } else {
throw new IllegalArgumentException(uuid + " is not a file"); throw new IllegalArgumentException(uuid + " is not a file");
@@ -720,11 +723,10 @@ public class DhfsFileService {
* Set the access and modification times of a file. * Set the access and modification times of a file.
* *
* @param fileUuid the ID of the file * @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds * @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise * @return true if the times were set successfully, false otherwise
*/ */
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) { public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
return jObjectTxManager.executeTx(() -> { return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND)); var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -734,7 +736,7 @@ public class DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) { } else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null); var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
if (remote instanceof File f) { if (remote instanceof File f) {
remoteTx.putData(f.withCTime(atimeMs).withMTime(mtimeMs)); remoteTx.putData(f.withCTime(System.currentTimeMillis()).withMTime(mtimeMs));
return true; return true;
} else { } else {
throw new IllegalArgumentException(fileUuid + " is not a file"); throw new IllegalArgumentException(fileUuid + " is not a file");
@@ -781,4 +783,22 @@ public class DhfsFileService {
public Long write(JObjectKey fileUuid, long offset, byte[] data) { public Long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data)); return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
} }
/**
* Get the free space on the filesystem.
*
* @return the free space in bytes
*/
public long getFreeSpace() {
return objectPersistentStore.getFreeSpace();
}
/**
* Get the total space on the filesystem.
*
* @return the total space in bytes
*/
public long getTotalSpace() {
return objectPersistentStore.getTotalSpace();
}
} }

View File

@@ -54,13 +54,12 @@ public class DhfsFuse extends FuseStubFS {
boolean enabled; boolean enabled;
@ConfigProperty(name = "dhfs.fuse.debug") @ConfigProperty(name = "dhfs.fuse.debug")
Boolean debug; Boolean debug;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@Inject @Inject
DhfsFileService fileService; DhfsFileService fileService;
/** /**
* Allocate a handle for the given key. * Allocate a handle for the given key.
*
* @param key the key to allocate a handle for * @param key the key to allocate a handle for
* @return the allocated handle, not 0 * @return the allocated handle, not 0
*/ */
@@ -76,11 +75,12 @@ public class DhfsFuse extends FuseStubFS {
/** /**
* Get the key from the handle. * Get the key from the handle.
*
* @param handle the handle to get the key from * @param handle the handle to get the key from
* @return the key, or null if not found * @return the key, or null if not found
*/ */
private JObjectKey getFromHandle(long handle) { private JObjectKey getFromHandle(long handle) {
if(handle == 0) if (handle == 0)
throw new IllegalStateException("Handle is 0"); throw new IllegalStateException("Handle is 0");
return _openHandles.get(handle); return _openHandles.get(handle);
} }
@@ -112,7 +112,6 @@ public class DhfsFuse extends FuseStubFS {
opts.add("-o"); opts.add("-o");
opts.add("iosize=" + iosize); opts.add("iosize=" + iosize);
} else if (SystemUtils.IS_OS_LINUX) { } else if (SystemUtils.IS_OS_LINUX) {
// FIXME: There's something else missing: the writes still seem to be 32k max
// opts.add("-o"); // opts.add("-o");
// opts.add("large_read"); // opts.add("large_read");
opts.add("-o"); opts.add("-o");
@@ -144,13 +143,12 @@ public class DhfsFuse extends FuseStubFS {
try { try {
stbuf.f_frsize.set(blksize); stbuf.f_frsize.set(blksize);
stbuf.f_bsize.set(blksize); stbuf.f_bsize.set(blksize);
// FIXME: stbuf.f_blocks.set(fileService.getTotalSpace() / blksize); // total data blocks in file system
stbuf.f_blocks.set(1024 * 1024 * 1024 / blksize); // total data blocks in file system stbuf.f_bfree.set(fileService.getFreeSpace() / blksize); // free blocks in fs
stbuf.f_bfree.set(1024 * 1024 * 1024 / blksize); // free blocks in fs stbuf.f_bavail.set(fileService.getFreeSpace() / blksize); // avail blocks in fs
stbuf.f_bavail.set(1024 * 1024 * 1024 / blksize); // avail blocks in fs stbuf.f_files.set(1000); // TODO: Calculate real file counts?
stbuf.f_files.set(1000); //FIXME: stbuf.f_ffree.set(Integer.MAX_VALUE - 1000);
stbuf.f_ffree.set(Integer.MAX_VALUE - 2000); //FIXME: stbuf.f_favail.set(Integer.MAX_VALUE - 1000);
stbuf.f_favail.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_namemax.set(2048); stbuf.f_namemax.set(2048);
return super.statfs(path, stbuf); return super.statfs(path, stbuf);
} catch (Throwable e) { } catch (Throwable e) {
@@ -186,13 +184,13 @@ public class DhfsFuse extends FuseStubFS {
} }
} }
// FIXME: Race?
stat.st_ctim.tv_sec.set(found.get().ctime() / 1000); stat.st_ctim.tv_sec.set(found.get().ctime() / 1000);
stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1000); stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1000);
stat.st_mtim.tv_sec.set(found.get().mtime() / 1000); stat.st_mtim.tv_sec.set(found.get().mtime() / 1000);
stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1000); stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
stat.st_atim.tv_sec.set(found.get().mtime() / 1000); var atime = Math.max(found.get().ctime(), found.get().mtime());
stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1000); stat.st_atim.tv_sec.set(atime / 1000);
stat.st_atim.tv_nsec.set((atime % 1000) * 1000000L);
stat.st_blksize.set(blksize); stat.st_blksize.set(blksize);
} catch (Throwable e) { } catch (Throwable e) {
Log.error("When getattr " + path, e); Log.error("When getattr " + path, e);
@@ -208,8 +206,7 @@ public class DhfsFuse extends FuseStubFS {
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT(); if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get(); var file = fileOpt.get();
var res = fileService.setTimes(file, var res = fileService.setTimes(file,
timespec[0].tv_sec.get() * 1000, timespec[1].tv_sec.get() * 1000L + timespec[1].tv_nsec.longValue() / 1000000L);
timespec[1].tv_sec.get() * 1000);
if (!res) return -ErrorCodes.EINVAL(); if (!res) return -ErrorCodes.EINVAL();
else return 0; else return 0;
} catch (Throwable e) { } catch (Throwable e) {

View File

@@ -40,11 +40,11 @@ public class DhfsFuseIT {
container1 = new GenericContainer<>(DhfsImage.getInstance()) container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance()) container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
Stream.of(container1, container2).parallel().forEach(GenericContainer::start); Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -55,6 +55,9 @@ public class DhfsFuseIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName()); var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -48,39 +48,42 @@ public class DhfsFusex3IT {
container1 = new GenericContainer<>(DhfsImage.getInstance()) container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance()) container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
container3 = new GenericContainer<>(DhfsImage.getInstance()) container3 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start); Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(1 + "-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(2 + "-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(3 + "-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid); Log.info(container1.getContainerId() + "=" + c1uuid + " = 1");
Log.info(container2.getContainerId() + "=" + c2uuid); Log.info(container2.getContainerId() + "=" + c2uuid + " = 2");
Log.info(container3.getContainerId() + "=" + c3uuid); Log.info(container3.getContainerId() + "=" + c3uuid + " = 3");
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c1uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c2uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c3uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid)); Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid)); Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -52,12 +52,12 @@ public class KillIT {
container1 = new GenericContainer<>(DhfsImage.getInstance()) container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) .withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data"); .withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance()) container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) .withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data"); .withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start); Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -69,6 +69,9 @@ public class KillIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName()); var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -64,12 +64,12 @@ public class LazyFsIT {
container1 = new GenericContainer<>(DhfsImage.getInstance()) container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) .withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data"); .withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance()) container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network) .withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data"); .withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start); Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -81,6 +81,9 @@ public class LazyFsIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName()); var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout(); c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -38,11 +38,11 @@ public class ResyncIT {
container1 = new GenericContainer<>(DhfsImage.getInstance()) container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance()) container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true) .withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse"))) .withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network); .withNetwork(network);
Stream.of(container1, container2).parallel().forEach(GenericContainer::start); Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -52,6 +52,9 @@ public class ResyncIT {
waitingConsumer2 = new WaitingConsumer(); waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName()); var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2)); container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
} }
@AfterEach @AfterEach

View File

@@ -67,12 +67,15 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
Log.info("Initializing with root " + _root); Log.info("Initializing with root " + _root);
_root.toFile().mkdirs(); _root.toFile().mkdirs();
} }
Log.info("Opening LMDB with root " + _root);
_env = create() _env = create()
.setMapSize(lmdbSize) .setMapSize(lmdbSize)
.setMaxDbs(1) .setMaxDbs(1)
.open(_root.toFile(), EnvFlags.MDB_NOTLS); .open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE); _db = _env.openDbi(DB_NAME, MDB_CREATE);
Log.info("Opened LMDB with root " + _root);
try (Txn<ByteBuffer> txn = _env.txnWrite()) { try (Txn<ByteBuffer> txn = _env.txnWrite()) {
var read = readTxId(txn); var read = readTxId(txn);
if (read.isPresent()) { if (read.isPresent()) {
@@ -87,6 +90,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
_ready = true; _ready = true;
Log.info("LMDB storage ready");
} }
private Optional<Long> readTxId(Txn<ByteBuffer> txn) { private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
@@ -95,6 +99,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
} }
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException { void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
if (!_ready) {
return;
}
_ready = false; _ready = false;
_db.close(); _db.close();
_env.close(); _env.close();
@@ -112,6 +119,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
*/ */
@Override @Override
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() { public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
verifyReady();
var txn = _env.txnRead(); var txn = _env.txnRead();
try { try {
long commitId = readTxId(txn).orElseThrow(); long commitId = readTxId(txn).orElseThrow();

View File

@@ -23,14 +23,16 @@ class TransactionImpl implements Transaction, AutoCloseable {
private boolean _writeTrack = false; private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>(); private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap { /**
* Identifies the source of the read: whether it's from the source or written from the transaction.
*/
private interface ReadTrackingSourceWrapper {
boolean fromSource(); boolean fromSource();
JData obj(); JData obj();
} }
// FIXME: private record ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped) implements ReadTrackingSourceWrapper {
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
@Override @Override
public boolean fromSource() { public boolean fromSource() {
return true; return true;
@@ -42,7 +44,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
} }
} }
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap { private record ReadTrackingSourceWrapperTx(JData obj) implements ReadTrackingSourceWrapper {
@Override @Override
public boolean fromSource() { public boolean fromSource() {
return false; return false;
@@ -107,21 +109,21 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override @Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) { public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key); Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key, return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingSourceWrapper>(start, key,
ListUtils.prependAndMap( ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key), new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) { t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write -> case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data())); new DataWrapper<ReadTrackingSourceWrapper>(new ReadTrackingSourceWrapperTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>(); new TombstoneImpl<ReadTrackingSourceWrapper>();
case null, default -> null; case null, default -> null;
}), }),
_snapshot.getIterator(start, key), _snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin, itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingSourceWrapper>>(itin,
d -> switch (d) { d -> switch (d) {
case Data<JDataVersionedWrapper> w -> case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value())); new DataWrapper<>(new ReadTrackingSourceWrapperSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>(); case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null; case null, default -> null;
})))); }))));
@@ -178,9 +180,9 @@ class TransactionImpl implements Transaction, AutoCloseable {
} }
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> { private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing; private final CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) { public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> backing) {
_backing = backing; _backing = backing;
} }
@@ -202,7 +204,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override @Override
public Pair<JObjectKey, JData> prev() { public Pair<JObjectKey, JData> prev() {
var got = _backing.prev(); var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped)); _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
} }
return Pair.of(got.getKey(), got.getValue().obj()); return Pair.of(got.getKey(), got.getValue().obj());
@@ -231,7 +233,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override @Override
public Pair<JObjectKey, JData> next() { public Pair<JObjectKey, JData> next() {
var got = _backing.next(); var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) { if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped)); _readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
} }
return Pair.of(got.getKey(), got.getValue().obj()); return Pair.of(got.getKey(), got.getValue().obj());

View File

@@ -67,7 +67,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
/** /**
* Periodically returns deferred invalidations to the invalidation queue for all reachable hosts. * Periodically returns deferred invalidations to the invalidation queue for all reachable hosts.
*/ */
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking @Blocking
void periodicReturn() { void periodicReturn() {
for (var reachable : reachablePeerManager.getAvailableHosts()) for (var reachable : reachablePeerManager.getAvailableHosts())

View File

@@ -15,7 +15,7 @@ public class PersistentStaticPeerDiscovery {
@Inject @Inject
PersistentPeerDataService persistentPeerDataService; PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void discoverPeers() { public void discoverPeers() {
var addrs = persistentPeerDataService.getPersistentPeerAddresses(); var addrs = persistentPeerDataService.getPersistentPeerAddresses();
for (var addr : addrs) { for (var addr : addrs) {

View File

@@ -25,7 +25,7 @@ public class StaticPeerDiscovery {
).toList(); ).toList();
} }
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void discoverPeers() { public void discoverPeers() {
for (var peer : _peers) { for (var peer : _peers) {
peerDiscoveryDirectory.notifyAddr(peer); peerDiscoveryDirectory.notifyAddr(peer);

View File

@@ -54,7 +54,7 @@ public class LocalPeerDiscoveryBroadcaster {
_socket.close(); _socket.close();
} }
@Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void broadcast() throws Exception { public void broadcast() throws Exception {
if (!enabled) { if (!enabled) {
return; return;

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.PeerInfoP;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class PeerInfoProtoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
@Override
public PeerInfo deserialize(PeerInfoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public PeerInfoP serialize(PeerInfo object) {
return PeerInfoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Inject @Inject
PersistentPeerDataService persistentPeerDataService; PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking @Blocking
void update() { void update() {
var snapshot = reachablePeerManager.getHostStateSnapshot(); var snapshot = reachablePeerManager.getHostStateSnapshot();
@@ -52,10 +52,12 @@ public class PeerLastSeenUpdater {
var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null); var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
if (curInfo == null) return; if (curInfo == null) return;
var lastSeen = curInfo.lastSeenTimestamp(); if (lastSeenTimeout != -1) {
if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) { var lastSeen = curInfo.lastSeenTimestamp();
var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid()); if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
remoteTransaction.putData(kicked); var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
remoteTransaction.putData(kicked);
}
} }
}); });
} }

View File

@@ -68,27 +68,28 @@ public class PersistentPeerDataService {
private KeyPair _selfKeyPair; private KeyPair _selfKeyPair;
void init(@Observes @Priority(300) StartupEvent event) throws IOException { void init(@Observes @Priority(300) StartupEvent event) throws IOException {
jObjectTxManager.run(() -> { var selfData = jObjectTxManager.run(() -> {
var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); return curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (selfData != null) { });
_selfUuid = selfData.selfUuid(); if (selfData != null) {
_selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray()); _selfUuid = selfData.selfUuid();
_selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray()); _selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
return; _selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
} else { } else {
try { Log.info("Generating a key pair, please wait");
_selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString())); _selfKeyPair = CertificateTools.generateKeyPair();
Log.info("Generating a key pair, please wait"); _selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString()));
_selfKeyPair = CertificateTools.generateKeyPair(); _selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
jObjectTxManager.run(() -> {
try {
curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty())); curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded()); peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) { } catch (CertificateEncodingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} });
}); }
peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers()); peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers());
Log.info("Self uuid is: " + _selfUuid.toString()); Log.info("Self uuid is: " + _selfUuid.toString());
new File(stuffRoot).mkdirs(); new File(stuffRoot).mkdirs();

View File

@@ -66,12 +66,11 @@ public class ReachablePeerManager {
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList()); _disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
} }
// Note: keep priority updated with below
void init(@Observes @Priority(600) StartupEvent event) throws IOException { void init(@Observes @Priority(600) StartupEvent event) throws IOException {
_heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor(); _heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
} }
@Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) @Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking @Blocking
public void tryConnectAll() { public void tryConnectAll() {
if (_heartbeatExecutor == null) return; if (_heartbeatExecutor == null) return;
@@ -158,6 +157,7 @@ public class ReachablePeerManager {
/** /**
* Checks if the given host is reachable. * Checks if the given host is reachable.
*
* @param host the host to check * @param host the host to check
* @return true if the host is reachable, false otherwise * @return true if the host is reachable, false otherwise
*/ */
@@ -167,6 +167,7 @@ public class ReachablePeerManager {
/** /**
* Checks if the given host is reachable. * Checks if the given host is reachable.
*
* @param host the host to check * @param host the host to check
* @return true if the host is reachable, false otherwise * @return true if the host is reachable, false otherwise
*/ */
@@ -176,6 +177,7 @@ public class ReachablePeerManager {
/** /**
* Gets the address of the given host. * Gets the address of the given host.
*
* @param host the host to get the address for * @param host the host to get the address for
* @return the address of the host, or null if not reachable * @return the address of the host, or null if not reachable
*/ */
@@ -185,6 +187,7 @@ public class ReachablePeerManager {
/** /**
* Gets the ids of all reachable hosts. * Gets the ids of all reachable hosts.
*
* @return a list of ids of all reachable hosts * @return a list of ids of all reachable hosts
*/ */
public List<PeerId> getAvailableHosts() { public List<PeerId> getAvailableHosts() {
@@ -193,6 +196,7 @@ public class ReachablePeerManager {
/** /**
* Gets a snapshot of current state of the connected (and not connected) peers * Gets a snapshot of current state of the connected (and not connected) peers
*
* @return information about all connected/disconnected peers * @return information about all connected/disconnected peers
*/ */
public HostStateSnapshot getHostStateSnapshot() { public HostStateSnapshot getHostStateSnapshot() {
@@ -205,6 +209,7 @@ public class ReachablePeerManager {
/** /**
* Removes the given host from the cluster * Removes the given host from the cluster
*
* @param peerId the id of the host to remove * @param peerId the id of the host to remove
*/ */
public void removeRemoteHost(PeerId peerId) { public void removeRemoteHost(PeerId peerId) {
@@ -216,6 +221,7 @@ public class ReachablePeerManager {
/** /**
* Selects the best address for the given host. * Selects the best address for the given host.
* The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN) * The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN)
*
* @param host the host to select the address for * @param host the host to select the address for
* @return the best address for the host, or null if not reachable * @return the best address for the host, or null if not reachable
*/ */
@@ -225,6 +231,7 @@ public class ReachablePeerManager {
/** /**
* Call the given peer and get its information. * Call the given peer and get its information.
*
* @param host the peer to get the information for * @param host the peer to get the information for
* @return the information about the peer * @return the information about the peer
*/ */
@@ -243,6 +250,7 @@ public class ReachablePeerManager {
/** /**
* Adds the given peer to the cluster. * Adds the given peer to the cluster.
* The certificate provided is verified against the one peer is using right now. * The certificate provided is verified against the one peer is using right now.
*
* @param host the peer to add * @param host the peer to add
* @param cert the certificate of the peer * @param cert the certificate of the peer
*/ */
@@ -264,6 +272,7 @@ public class ReachablePeerManager {
/** /**
* Gets the information about all reachable peers that are not added to the cluster. * Gets the information about all reachable peers that are not added to the cluster.
*
* @return a collection of pairs of peer id and peer info * @return a collection of pairs of peer id and peer info
*/ */
public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() { public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() {

View File

@@ -22,20 +22,6 @@ message ObjectChangelog {
repeated entries_Entry entries = 1; repeated entries_Entry entries = 1;
} }
// TODO: Serialization
message FileDtoP {
bytes serializedData = 1;
}
message ChunkDataP {
JObjectKeyP key = 1;
bytes data = 2;
}
message PeerInfoP {
bytes serializedData = 1;
}
message JDataRemoteDtoP { message JDataRemoteDtoP {
bytes serializedData = 1; bytes serializedData = 1;
} }

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop' $ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid" $PIDFILE = Join-Path $PSScriptRoot ".pid"

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop' $ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid" $PIDFILE = Join-Path $PSScriptRoot ".pid"

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop' $ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid" $PIDFILE = Join-Path $PSScriptRoot ".pid"