6 Commits

30 changed files with 354 additions and 230 deletions

View File

@@ -21,7 +21,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: 'recursive'
submodules: "recursive"
- name: Install sudo for ACT
run: apt-get update && apt-get install -y sudo
@@ -32,7 +32,7 @@ jobs:
- name: User allow other for fuse
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
- name: Dump fuse.conf
run: cat /etc/fuse.conf
@@ -212,7 +212,7 @@ jobs:
run: mkdir -p run-wrapper-out/dhfs/data && mkdir -p run-wrapper-out/dhfs/fuse && mkdir -p run-wrapper-out/dhfs/app
- name: Copy DHFS
run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/DHFS Package"
run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/Server"
- name: Copy Webui
run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui"

View File

@@ -168,35 +168,6 @@ public class DhfsFuseIT {
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
}
// TODO: How this fits with the tree?
@Test
@Disabled
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void deleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
@@ -221,6 +192,28 @@ public class DhfsFuseIT {
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void deleteTestKickedOut() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
container2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
Log.info("Deleting");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
Log.info("Deleted");
// FIXME?
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");

View File

@@ -79,6 +79,8 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.sync.timeout=30",
"-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s",
"-Ddhfs.objects.last-seen.timeout=30",
"-Ddhfs.objects.last-seen.update=10",
"-Ddhfs.sync.cert-check=false",
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",

View File

@@ -246,9 +246,9 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
var buffer = UninitializedByteBuffer.allocate((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory(
UnsafeAccessor.UNSAFE.copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
UnsafeAccessor.NIO.getBufferAddress(buffer),
size
);
return write(path, buffer, offset, fi);

View File

@@ -46,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
if (value instanceof MappedByteBuffer mb) {
mb.load();
}
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
long addr = UnsafeAccessor.NIO.getBufferAddress(value) + value.position();
var out = _backing.address() + _pos;
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
UnsafeAccessor.UNSAFE.copyMemory(addr, out, rem);
} else {
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
}

View File

@@ -3,9 +3,11 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord;
import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log;
@@ -25,7 +27,6 @@ import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
@@ -218,17 +219,18 @@ public class WritebackObjectPersistentStore {
var curPwMap = curPw.pendingWrites();
for (var action : writes) {
var key = action.key();
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
curPwMap = curPwMap.plus(key, new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper);
}
case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
curBundle.delete(deleted.key());
curPwMap = curPwMap.plus(key, new PendingDelete(key, oursId));
curBundle.delete(key);
}
}
}

View File

@@ -19,7 +19,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
@@ -75,11 +74,12 @@ public class JObjectManager {
}
for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n);
hookPut.pendingWrites().put(key, n);
pendingCount++;
}
writes.put(n.key(), n);
writes.put(key, n);
}
@@ -108,19 +108,20 @@ public class JObjectManager {
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
var key = entry.getKey();
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
var oldObj = getPrev.apply(key);
lastCurHookSeen.put(key, entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
hook.onCreate(key, write.data());
} else {
hook.onChange(write.key(), oldObj, write.data());
hook.onChange(key, oldObj, write.data());
}
}
case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj);
hook.onDelete(key, oldObj);
}
default -> throw new TxCommitException("Unexpected value: " + entry);
}
@@ -130,16 +131,17 @@ public class JObjectManager {
curIteration.clear();
for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
lastCurHookSeen.put(key, n);
continue;
}
var before = hookPut.pendingWrites().put(n.key(), n);
var before = hookPut.pendingWrites().put(key, n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
writes.put(key, n);
}
}
}

View File

