8 Commits

Author SHA1 Message Date
1025e6b246 Sync-base: cleanup proto files 2025-05-14 12:14:38 +02:00
7274131052 update readme 2025-05-14 12:05:13 +02:00
930eb38b9b Objects: cleanup ReadTrackingSourceWrapper 2025-05-14 11:41:11 +02:00
afb6f0c270 Dhfs-fuse: fix ctime 2025-05-14 11:39:40 +02:00
e7f5be689f Dhfs-fuse: report real filesystem space 2025-05-14 11:32:00 +02:00
84b1d57125 Sync-base: skip scheduled execution when app is not running 2025-05-14 11:27:39 +02:00
2206c13466 More lmdb logs 2025-05-14 11:18:05 +02:00
d1df6b705f Show logs for test containers that failed starting
it just... doesn't start sometimes???
2025-05-14 10:38:55 +02:00
23 changed files with 186 additions and 131 deletions

View File

@@ -16,7 +16,36 @@ the DHFS server in the background, and update itself (hopefully!)
## How to use it?
### General prerequisites
Unpack the run-wrapper and run the `run` script. The filesystem should be mounted to the `fuse` folder in the run-wrapper root directory.
Java should be available as `java` in path, and Java 21+ is required.
FUSE 2 userspace library also should be available:
- On Ubuntu `libfuse2` package can be installed.
- On Windows, [WinFsp](https://winfsp.dev/) should be installed.
- On macOS, [macFUSE](https://macfuse.github.io/).
In the run-wrapper, 3 scripts are available.
- `run` script starts the filesystem
- `stop` script stops it
- `update` script will update the filesystem to the newest available CI build
On Windows, Powershell alternatives should be used. For them to work, it might be required to allow execution of unsigned scripts using `set-executionpolicy remotesigned`.
Additional options for the filesystem can be specified in the `extra-opts` file in the same directory with the run scripts.
One line in the `extra-opts` file corresponds to one option passed to the JVM when starting the filesystem.
Some extra possible configuration options are:
- `-Ddhfs.fuse.root=` specifies the root where filesystem should be mounted. By default it is the `fuse` path under the `run-wrapper` root. For windows, it should be a disk letter, by default `Z:\`.
- `-Ddhfs.objects.last-seen.timeout=` specifies the period of time (in seconds) after which unavailable peers will be ignored for gabgage collection and resynchronized after being reconnected. The default is 43200 (30 days), if set to `-1`, this feature is disabled.
- `-Ddhfs.objects.autosync.download-all=` specifies whether all objects (files and their data) should be downloaded to this peer. `true` or `false`, the default is `false`.
On Windows, the entire space for the filesystem should also be preallocated, the `-Ddhfs.objects.persistence.lmdb.size=` option controls the size (the value is in bytes), on Windows the default is 100GB.
Then, a web interface will be available at `losthost:8080`, that can be used to connect with other peers.

View File

@@ -11,9 +11,10 @@ import java.util.Set;
/**
* File is a data structure that represents a file in the file system
*
* @param key unique key
* @param mode file mode
* @param cTime creation time
* @param cTime inode modification time
* @param mTime modification time
* @param symlink true if the file is a symlink, false otherwise
*/
@@ -40,6 +41,10 @@ public record File(JObjectKey key, long mode, long cTime, long mTime,
return new File(key, mode, cTime, System.currentTimeMillis(), symlink);
}
public File withCurrentCTime() {
return new File(key, mode, System.currentTimeMillis(), mTime, symlink);
}
@Override
public Collection<JObjectKey> collectRefsTo() {
return Set.of();

View File

@@ -19,6 +19,7 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.stores.ObjectPersistentStore;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -69,6 +70,8 @@ public class DhfsFileService {
JKleppmannTreeManager jKleppmannTreeManager;
@Inject
JMapHelper jMapHelper;
@Inject
ObjectPersistentStore objectPersistentStore;
private JKleppmannTreeManager.JKleppmannTree getTree() {
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
@@ -286,7 +289,7 @@ public class DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, uuid).orElse(null);
if (remote instanceof File f) {
remoteTx.putData(f.withMode(mode).withCurrentMTime());
remoteTx.putData(f.withMode(mode).withCurrentCTime());
return true;
} else {
throw new IllegalArgumentException(uuid + " is not a file");
@@ -720,11 +723,10 @@ public class DhfsFileService {
* Set the access and modification times of a file.
*
* @param fileUuid the ID of the file
* @param atimeMs the access time in milliseconds
* @param mtimeMs the modification time in milliseconds
* @return true if the times were set successfully, false otherwise
*/
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
public Boolean setTimes(JObjectKey fileUuid, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
@@ -734,7 +736,7 @@ public class DhfsFileService {
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
if (remote instanceof File f) {
remoteTx.putData(f.withCTime(atimeMs).withMTime(mtimeMs));
remoteTx.putData(f.withCTime(System.currentTimeMillis()).withMTime(mtimeMs));
return true;
} else {
throw new IllegalArgumentException(fileUuid + " is not a file");
@@ -781,4 +783,22 @@ public class DhfsFileService {
public Long write(JObjectKey fileUuid, long offset, byte[] data) {
return write(fileUuid, offset, UnsafeByteOperations.unsafeWrap(data));
}
/**
* Get the free space on the filesystem.
*
* @return the free space in bytes
*/
public long getFreeSpace() {
return objectPersistentStore.getFreeSpace();
}
/**
* Get the total space on the filesystem.
*
* @return the total space in bytes
*/
public long getTotalSpace() {
return objectPersistentStore.getTotalSpace();
}
}

View File

@@ -54,13 +54,12 @@ public class DhfsFuse extends FuseStubFS {
boolean enabled;
@ConfigProperty(name = "dhfs.fuse.debug")
Boolean debug;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@Inject
DhfsFileService fileService;
/**
* Allocate a handle for the given key.
*
* @param key the key to allocate a handle for
* @return the allocated handle, not 0
*/
@@ -76,6 +75,7 @@ public class DhfsFuse extends FuseStubFS {
/**
* Get the key from the handle.
*
* @param handle the handle to get the key from
* @return the key, or null if not found
*/
@@ -112,7 +112,6 @@ public class DhfsFuse extends FuseStubFS {
opts.add("-o");
opts.add("iosize=" + iosize);
} else if (SystemUtils.IS_OS_LINUX) {
// FIXME: There's something else missing: the writes still seem to be 32k max
// opts.add("-o");
// opts.add("large_read");
opts.add("-o");
@@ -144,13 +143,12 @@ public class DhfsFuse extends FuseStubFS {
try {
stbuf.f_frsize.set(blksize);
stbuf.f_bsize.set(blksize);
// FIXME:
stbuf.f_blocks.set(1024 * 1024 * 1024 / blksize); // total data blocks in file system
stbuf.f_bfree.set(1024 * 1024 * 1024 / blksize); // free blocks in fs
stbuf.f_bavail.set(1024 * 1024 * 1024 / blksize); // avail blocks in fs
stbuf.f_files.set(1000); //FIXME:
stbuf.f_ffree.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_favail.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_blocks.set(fileService.getTotalSpace() / blksize); // total data blocks in file system
stbuf.f_bfree.set(fileService.getFreeSpace() / blksize); // free blocks in fs
stbuf.f_bavail.set(fileService.getFreeSpace() / blksize); // avail blocks in fs
stbuf.f_files.set(1000); // TODO: Calculate real file counts?
stbuf.f_ffree.set(Integer.MAX_VALUE - 1000);
stbuf.f_favail.set(Integer.MAX_VALUE - 1000);
stbuf.f_namemax.set(2048);
return super.statfs(path, stbuf);
} catch (Throwable e) {
@@ -186,13 +184,13 @@ public class DhfsFuse extends FuseStubFS {
}
}
// FIXME: Race?
stat.st_ctim.tv_sec.set(found.get().ctime() / 1000);
stat.st_ctim.tv_nsec.set((found.get().ctime() % 1000) * 1000);
stat.st_mtim.tv_sec.set(found.get().mtime() / 1000);
stat.st_mtim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
stat.st_atim.tv_sec.set(found.get().mtime() / 1000);
stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
var atime = Math.max(found.get().ctime(), found.get().mtime());
stat.st_atim.tv_sec.set(atime / 1000);
stat.st_atim.tv_nsec.set((atime % 1000) * 1000000L);
stat.st_blksize.set(blksize);
} catch (Throwable e) {
Log.error("When getattr " + path, e);
@@ -208,8 +206,7 @@ public class DhfsFuse extends FuseStubFS {
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var res = fileService.setTimes(file,
timespec[0].tv_sec.get() * 1000,
timespec[1].tv_sec.get() * 1000);
timespec[1].tv_sec.get() * 1000L + timespec[1].tv_nsec.longValue() / 1000000L);
if (!res) return -ErrorCodes.EINVAL();
else return 0;
} catch (Throwable e) {

View File

@@ -40,11 +40,11 @@ public class DhfsFuseIT {
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -55,6 +55,9 @@ public class DhfsFuseIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -48,39 +48,42 @@ public class DhfsFusex3IT {
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
container3 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(1 + "-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(2 + "-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(3 + "-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
Log.info(container1.getContainerId() + "=" + c1uuid);
Log.info(container2.getContainerId() + "=" + c2uuid);
Log.info(container3.getContainerId() + "=" + c3uuid);
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c1uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container1.followOutput(loggingConsumer1.andThen(waitingConsumer1));
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c2uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer3 = new WaitingConsumer();
var loggingConsumer3 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
.withPrefix(c3uuid.substring(0, 4) + "-" + testInfo.getDisplayName());
container3.followOutput(loggingConsumer3.andThen(waitingConsumer3));
Log.info(container1.getContainerId() + "=" + c1uuid + " = 1");
Log.info(container2.getContainerId() + "=" + c2uuid + " = 2");
Log.info(container3.getContainerId() + "=" + c3uuid + " = 3");
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));

View File

@@ -52,12 +52,12 @@ public class KillIT {
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -69,6 +69,9 @@ public class KillIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(KillIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -64,12 +64,12 @@ public class LazyFsIT {
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withNetwork(network)
.withFileSystemBind(data1.getAbsolutePath(), "/dhfs_test/data");
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network)
.withNetwork(network)
.withFileSystemBind(data2.getAbsolutePath(), "/dhfs_test/data");
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -81,6 +81,9 @@ public class LazyFsIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(LazyFsIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/data/stuff/self_uuid").getStdout();

View File

@@ -38,11 +38,11 @@ public class ResyncIT {
container1 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
container2 = new GenericContainer<>(DhfsImage.getInstance())
.withPrivilegedMode(true)
.withCreateContainerCmdModifier(cmd -> Objects.requireNonNull(cmd.getHostConfig()).withDevices(Device.parse("/dev/fuse")))
.waitingFor(Wait.forLogMessage(".*Listening.*", 1).withStartupTimeout(Duration.ofSeconds(60))).withNetwork(network);
.withNetwork(network);
Stream.of(container1, container2).parallel().forEach(GenericContainer::start);
@@ -52,6 +52,9 @@ public class ResyncIT {
waitingConsumer2 = new WaitingConsumer();
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Listening"), 60, TimeUnit.SECONDS);
}
@AfterEach

View File

@@ -67,12 +67,15 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
Log.info("Initializing with root " + _root);
_root.toFile().mkdirs();
}
Log.info("Opening LMDB with root " + _root);
_env = create()
.setMapSize(lmdbSize)
.setMaxDbs(1)
.open(_root.toFile(), EnvFlags.MDB_NOTLS);
_db = _env.openDbi(DB_NAME, MDB_CREATE);
Log.info("Opened LMDB with root " + _root);
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
var read = readTxId(txn);
if (read.isPresent()) {
@@ -87,6 +90,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
_ready = true;
Log.info("LMDB storage ready");
}
private Optional<Long> readTxId(Txn<ByteBuffer> txn) {
@@ -95,6 +99,9 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
if (!_ready) {
return;
}
_ready = false;
_db.close();
_env.close();
@@ -112,6 +119,7 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
*/
@Override
public Snapshot<JObjectKey, ByteBuffer> getSnapshot() {
verifyReady();
var txn = _env.txnRead();
try {
long commitId = readTxId(txn).orElseThrow();

View File

@@ -23,14 +23,16 @@ class TransactionImpl implements Transaction, AutoCloseable {
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private interface ReadTrackingInternalCrap {
/**
* Identifies the source of the read: whether it's from the source or written from the transaction.
*/
private interface ReadTrackingSourceWrapper {
boolean fromSource();
JData obj();
}
// FIXME:
private record ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped) implements ReadTrackingInternalCrap {
private record ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped) implements ReadTrackingSourceWrapper {
@Override
public boolean fromSource() {
return true;
@@ -42,7 +44,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
}
}
private record ReadTrackingInternalCrapTx(JData obj) implements ReadTrackingInternalCrap {
private record ReadTrackingSourceWrapperTx(JData obj) implements ReadTrackingSourceWrapper {
@Override
public boolean fromSource() {
return false;
@@ -107,21 +109,21 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override
public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
Log.tracev("Getting tx iterator with start={0}, key={1}", start, key);
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingInternalCrap>(start, key,
return new ReadTrackingIterator(new TombstoneSkippingIterator<JObjectKey, ReadTrackingSourceWrapper>(start, key,
ListUtils.prependAndMap(
new MappingKvIterator<>(new NavigableMapKvIterator<>(_writes, start, key),
t -> switch (t) {
case TxRecord.TxObjectRecordWrite<?> write ->
new DataWrapper<ReadTrackingInternalCrap>(new ReadTrackingInternalCrapTx(write.data()));
new DataWrapper<ReadTrackingSourceWrapper>(new ReadTrackingSourceWrapperTx(write.data()));
case TxRecord.TxObjectRecordDeleted deleted ->
new TombstoneImpl<ReadTrackingInternalCrap>();
new TombstoneImpl<ReadTrackingSourceWrapper>();
case null, default -> null;
}),
_snapshot.getIterator(start, key),
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingInternalCrap>>(itin,
itin -> new MappingKvIterator<JObjectKey, MaybeTombstone<JDataVersionedWrapper>, MaybeTombstone<ReadTrackingSourceWrapper>>(itin,
d -> switch (d) {
case Data<JDataVersionedWrapper> w ->
new DataWrapper<>(new ReadTrackingInternalCrapSource(w.value()));
new DataWrapper<>(new ReadTrackingSourceWrapperSource(w.value()));
case Tombstone<JDataVersionedWrapper> t -> new TombstoneImpl<>();
case null, default -> null;
}))));
@@ -178,9 +180,9 @@ class TransactionImpl implements Transaction, AutoCloseable {
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
private final CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingSourceWrapper> backing) {
_backing = backing;
}
@@ -202,7 +204,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());
@@ -231,7 +233,7 @@ class TransactionImpl implements Transaction, AutoCloseable {
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
if (got.getValue() instanceof ReadTrackingSourceWrapperSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), Optional.of(wrapped));
}
return Pair.of(got.getKey(), got.getValue().obj());

View File

@@ -67,7 +67,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
/**
* Periodically returns deferred invalidations to the invalidation queue for all reachable hosts.
*/
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking
void periodicReturn() {
for (var reachable : reachablePeerManager.getAvailableHosts())

View File

@@ -15,7 +15,7 @@ public class PersistentStaticPeerDiscovery {
@Inject
PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void discoverPeers() {
var addrs = persistentPeerDataService.getPersistentPeerAddresses();
for (var addr : addrs) {

View File

@@ -25,7 +25,7 @@ public class StaticPeerDiscovery {
).toList();
}
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void discoverPeers() {
for (var peer : _peers) {
peerDiscoveryDirectory.notifyAddr(peer);

View File

@@ -54,7 +54,7 @@ public class LocalPeerDiscoveryBroadcaster {
_socket.close();
}
@Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "${dhfs.objects.peerdiscovery.interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
public void broadcast() throws Exception {
if (!enabled) {
return;

View File

@@ -1,25 +0,0 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.ProtoSerializer;
import com.usatiuk.dhfs.persistence.PeerInfoP;
import com.usatiuk.utils.SerializationHelper;
import jakarta.inject.Singleton;
import java.io.IOException;
@Singleton
public class PeerInfoProtoSerializer implements ProtoSerializer<PeerInfoP, PeerInfo> {
@Override
public PeerInfo deserialize(PeerInfoP message) {
try (var is = message.getSerializedData().newInput()) {
return SerializationHelper.deserialize(is);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public PeerInfoP serialize(PeerInfo object) {
return PeerInfoP.newBuilder().setSerializedData(SerializationHelper.serialize(object)).build();
}
}

View File

@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
@Inject
PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking
void update() {
var snapshot = reachablePeerManager.getHostStateSnapshot();
@@ -52,11 +52,13 @@ public class PeerLastSeenUpdater {
var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
if (curInfo == null) return;
if (lastSeenTimeout != -1) {
var lastSeen = curInfo.lastSeenTimestamp();
if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
remoteTransaction.putData(kicked);
}
}
});
}
}

View File

@@ -68,27 +68,28 @@ public class PersistentPeerDataService {
private KeyPair _selfKeyPair;
void init(@Observes @Priority(300) StartupEvent event) throws IOException {
jObjectTxManager.run(() -> {
var selfData = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
var selfData = jObjectTxManager.run(() -> {
return curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
});
if (selfData != null) {
_selfUuid = selfData.selfUuid();
_selfCertificate = CertificateTools.certFromBytes(selfData.selfCertificate().toByteArray());
_selfKeyPair = SerializationHelper.deserialize(selfData.selfKeyPair().toByteArray());
return;
} else {
try {
_selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString()));
Log.info("Generating a key pair, please wait");
_selfKeyPair = CertificateTools.generateKeyPair();
_selfUuid = presetUuid.map(PeerId::of).orElseGet(() -> PeerId.of(UUID.randomUUID().toString()));
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
jObjectTxManager.run(() -> {
try {
curTx.put(new PersistentRemoteHostsData(_selfUuid, ByteString.copyFrom(_selfCertificate.getEncoded()), SerializationHelper.serialize(_selfKeyPair), HashTreePSet.empty(), HashTreePMap.empty()));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
}
}
});
}
peerTrustManager.reloadTrustManagerHosts(peerInfoService.getPeers());
Log.info("Self uuid is: " + _selfUuid.toString());
new File(stuffRoot).mkdirs();

View File

@@ -66,12 +66,11 @@ public class ReachablePeerManager {
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
}
// Note: keep priority updated with below
void init(@Observes @Priority(600) StartupEvent event) throws IOException {
_heartbeatExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
@Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Scheduled(every = "${dhfs.objects.reconnect_interval}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP, skipExecutionIf = Scheduled.ApplicationNotRunning.class)
@Blocking
public void tryConnectAll() {
if (_heartbeatExecutor == null) return;
@@ -158,6 +157,7 @@ public class ReachablePeerManager {
/**
* Checks if the given host is reachable.
*
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
@@ -167,6 +167,7 @@ public class ReachablePeerManager {
/**
* Checks if the given host is reachable.
*
* @param host the host to check
* @return true if the host is reachable, false otherwise
*/
@@ -176,6 +177,7 @@ public class ReachablePeerManager {
/**
* Gets the address of the given host.
*
* @param host the host to get the address for
* @return the address of the host, or null if not reachable
*/
@@ -185,6 +187,7 @@ public class ReachablePeerManager {
/**
* Gets the ids of all reachable hosts.
*
* @return a list of ids of all reachable hosts
*/
public List<PeerId> getAvailableHosts() {
@@ -193,6 +196,7 @@ public class ReachablePeerManager {
/**
* Gets a snapshot of current state of the connected (and not connected) peers
*
* @return information about all connected/disconnected peers
*/
public HostStateSnapshot getHostStateSnapshot() {
@@ -205,6 +209,7 @@ public class ReachablePeerManager {
/**
* Removes the given host from the cluster
*
* @param peerId the id of the host to remove
*/
public void removeRemoteHost(PeerId peerId) {
@@ -216,6 +221,7 @@ public class ReachablePeerManager {
/**
* Selects the best address for the given host.
* The address is selected based on the type of the address. (with e.g. LAN address preferred over WAN)
*
* @param host the host to select the address for
* @return the best address for the host, or null if not reachable
*/
@@ -225,6 +231,7 @@ public class ReachablePeerManager {
/**
* Call the given peer and get its information.
*
* @param host the peer to get the information for
* @return the information about the peer
*/
@@ -243,6 +250,7 @@ public class ReachablePeerManager {
/**
* Adds the given peer to the cluster.
* The certificate provided is verified against the one peer is using right now.
*
* @param host the peer to add
* @param cert the certificate of the peer
*/
@@ -264,6 +272,7 @@ public class ReachablePeerManager {
/**
* Gets the information about all reachable peers that are not added to the cluster.
*
* @return a collection of pairs of peer id and peer info
*/
public Collection<Pair<PeerId, ApiPeerInfo>> getSeenButNotAddedHosts() {

View File

@@ -22,20 +22,6 @@ message ObjectChangelog {
repeated entries_Entry entries = 1;
}
// TODO: Serialization
message FileDtoP {
bytes serializedData = 1;
}
message ChunkDataP {
JObjectKeyP key = 1;
bytes data = 2;
}
message PeerInfoP {
bytes serializedData = 1;
}
message JDataRemoteDtoP {
bytes serializedData = 1;
}

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid"

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid"

View File

@@ -1,3 +1,5 @@
# https://chatgpt.com/c/681762a4-dddc-800a-adad-2797355013f8
$ErrorActionPreference = 'Stop'
$PIDFILE = Join-Path $PSScriptRoot ".pid"