@@ -63,6 +63,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
@@ -99,43 +101,29 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) {
return Optional.empty();
}
var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k);
return read;
});
if (got.isEmpty())
return Optional.empty();
var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
return _readSet.computeIfAbsent(key, _snapshot::readObject)
.map(JDataVersionedWrapper::data)
.map(type::cast);
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> {
return Optional.of(type.cast(write.data()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
return Optional.empty();
}
case null, default -> {
}
}
return getFromSource(type, key);
return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
case null -> getFromSource(type, key);
};
}
@Override
public void delete(JObjectKey key) {
var got = _writes.get(key);
if (got != null) {
if (got instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
var record = new TxRecord.TxObjectRecordDeleted(key);
if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
return;
}
_writes.put(key, new TxRecord.TxObjectRecordDeleted(key));
_newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
@@ -163,25 +151,35 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
var read = _readSet.get(obj.key());
var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return;
}
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public void putNew(JData obj) {
_knownNew.add(obj.key());
var key = obj.key();
_knownNew.add(key);
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
var record = new TxRecord.TxObjectRecordWrite<>(obj);
_writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites;
_newWrites = new HashMap<>();
return ret.values();

View File

@@ -12,8 +12,8 @@ public interface TransactionManager {
void rollback();
default <T> T runTries(Supplier<T> supplier, int tries) {
if (current() != null) {
default <T> T runTries(Supplier<T> supplier, int tries, boolean nest) {
if (!nest && current() != null) {
return supplier.get();
}
@@ -41,8 +41,8 @@ public interface TransactionManager {
}
}
default TransactionHandle runTries(VoidFn fn, int tries) {
if (current() != null) {
default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
if (!nest && current() != null) {
fn.apply();
return new TransactionHandle() {
@Override
@@ -74,23 +74,38 @@ public interface TransactionManager {
throw e;
}
}
}
default <T> T runTries(Supplier<T> supplier, int tries) {
return runTries(supplier, tries, false);
}
default TransactionHandle runTries(VoidFn fn, int tries) {
return runTries(fn, tries, false);
}
default TransactionHandle run(VoidFn fn, boolean nest) {
return runTries(fn, 10, nest);
}
default <T> T run(Supplier<T> supplier, boolean nest) {
return runTries(supplier, 10, nest);
}
default TransactionHandle run(VoidFn fn) {
return runTries(fn, 10);
return run(fn, false);
}
default <T> T run(Supplier<T> supplier) {
return runTries(supplier, 10);
return run(supplier, false);
}
default void executeTx(VoidFn fn) {
run(fn);
run(fn, false);
}
default <T> T executeTx(Supplier<T> supplier) {
return run(supplier);
return run(supplier, false);
}
Transaction current();

View File

@@ -1,47 +1,48 @@
package com.usatiuk.objects.transaction;
import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.Stack;
@Singleton
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject
JObjectManager jObjectManager;
@Override
public void begin() {
if (_currentTransaction.get() != null) {
throw new IllegalStateException("Transaction already started");
}
Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction();
_currentTransaction.set(tx);
_currentTransaction.get().push(tx);
}
@Override
public TransactionHandle commit() {
if (_currentTransaction.get() == null) {
var stack = _currentTransaction.get();
if (stack.empty()) {
throw new IllegalStateException("No transaction started");
}
var peeked = stack.peek();
Log.trace("Committing transaction");
Pair<Collection<Runnable>, TransactionHandle> ret;
try {
ret = jObjectManager.commit(_currentTransaction.get());
ret = jObjectManager.commit(peeked);
} catch (Throwable e) {
Log.trace("Transaction commit failed", e);
throw e;
} finally {
_currentTransaction.get().close();
_currentTransaction.remove();
peeked.close();
stack.pop();
if (stack.empty())
_currentTransaction.remove();
}
for (var r : ret.getLeft()) {
@@ -56,24 +57,33 @@ public class TransactionManagerImpl implements TransactionManager {
@Override
public void rollback() {
if (_currentTransaction.get() == null) {
var stack = _currentTransaction.get();
if (stack.empty()) {
throw new IllegalStateException("No transaction started");
}
var peeked = stack.peek();
try {
jObjectManager.rollback(_currentTransaction.get());
jObjectManager.rollback(peeked);
} catch (Throwable e) {
Log.error("Transaction rollback failed", e);
throw e;
} finally {
_currentTransaction.get().close();
_currentTransaction.remove();
peeked.close();
stack.pop();
if (stack.empty())
_currentTransaction.remove();
}
}
@Override
@Nullable
public Transaction current() {
return _currentTransaction.get();
var stack = _currentTransaction.get();
if (stack.empty()) {
return null;
}
return stack.peek();
}
}

View File

@@ -1,10 +1,12 @@
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.writeback.limit=16777216
dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.transaction.never-lock=true
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200
quarkus.log.category."com.usatiuk.objects.iterators".level=INFO
quarkus.log.category."com.usatiuk.objects.iterators".min-level=INFO

View File

@@ -19,8 +19,6 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -56,27 +54,21 @@ public class AutosyncProcessor {
if (downloadAll)
executorService.submit(() -> {
Log.info("Adding all to autosync");
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> {
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();
objs.add(key);
// TODO: Nested transactions
txm.run(() -> {
var gotObj = curTx.get(JData.class, key).orElse(null);
if (!(gotObj instanceof RemoteObjectMeta meta))
return;
if (!meta.hasLocalData())
add(meta.key());
}, true);
it.skip();
}
}
});
for (var obj : objs) {
txm.run(() -> {
var gotObj = curTx.get(JData.class, obj).orElse(null);
if (!(gotObj instanceof RemoteObjectMeta meta))
return;
if (!meta.hasLocalData())
add(meta.key());
});
}
Log.info("Adding all to autosync: finished");
});
}

View File

@@ -1,13 +1,14 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override
public Collection<JObjectKey> getEscapedRefs() {
return List.of(key);

View File

@@ -1,10 +1,14 @@
package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
@@ -19,11 +23,22 @@ public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta
Transaction curTx;
@Inject
RemoteTransaction remoteTransaction;
@Inject
DtoMapperService dtoMapperService;
@Override
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
return txm.run(() -> {
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> {
JDataRemoteDto dto =
data.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(data.knownType(), data.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
}
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
});
});
}

View File

@@ -24,6 +24,6 @@ public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {
@Override
public Collection<PeerId> getAllPeers() {
return peerInfoService.getPeers().stream().map(PeerInfo::id).toList();
return peerInfoService.getSynchronizedPeers().stream().map(PeerInfo::id).toList();
}
}

View File

@@ -4,24 +4,48 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.peertrust.CertificateTools;
import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.objects.JObjectKey;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import java.security.cert.X509Certificate;
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
@JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
PMap<PeerId, Long> kickCounter,
long lastSeenTimestamp) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
this(id.toJObjectKey(), id, ByteString.copyFrom(cert), HashTreePMap.empty(), System.currentTimeMillis());
}
public X509Certificate parsedCert() {
return CertificateTools.certFromBytes(cert.toByteArray());
}
public PeerInfo withKickCounter(PMap<PeerId, Long> kickCounter) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public PeerInfo withIncrementedKickCounter(PeerId peerId) {
return new PeerInfo(key, id, cert, kickCounter.plus(peerId, kickCounter.getOrDefault(peerId, 0L) + 1), lastSeenTimestamp);
}
public PeerInfo withLastSeenTimestamp(long lastSeenTimestamp) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public long kickCounterSum() {
return kickCounter.values().stream().mapToLong(Long::longValue).sum();
}
@Override
public String toString() {
return "PeerInfo{" +
"key=" + key +
", id=" + id +
", kickCounter=" + kickCounter +
", lastSeenTimestamp=" + lastSeenTimestamp +
'}';
}
}

View File

@@ -87,17 +87,27 @@ public class PeerInfoService {
public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of());
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.filter(o -> {
if (o.isEmpty())
Log.warnv("Could not get peer info for peer {0}", o);
return o.isPresent();
}).map(Optional::get).filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
.orElseThrow();
return getPeers().stream().filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList();
});
}
public List<PeerInfo> getSynchronizedPeers() {
return jObjectTxManager.run(() -> {
return getPeers().stream().filter(pi -> {
if (pi.id().equals(persistentPeerDataService.getSelfUuid())) {
return true;
}
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
});
}
public List<PeerInfo> getSynchronizedPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeersNoSelf().stream().filter(pi -> {
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
});
}

View File

@@ -89,10 +89,11 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
if (oursCurData == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!receivedData.equals(oursCurData))
throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo data conflict"));
if (!receivedData.cert().equals(oursCurData.cert()))
throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo certificate conflict for " + key));
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(current.changelog());
HashPMap<PeerId, Long> newKickCounter = HashTreePMap.from(oursCurData.kickCounter());
for (var entry : receivedChangelog.entrySet()) {
newChangelog = newChangelog.plus(entry.getKey(),
@@ -100,6 +101,20 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
);
}
for (var entry : receivedData.kickCounter().entrySet()) {
newKickCounter = newKickCounter.plus(entry.getKey(),
Long.max(newKickCounter.getOrDefault(entry.getKey(), 0L), entry.getValue())
);
}
var newData = oursCurData.withKickCounter(newKickCounter)
.withLastSeenTimestamp(Math.max(oursCurData.lastSeenTimestamp(), receivedData.lastSeenTimestamp()));
if (!newData.equals(oursCurData))
newChangelog = newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.getOrDefault(persistentPeerDataService.getSelfUuid(), 0L) + 1L);
remoteTx.putDataRaw(newData);
current = current.withChangelog(newChangelog);
}
}

View File

@@ -0,0 +1,60 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
public class PeerLastSeenUpdater {
@Inject
PeerManager peerManager;
@Inject
PeerInfoService peerInfoService;
@Inject
Transaction curTx;
@Inject
TransactionManager txm;
@Inject
RemoteTransaction remoteTransaction;
@ConfigProperty(name = "dhfs.objects.last-seen.timeout")
long lastSeenTimeout;
@Inject
PersistentPeerDataService persistentPeerDataService;
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void update() {
var snapshot = peerManager.getHostStateSnapshot();
for (var a : snapshot.available()) {
txm.run(() -> {
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);
if (curInfo == null) return;
var newInfo = curInfo.withLastSeenTimestamp(System.currentTimeMillis());
remoteTransaction.putData(newInfo);
});
}
for (var u : snapshot.unavailable()) {
txm.run(() -> {
if (!persistentPeerDataService.isInitialSyncDone(u))
return;
var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
if (curInfo == null) return;
var lastSeen = curInfo.lastSeenTimestamp();
if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
remoteTransaction.putData(kicked);
}
});
}
}
}

View File

@@ -125,19 +125,23 @@ public class PeerManager {
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
public void handleConnectionError(PeerId host) {
boolean wasReachable = isReachable(host);
if (wasReachable)
Log.infov("Lost connection to {0}", host);
_states.remove(host.id());
_states.remove(host);
for (var l : _disconnectedListeners) {
l.handlePeerDisconnected(host.id());
l.handlePeerDisconnected(host);
}
}
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
handleConnectionError(host.id());
}
// FIXME:
private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
try {

View File

@@ -51,6 +51,8 @@ public class PersistentPeerDataService {
PeerInfoService peerInfoService;
@Inject
TransactionManager txm;
@Inject
PeerManager peerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid;
@@ -89,21 +91,6 @@ public class PersistentPeerDataService {
Files.write(Path.of(stuffRoot, "self_uuid"), _selfUuid.id().toString().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
}
// private void pushPeerUpdates() {
// pushPeerUpdates(null);
// }
// private void pushPeerUpdates(@Nullable JObject<?> obj) {
// if (obj != null)
// Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated");
// executorService.submit(() -> {
// updateCerts();
// invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
// for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList()))
// invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p));
// });
// }
public PeerId getSelfUuid() {
return _selfUuid;
}
@@ -148,6 +135,7 @@ public class PersistentPeerDataService {
}
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> peerManager.handleConnectionError(peerId));
return true;
});
}
@@ -160,7 +148,6 @@ public class PersistentPeerDataService {
});
}
public List<IpPeerAddress> getPersistentPeerAddresses() {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.peertrust;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.PeerInfo;
import com.usatiuk.dhfs.peersync.PeerInfoService;
@@ -43,7 +42,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
for (var curRef : oldNode.node().children().entrySet()) {
if (!n.node().children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue()));
}
}
return;
@@ -54,9 +53,16 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
return;
}
if (!(remote.data() instanceof PeerInfo))
if (!(remote.data() instanceof PeerInfo curPi))
return;
var oldPi = (PeerInfo) ((RemoteObjectDataWrapper) old).data();
if (oldPi.kickCounterSum() != curPi.kickCounterSum()) {
Log.warnv("Peer kicked out: {0} to {1}", key, cur);
persistentPeerDataService.resetInitialSyncDone(curPi.id());
}
Log.infov("Changed peer info: {0} to {1}", key, cur);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(key));

View File

@@ -20,7 +20,7 @@ public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
@Override
public void handleOp(PeerId from, IndexUpdateOp op) {
txm.run(() -> {
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), op.data());
});
}
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.remoteobj;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JDataRemotePush {
}

View File

@@ -132,7 +132,7 @@ public class RemoteObjectDeleter {
return true;
}
var knownHosts = peerInfoService.getPeersNoSelf();
var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
RemoteObjectMeta finalTarget = target;
List<PeerId> missing = knownHosts.stream()
.map(PeerInfo::id)
@@ -187,7 +187,7 @@ public class RemoteObjectDeleter {
if (!obj.seen())
return true;
var knownHosts = peerInfoService.getPeersNoSelf();
var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
boolean missing = false;
for (var x : knownHosts) {
if (!obj.confirmedDeletes().contains(x.id())) {

View File

@@ -23,7 +23,10 @@ import org.pcollections.PMap;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
@ApplicationScoped
@@ -124,52 +127,42 @@ public class SyncHandler {
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
if (shutdownChecker.lastShutdownClean())
return;
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> {
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();
objs.add(key);
// TODO: Nested transactions
txm.run(() -> {
var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.handleCrash(key);
}
Log.infov("Handled crash of {0}", key);
}, true);
it.skip();
}
}
});
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.handleCrash(obj);
}
Log.infov("Handled crash of {0}", obj);
});
}
}
public void doInitialSync(PeerId peer) {
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> {
Log.tracev("Will do initial sync for {0}", peer);
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();
objs.add(key);
// TODO: Nested transactions
txm.run(() -> {
var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.prepareForInitialSync(peer, key);
}
Log.infov("Adding to initial sync for peer {0}: {1}", peer, key);
invalidationQueueService.pushInvalidationToOne(peer, key);
}, true);
it.skip();
}
}
});
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.prepareForInitialSync(peer, obj);
}
Log.infov("Adding to initial sync for peer {0}: {1}", peer, obj);
invalidationQueueService.pushInvalidationToOne(peer, obj);
});
}
}
}

View File

@@ -17,7 +17,7 @@ public class UninitializedByteBuffer {
);
public static ByteBuffer allocate(int capacity) {
UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
UnsafeAccessor.NIO.reserveMemory(capacity, capacity);
MemorySegment segment = null;
try {
@@ -29,7 +29,7 @@ public class UninitializedByteBuffer {
Consumer<MemorySegment> cleanup = s -> {
try {
free.invokeExact(s);
UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity);
UnsafeAccessor.NIO.unreserveMemory(capacity, capacity);
} catch (Throwable e) {
throw new RuntimeException(e);
}
@@ -39,6 +39,6 @@ public class UninitializedByteBuffer {
}
public static long getAddress(ByteBuffer buffer) {
return UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer);
return UnsafeAccessor.NIO.getBufferAddress(buffer);
}
}

View File

@@ -6,36 +6,18 @@ import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafeAccessor {
private static final UnsafeAccessor INSTANCE;
public abstract class UnsafeAccessor {
public static final JavaNioAccess NIO;
public static final Unsafe UNSAFE;
static {
try {
INSTANCE = new UnsafeAccessor();
NIO = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
UNSAFE = (Unsafe) f.get(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static UnsafeAccessor get() {
return INSTANCE;
}
private JavaNioAccess _nioAccess;
private Unsafe _unsafe;
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
}

View File

@@ -38,7 +38,7 @@ java \
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &
-jar "$SCRIPT_DIR"/"Server"/quarkus-run.jar >quarkus.log 2>&1 &
echo "Started $!"

View File

@@ -40,10 +40,10 @@ wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
unzip "Run wrapper.zip"
rm "Run wrapper.zip"
tar xvf "run-wrapper.tar.gz" --strip-components=2
tar xvf "run-wrapper.tar.gz" --strip-components 2
rm "run-wrapper.tar.gz"
rm -rf "DHFS Package"
rm -rf "Server"
rm -rf "Webui"
rm -rf "NativeLibs"