6 Commits

30 changed files with 354 additions and 230 deletions

View File

@@ -21,7 +21,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
with: with:
submodules: 'recursive' submodules: "recursive"
- name: Install sudo for ACT - name: Install sudo for ACT
run: apt-get update && apt-get install -y sudo run: apt-get update && apt-get install -y sudo
@@ -32,7 +32,7 @@ jobs:
- name: User allow other for fuse - name: User allow other for fuse
run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf
- name: Dump fuse.conf - name: Dump fuse.conf
run: cat /etc/fuse.conf run: cat /etc/fuse.conf
@@ -212,7 +212,7 @@ jobs:
run: mkdir -p run-wrapper-out/dhfs/data && mkdir -p run-wrapper-out/dhfs/fuse && mkdir -p run-wrapper-out/dhfs/app run: mkdir -p run-wrapper-out/dhfs/data && mkdir -p run-wrapper-out/dhfs/fuse && mkdir -p run-wrapper-out/dhfs/app
- name: Copy DHFS - name: Copy DHFS
run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/DHFS Package" run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/Server"
- name: Copy Webui - name: Copy Webui
run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui" run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui"

View File

@@ -168,35 +168,6 @@ public class DhfsFuseIT {
"rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout())); "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
} }
// TODO: How this fits with the tree?
@Test
@Disabled
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test @Test
void deleteTest() throws IOException, InterruptedException, TimeoutException { void deleteTest() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode()); await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
@@ -221,6 +192,28 @@ public class DhfsFuseIT {
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode()); 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
} }
@Test
void deleteTestKickedOut() throws IOException, InterruptedException, TimeoutException {
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
await().atMost(45, TimeUnit.SECONDS).until(() ->
"tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
container2.stop();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
Log.info("Deleting");
await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
Log.info("Deleted");
// FIXME?
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
await().atMost(45, TimeUnit.SECONDS).until(() ->
1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
}
@Test @Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException { void moveFileTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating"); Log.info("Creating");

View File

@@ -79,6 +79,8 @@ public class DhfsImage implements Future<String> {
"-Ddhfs.objects.sync.timeout=30", "-Ddhfs.objects.sync.timeout=30",
"-Ddhfs.objects.sync.ping.timeout=5", "-Ddhfs.objects.sync.ping.timeout=5",
"-Ddhfs.objects.reconnect_interval=1s", "-Ddhfs.objects.reconnect_interval=1s",
"-Ddhfs.objects.last-seen.timeout=30",
"-Ddhfs.objects.last-seen.update=10",
"-Ddhfs.sync.cert-check=false", "-Ddhfs.sync.cert-check=false",
"-Dquarkus.log.category.\"com.usatiuk\".level=TRACE", "-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
"-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE", "-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",

View File

@@ -246,9 +246,9 @@ public class DhfsFuse extends FuseStubFS {
@Override @Override
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) { public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
var buffer = UninitializedByteBuffer.allocate((int) size); var buffer = UninitializedByteBuffer.allocate((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory( UnsafeAccessor.UNSAFE.copyMemory(
buf.address(), buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer), UnsafeAccessor.NIO.getBufferAddress(buffer),
size size
); );
return write(path, buffer, offset, fi); return write(path, buffer, offset, fi);

View File

@@ -46,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
if (value instanceof MappedByteBuffer mb) { if (value instanceof MappedByteBuffer mb) {
mb.load(); mb.load();
} }
long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position(); long addr = UnsafeAccessor.NIO.getBufferAddress(value) + value.position();
var out = _backing.address() + _pos; var out = _backing.address() + _pos;
UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem); UnsafeAccessor.UNSAFE.copyMemory(addr, out, rem);
} else { } else {
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem); _backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
} }

View File

@@ -3,9 +3,11 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper; import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl; import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.*; import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.iterators.MaybeTombstone;
import com.usatiuk.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.objects.snapshot.Snapshot; import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
import com.usatiuk.objects.transaction.TxRecord; import com.usatiuk.objects.transaction.TxRecord;
import com.usatiuk.utils.ListUtils; import com.usatiuk.utils.ListUtils;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
@@ -25,7 +27,6 @@ import javax.annotation.Nonnull;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@@ -218,17 +219,18 @@ public class WritebackObjectPersistentStore {
var curPwMap = curPw.pendingWrites(); var curPwMap = curPw.pendingWrites();
for (var action : writes) { for (var action : writes) {
var key = action.key();
switch (action) { switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
// Log.tracev("Flushing object {0}", write.key()); // Log.tracev("Flushing object {0}", write.key());
var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId); var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId)); curPwMap = curPwMap.plus(key, new PendingWrite(wrapper, oursId));
curBundle.commit(wrapper); curBundle.commit(wrapper);
} }
case TxRecord.TxObjectRecordDeleted deleted -> { case TxRecord.TxObjectRecordDeleted deleted -> {
// Log.tracev("Deleting object {0}", deleted.key()); // Log.tracev("Deleting object {0}", deleted.key());
curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId)); curPwMap = curPwMap.plus(key, new PendingDelete(key, oursId));
curBundle.delete(deleted.key()); curBundle.delete(key);
} }
} }
} }

View File

@@ -19,7 +19,6 @@ import org.apache.commons.lang3.tuple.Pair;
import java.util.*; import java.util.*;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
public class JObjectManager { public class JObjectManager {
@@ -75,11 +74,12 @@ public class JObjectManager {
} }
for (var n : tx.drainNewWrites()) { for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) { for (var hookPut : hookIterationData) {
hookPut.pendingWrites().put(n.key(), n); hookPut.pendingWrites().put(key, n);
pendingCount++; pendingCount++;
} }
writes.put(n.key(), n); writes.put(key, n);
} }
@@ -108,19 +108,20 @@ public class JObjectManager {
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass()); // Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) { for (var entry : curIteration.entrySet()) {
var key = entry.getKey();
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey()); // Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
var oldObj = getPrev.apply(entry.getKey()); var oldObj = getPrev.apply(key);
lastCurHookSeen.put(entry.getKey(), entry.getValue()); lastCurHookSeen.put(key, entry.getValue());
switch (entry.getValue()) { switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) { if (oldObj == null) {
hook.onCreate(write.key(), write.data()); hook.onCreate(key, write.data());
} else { } else {
hook.onChange(write.key(), oldObj, write.data()); hook.onChange(key, oldObj, write.data());
} }
} }
case TxRecord.TxObjectRecordDeleted deleted -> { case TxRecord.TxObjectRecordDeleted deleted -> {
hook.onDelete(deleted.key(), oldObj); hook.onDelete(key, oldObj);
} }
default -> throw new TxCommitException("Unexpected value: " + entry); default -> throw new TxCommitException("Unexpected value: " + entry);
} }
@@ -130,16 +131,17 @@ public class JObjectManager {
curIteration.clear(); curIteration.clear();
for (var n : tx.drainNewWrites()) { for (var n : tx.drainNewWrites()) {
var key = n.key();
for (var hookPut : hookIterationData) { for (var hookPut : hookIterationData) {
if (hookPut == hookId) { if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n); lastCurHookSeen.put(key, n);
continue; continue;
} }
var before = hookPut.pendingWrites().put(n.key(), n); var before = hookPut.pendingWrites().put(key, n);
if (before == null) if (before == null)
pendingCount++; pendingCount++;
} }
writes.put(n.key(), n); writes.put(key, n);
} }
} }
} }

View File

@@ -63,6 +63,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final HashSet<JObjectKey> _knownNew = new HashSet<>(); private final HashSet<JObjectKey> _knownNew = new HashSet<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot; private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false; private boolean _closed = false;
private boolean _writeTrack = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>(); private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() { private TransactionImpl() {
@@ -99,43 +101,29 @@ public class TransactionFactoryImpl implements TransactionFactory {
if (_knownNew.contains(key)) { if (_knownNew.contains(key)) {
return Optional.empty(); return Optional.empty();
} }
var got = _readSet.computeIfAbsent(key, k -> {
var read = _snapshot.readObject(k); return _readSet.computeIfAbsent(key, _snapshot::readObject)
return read; .map(JDataVersionedWrapper::data)
}); .map(type::cast);
if (got.isEmpty())
return Optional.empty();
var gotData = got.get();
return Optional.of(type.cast(gotData.data()));
} }
@Override @Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) { public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) { return switch (_writes.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
return Optional.of(type.cast(write.data())); case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
} case null -> getFromSource(type, key);
case TxRecord.TxObjectRecordDeleted deleted -> { };
return Optional.empty();
}
case null, default -> {
}
}
return getFromSource(type, key);
} }
@Override @Override
public void delete(JObjectKey key) { public void delete(JObjectKey key) {
var got = _writes.get(key); var record = new TxRecord.TxObjectRecordDeleted(key);
if (got != null) { if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
if (got instanceof TxRecord.TxObjectRecordDeleted) { return;
return;
}
} }
if (_writeTrack)
_writes.put(key, new TxRecord.TxObjectRecordDeleted(key)); _newWrites.put(key, record);
_newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
} }
@Override @Override
@@ -163,25 +151,35 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override @Override
public void put(JData obj) { public void put(JData obj) {
var read = _readSet.get(obj.key()); var key = obj.key();
var read = _readSet.get(key);
if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) { if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
return; return;
} }
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); var record = new TxRecord.TxObjectRecordWrite<>(obj);
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); _writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
} }
@Override @Override
public void putNew(JData obj) { public void putNew(JData obj) {
_knownNew.add(obj.key()); var key = obj.key();
_knownNew.add(key);
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); var record = new TxRecord.TxObjectRecordWrite<>(obj);
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj)); _writes.put(key, record);
if (_writeTrack)
_newWrites.put(key, record);
} }
@Override @Override
public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() { public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
if (!_writeTrack) {
_writeTrack = true;
return Collections.unmodifiableCollection(_writes.values());
}
var ret = _newWrites; var ret = _newWrites;
_newWrites = new HashMap<>(); _newWrites = new HashMap<>();
return ret.values(); return ret.values();

View File

@@ -12,8 +12,8 @@ public interface TransactionManager {
void rollback(); void rollback();
default <T> T runTries(Supplier<T> supplier, int tries) { default <T> T runTries(Supplier<T> supplier, int tries, boolean nest) {
if (current() != null) { if (!nest && current() != null) {
return supplier.get(); return supplier.get();
} }
@@ -41,8 +41,8 @@ public interface TransactionManager {
} }
} }
default TransactionHandle runTries(VoidFn fn, int tries) { default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
if (current() != null) { if (!nest && current() != null) {
fn.apply(); fn.apply();
return new TransactionHandle() { return new TransactionHandle() {
@Override @Override
@@ -74,23 +74,38 @@ public interface TransactionManager {
throw e; throw e;
} }
} }
}
default <T> T runTries(Supplier<T> supplier, int tries) {
return runTries(supplier, tries, false);
}
default TransactionHandle runTries(VoidFn fn, int tries) {
return runTries(fn, tries, false);
}
default TransactionHandle run(VoidFn fn, boolean nest) {
return runTries(fn, 10, nest);
}
default <T> T run(Supplier<T> supplier, boolean nest) {
return runTries(supplier, 10, nest);
} }
default TransactionHandle run(VoidFn fn) { default TransactionHandle run(VoidFn fn) {
return runTries(fn, 10); return run(fn, false);
} }
default <T> T run(Supplier<T> supplier) { default <T> T run(Supplier<T> supplier) {
return runTries(supplier, 10); return run(supplier, false);
} }
default void executeTx(VoidFn fn) { default void executeTx(VoidFn fn) {
run(fn); run(fn, false);
} }
default <T> T executeTx(Supplier<T> supplier) { default <T> T executeTx(Supplier<T> supplier) {
return run(supplier); return run(supplier, false);
} }
Transaction current(); Transaction current();

View File

@@ -1,47 +1,48 @@
package com.usatiuk.objects.transaction; package com.usatiuk.objects.transaction;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ExecutorService; import java.util.Stack;
@Singleton @Singleton
public class TransactionManagerImpl implements TransactionManager { public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>(); private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
@Inject @Inject
JObjectManager jObjectManager; JObjectManager jObjectManager;
@Override @Override
public void begin() { public void begin() {
if (_currentTransaction.get() != null) {
throw new IllegalStateException("Transaction already started");
}
Log.trace("Starting transaction"); Log.trace("Starting transaction");
var tx = jObjectManager.createTransaction(); var tx = jObjectManager.createTransaction();
_currentTransaction.set(tx); _currentTransaction.get().push(tx);
} }
@Override @Override
public TransactionHandle commit() { public TransactionHandle commit() {
if (_currentTransaction.get() == null) { var stack = _currentTransaction.get();
if (stack.empty()) {
throw new IllegalStateException("No transaction started"); throw new IllegalStateException("No transaction started");
} }
var peeked = stack.peek();
Log.trace("Committing transaction"); Log.trace("Committing transaction");
Pair<Collection<Runnable>, TransactionHandle> ret; Pair<Collection<Runnable>, TransactionHandle> ret;
try { try {
ret = jObjectManager.commit(_currentTransaction.get()); ret = jObjectManager.commit(peeked);
} catch (Throwable e) { } catch (Throwable e) {
Log.trace("Transaction commit failed", e); Log.trace("Transaction commit failed", e);
throw e; throw e;
} finally { } finally {
_currentTransaction.get().close(); peeked.close();
_currentTransaction.remove(); stack.pop();
if (stack.empty())
_currentTransaction.remove();
} }
for (var r : ret.getLeft()) { for (var r : ret.getLeft()) {
@@ -56,24 +57,33 @@ public class TransactionManagerImpl implements TransactionManager {
@Override @Override
public void rollback() { public void rollback() {
if (_currentTransaction.get() == null) { var stack = _currentTransaction.get();
if (stack.empty()) {
throw new IllegalStateException("No transaction started"); throw new IllegalStateException("No transaction started");
} }
var peeked = stack.peek();
try { try {
jObjectManager.rollback(_currentTransaction.get()); jObjectManager.rollback(peeked);
} catch (Throwable e) { } catch (Throwable e) {
Log.error("Transaction rollback failed", e); Log.error("Transaction rollback failed", e);
throw e; throw e;
} finally { } finally {
_currentTransaction.get().close(); peeked.close();
_currentTransaction.remove(); stack.pop();
if (stack.empty())
_currentTransaction.remove();
} }
} }
@Override @Override
@Nullable
public Transaction current() { public Transaction current() {
return _currentTransaction.get(); var stack = _currentTransaction.get();
if (stack.empty()) {
return null;
}
return stack.peek();
} }
} }

View File

@@ -1,10 +1,12 @@
dhfs.objects.persistence=lmdb dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728 dhfs.objects.writeback.limit=16777216
dhfs.objects.lru.limit=134217728 dhfs.objects.lru.limit=67108864
dhfs.objects.lru.print-stats=false dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15 dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false dhfs.objects.persistence.snapshot-extra-checks=false
dhfs.objects.transaction.never-lock=true dhfs.objects.transaction.never-lock=true
dhfs.objects.last-seen.update=60
dhfs.objects.last-seen.timeout=43200
quarkus.log.category."com.usatiuk.objects.iterators".level=INFO quarkus.log.category."com.usatiuk.objects.iterators".level=INFO
quarkus.log.category."com.usatiuk.objects.iterators".min-level=INFO quarkus.log.category."com.usatiuk.objects.iterators".min-level=INFO

View File

@@ -19,8 +19,6 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@@ -56,27 +54,21 @@ public class AutosyncProcessor {
if (downloadAll) if (downloadAll)
executorService.submit(() -> { executorService.submit(() -> {
Log.info("Adding all to autosync"); Log.info("Adding all to autosync");
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> { txm.run(() -> {
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) { while (it.hasNext()) {
var key = it.peekNextKey(); var key = it.peekNextKey();
objs.add(key); txm.run(() -> {
// TODO: Nested transactions var gotObj = curTx.get(JData.class, key).orElse(null);
if (!(gotObj instanceof RemoteObjectMeta meta))
return;
if (!meta.hasLocalData())
add(meta.key());
}, true);
it.skip(); it.skip();
} }
} }
}); });
for (var obj : objs) {
txm.run(() -> {
var gotObj = curTx.get(JData.class, obj).orElse(null);
if (!(gotObj instanceof RemoteObjectMeta meta))
return;
if (!meta.hasLocalData())
add(meta.key());
});
}
Log.info("Adding all to autosync: finished"); Log.info("Adding all to autosync: finished");
}); });
} }

View File

@@ -1,13 +1,14 @@
package com.usatiuk.dhfs.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import org.pcollections.PMap; import org.pcollections.PMap;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op { public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override @Override
public Collection<JObjectKey> getEscapedRefs() { public Collection<JObjectKey> getEscapedRefs() {
return List.of(key); return List.of(key);

View File

@@ -1,10 +1,14 @@
package com.usatiuk.dhfs.invalidation; package com.usatiuk.dhfs.invalidation;
import com.usatiuk.dhfs.peersync.PeerId; import com.usatiuk.dhfs.peersync.PeerId;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta; import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction; import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.dhfs.syncmap.DtoMapperService;
import com.usatiuk.objects.transaction.Transaction; import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager; import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
@@ -19,11 +23,22 @@ public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta
Transaction curTx; Transaction curTx;
@Inject @Inject
RemoteTransaction remoteTransaction; RemoteTransaction remoteTransaction;
@Inject
DtoMapperService dtoMapperService;
@Override @Override
public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) { public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
return txm.run(() -> { return txm.run(() -> {
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> { JDataRemoteDto dto =
data.knownType().isAnnotationPresent(JDataRemotePush.class)
? remoteTransaction.getData(data.knownType(), data.key())
.map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
: null;
if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
}
return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
}); });
}); });
} }

View File

@@ -24,6 +24,6 @@ public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {
@Override @Override
public Collection<PeerId> getAllPeers() { public Collection<PeerId> getAllPeers() {
return peerInfoService.getPeers().stream().map(PeerInfo::id).toList(); return peerInfoService.getSynchronizedPeers().stream().map(PeerInfo::id).toList();
} }
} }

View File

@@ -4,24 +4,48 @@ import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.peertrust.CertificateTools; import com.usatiuk.dhfs.peertrust.CertificateTools;
import com.usatiuk.dhfs.remoteobj.JDataRemote; import com.usatiuk.dhfs.remoteobj.JDataRemote;
import com.usatiuk.dhfs.remoteobj.JDataRemoteDto; import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
import com.usatiuk.objects.JObjectKey; import com.usatiuk.objects.JObjectKey;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto { @JDataRemotePush
public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
PMap<PeerId, Long> kickCounter,
long lastSeenTimestamp) implements JDataRemote, JDataRemoteDto {
public PeerInfo(PeerId id, byte[] cert) { public PeerInfo(PeerId id, byte[] cert) {
this(id.toJObjectKey(), id, ByteString.copyFrom(cert)); this(id.toJObjectKey(), id, ByteString.copyFrom(cert), HashTreePMap.empty(), System.currentTimeMillis());
} }
public X509Certificate parsedCert() { public X509Certificate parsedCert() {
return CertificateTools.certFromBytes(cert.toByteArray()); return CertificateTools.certFromBytes(cert.toByteArray());
} }
public PeerInfo withKickCounter(PMap<PeerId, Long> kickCounter) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public PeerInfo withIncrementedKickCounter(PeerId peerId) {
return new PeerInfo(key, id, cert, kickCounter.plus(peerId, kickCounter.getOrDefault(peerId, 0L) + 1), lastSeenTimestamp);
}
public PeerInfo withLastSeenTimestamp(long lastSeenTimestamp) {
return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
}
public long kickCounterSum() {
return kickCounter.values().stream().mapToLong(Long::longValue).sum();
}
@Override @Override
public String toString() { public String toString() {
return "PeerInfo{" + return "PeerInfo{" +
"key=" + key + "key=" + key +
", id=" + id + ", id=" + id +
", kickCounter=" + kickCounter +
", lastSeenTimestamp=" + lastSeenTimestamp +
'}'; '}';
} }
} }

View File

@@ -87,17 +87,27 @@ public class PeerInfoService {
public List<PeerInfo> getPeersNoSelf() { public List<PeerInfo> getPeersNoSelf() {
return jObjectTxManager.run(() -> { return jObjectTxManager.run(() -> {
var gotKey = getTreeR().traverse(List.of()); return getPeers().stream().filter(
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map( peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList();
node -> node.children().keySet().stream() });
.map(JObjectKey::of).map(this::getPeerInfoImpl) }
.filter(o -> {
if (o.isEmpty()) public List<PeerInfo> getSynchronizedPeers() {
Log.warnv("Could not get peer info for peer {0}", o); return jObjectTxManager.run(() -> {
return o.isPresent(); return getPeers().stream().filter(pi -> {
}).map(Optional::get).filter( if (pi.id().equals(persistentPeerDataService.getSelfUuid())) {
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList()) return true;
.orElseThrow(); }
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
});
}
public List<PeerInfo> getSynchronizedPeersNoSelf() {
return jObjectTxManager.run(() -> {
return getPeersNoSelf().stream().filter(pi -> {
return persistentPeerDataService.isInitialSyncDone(pi.id());
}).toList();
}); });
} }

View File

@@ -89,10 +89,11 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
if (oursCurData == null) if (oursCurData == null)
throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy")); throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));
if (!receivedData.equals(oursCurData)) if (!receivedData.cert().equals(oursCurData.cert()))
throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo data conflict")); throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo certificate conflict for " + key));
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(current.changelog()); HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(current.changelog());
HashPMap<PeerId, Long> newKickCounter = HashTreePMap.from(oursCurData.kickCounter());
for (var entry : receivedChangelog.entrySet()) { for (var entry : receivedChangelog.entrySet()) {
newChangelog = newChangelog.plus(entry.getKey(), newChangelog = newChangelog.plus(entry.getKey(),
@@ -100,6 +101,20 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
); );
} }
for (var entry : receivedData.kickCounter().entrySet()) {
newKickCounter = newKickCounter.plus(entry.getKey(),
Long.max(newKickCounter.getOrDefault(entry.getKey(), 0L), entry.getValue())
);
}
var newData = oursCurData.withKickCounter(newKickCounter)
.withLastSeenTimestamp(Math.max(oursCurData.lastSeenTimestamp(), receivedData.lastSeenTimestamp()));
if (!newData.equals(oursCurData))
newChangelog = newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.getOrDefault(persistentPeerDataService.getSelfUuid(), 0L) + 1L);
remoteTx.putDataRaw(newData);
current = current.withChangelog(newChangelog); current = current.withChangelog(newChangelog);
} }
} }

View File

@@ -0,0 +1,60 @@
package com.usatiuk.dhfs.peersync;
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
/**
 * Periodically refreshes the {@link PeerInfo} last-seen timestamp for peers that
 * are currently reachable, and bumps the kick counter (via
 * {@code withIncrementedKickCounter}) for peers that have been unreachable for
 * longer than the configured timeout.
 *
 * Runs on the Quarkus scheduler at the interval given by
 * {@code dhfs.objects.last-seen.update}; overlapping runs are skipped.
 */
@ApplicationScoped
public class PeerLastSeenUpdater {
    @Inject
    PeerManager peerManager;
    @Inject
    PeerInfoService peerInfoService;
    @Inject
    Transaction curTx;
    @Inject
    TransactionManager txm;
    @Inject
    RemoteTransaction remoteTransaction;

    // Staleness threshold in seconds (compared against millisecond timestamps below).
    @ConfigProperty(name = "dhfs.objects.last-seen.timeout")
    long lastSeenTimeout;

    @Inject
    PersistentPeerDataService persistentPeerDataService;

    @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
    @Blocking
    void update() {
        var snapshot = peerManager.getHostStateSnapshot();

        // Reachable peers: stamp their PeerInfo with "now". Each peer gets its
        // own transaction so one failure does not abort the rest.
        for (var reachable : snapshot.available()) {
            txm.run(() -> {
                remoteTransaction.getData(PeerInfo.class, reachable.id()).ifPresent(info ->
                        remoteTransaction.putData(info.withLastSeenTimestamp(System.currentTimeMillis())));
            });
        }

        // Unreachable peers: once they have been silent past the timeout,
        // record a kick attributed to this node. Peers that never completed
        // the initial sync are left alone.
        for (var unreachable : snapshot.unavailable()) {
            txm.run(() -> {
                if (!persistentPeerDataService.isInitialSyncDone(unreachable))
                    return;
                var info = remoteTransaction.getData(PeerInfo.class, unreachable.id()).orElse(null);
                if (info == null)
                    return;
                long silentForMillis = System.currentTimeMillis() - info.lastSeenTimestamp();
                if (silentForMillis > lastSeenTimeout * 1000) {
                    remoteTransaction.putData(
                            info.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid()));
                }
            });
        }
    }
}

View File

@@ -125,19 +125,23 @@ public class PeerManager {
} }
} }
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) { public void handleConnectionError(PeerId host) {
boolean wasReachable = isReachable(host); boolean wasReachable = isReachable(host);
if (wasReachable) if (wasReachable)
Log.infov("Lost connection to {0}", host); Log.infov("Lost connection to {0}", host);
_states.remove(host.id()); _states.remove(host);
for (var l : _disconnectedListeners) { for (var l : _disconnectedListeners) {
l.handlePeerDisconnected(host.id()); l.handlePeerDisconnected(host);
} }
} }
public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
handleConnectionError(host.id());
}
// FIXME: // FIXME:
private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) { private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
try { try {

View File

@@ -51,6 +51,8 @@ public class PersistentPeerDataService {
PeerInfoService peerInfoService; PeerInfoService peerInfoService;
@Inject @Inject
TransactionManager txm; TransactionManager txm;
@Inject
PeerManager peerManager;
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid") @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
Optional<String> presetUuid; Optional<String> presetUuid;
@@ -89,21 +91,6 @@ public class PersistentPeerDataService {
Files.write(Path.of(stuffRoot, "self_uuid"), _selfUuid.id().toString().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); Files.write(Path.of(stuffRoot, "self_uuid"), _selfUuid.id().toString().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
} }
// private void pushPeerUpdates() {
// pushPeerUpdates(null);
// }
// private void pushPeerUpdates(@Nullable JObject<?> obj) {
// if (obj != null)
// Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated");
// executorService.submit(() -> {
// updateCerts();
// invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
// for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList()))
// invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p));
// });
// }
public PeerId getSelfUuid() { public PeerId getSelfUuid() {
return _selfUuid; return _selfUuid;
} }
@@ -148,6 +135,7 @@ public class PersistentPeerDataService {
} }
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId))); curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
Log.infov("Did reset sync state for {0}", peerId); Log.infov("Did reset sync state for {0}", peerId);
curTx.onCommit(() -> peerManager.handleConnectionError(peerId));
return true; return true;
}); });
} }
@@ -160,7 +148,6 @@ public class PersistentPeerDataService {
}); });
} }
public List<IpPeerAddress> getPersistentPeerAddresses() { public List<IpPeerAddress> getPersistentPeerAddresses() {
return txm.run(() -> { return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null); var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.peertrust; package com.usatiuk.dhfs.peertrust;
import com.usatiuk.dhfs.invalidation.InvalidationQueueService; import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder; import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
import com.usatiuk.dhfs.peersync.PeerInfo; import com.usatiuk.dhfs.peersync.PeerInfo;
import com.usatiuk.dhfs.peersync.PeerInfoService; import com.usatiuk.dhfs.peersync.PeerInfoService;
@@ -43,7 +42,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
for (var curRef : oldNode.node().children().entrySet()) { for (var curRef : oldNode.node().children().entrySet()) {
if (!n.node().children().containsKey(curRef.getKey())) { if (!n.node().children().containsKey(curRef.getKey())) {
Log.infov("Will reset sync state for {0}", curRef.getValue()); Log.infov("Will reset sync state for {0}", curRef.getValue());
curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue()))); persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue()));
} }
} }
return; return;
@@ -54,9 +53,16 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
return; return;
} }
if (!(remote.data() instanceof PeerInfo)) if (!(remote.data() instanceof PeerInfo curPi))
return; return;
var oldPi = (PeerInfo) ((RemoteObjectDataWrapper) old).data();
if (oldPi.kickCounterSum() != curPi.kickCounterSum()) {
Log.warnv("Peer kicked out: {0} to {1}", key, cur);
persistentPeerDataService.resetInitialSyncDone(curPi.id());
}
Log.infov("Changed peer info: {0} to {1}", key, cur); Log.infov("Changed peer info: {0} to {1}", key, cur);
curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(key)); curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(key));

View File

@@ -20,7 +20,7 @@ public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
@Override @Override
public void handleOp(PeerId from, IndexUpdateOp op) { public void handleOp(PeerId from, IndexUpdateOp op) {
txm.run(() -> { txm.run(() -> {
syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null); syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), op.data());
}); });
} }
} }

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.remoteobj;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker annotation for remote-object data types. Retained at runtime so it can
 * be discovered reflectively; applicable to types only.
 *
 * NOTE(review): carries no members, so its meaning is purely by presence —
 * presumably it flags types whose data should be pushed eagerly alongside
 * index-update ops rather than fetched on demand; confirm against the code
 * that scans for this annotation.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JDataRemotePush {
}

View File

@@ -132,7 +132,7 @@ public class RemoteObjectDeleter {
return true; return true;
} }
var knownHosts = peerInfoService.getPeersNoSelf(); var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
RemoteObjectMeta finalTarget = target; RemoteObjectMeta finalTarget = target;
List<PeerId> missing = knownHosts.stream() List<PeerId> missing = knownHosts.stream()
.map(PeerInfo::id) .map(PeerInfo::id)
@@ -187,7 +187,7 @@ public class RemoteObjectDeleter {
if (!obj.seen()) if (!obj.seen())
return true; return true;
var knownHosts = peerInfoService.getPeersNoSelf(); var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
boolean missing = false; boolean missing = false;
for (var x : knownHosts) { for (var x : knownHosts) {
if (!obj.confirmedDeletes().contains(x.id())) { if (!obj.confirmedDeletes().contains(x.id())) {

View File

@@ -23,7 +23,10 @@ import org.pcollections.PMap;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.util.*; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
@ApplicationScoped @ApplicationScoped
@@ -124,52 +127,42 @@ public class SyncHandler {
public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) { public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
if (shutdownChecker.lastShutdownClean()) if (shutdownChecker.lastShutdownClean())
return; return;
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> { txm.run(() -> {
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) { while (it.hasNext()) {
var key = it.peekNextKey(); var key = it.peekNextKey();
objs.add(key); txm.run(() -> {
// TODO: Nested transactions var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.handleCrash(key);
}
Log.infov("Handled crash of {0}", key);
}, true);
it.skip(); it.skip();
} }
} }
}); });
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.handleCrash(obj);
}
Log.infov("Handled crash of {0}", obj);
});
}
} }
public void doInitialSync(PeerId peer) { public void doInitialSync(PeerId peer) {
List<JObjectKey> objs = new LinkedList<>();
txm.run(() -> { txm.run(() -> {
Log.tracev("Will do initial sync for {0}", peer); Log.tracev("Will do initial sync for {0}", peer);
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) { try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) { while (it.hasNext()) {
var key = it.peekNextKey(); var key = it.peekNextKey();
objs.add(key); txm.run(() -> {
// TODO: Nested transactions var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.prepareForInitialSync(peer, key);
}
Log.infov("Adding to initial sync for peer {0}: {1}", peer, key);
invalidationQueueService.pushInvalidationToOne(peer, key);
}, true);
it.skip(); it.skip();
} }
} }
}); });
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.prepareForInitialSync(peer, obj);
}
Log.infov("Adding to initial sync for peer {0}: {1}", peer, obj);
invalidationQueueService.pushInvalidationToOne(peer, obj);
});
}
} }
} }

View File

@@ -17,7 +17,7 @@ public class UninitializedByteBuffer {
); );
public static ByteBuffer allocate(int capacity) { public static ByteBuffer allocate(int capacity) {
UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity); UnsafeAccessor.NIO.reserveMemory(capacity, capacity);
MemorySegment segment = null; MemorySegment segment = null;
try { try {
@@ -29,7 +29,7 @@ public class UninitializedByteBuffer {
Consumer<MemorySegment> cleanup = s -> { Consumer<MemorySegment> cleanup = s -> {
try { try {
free.invokeExact(s); free.invokeExact(s);
UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity); UnsafeAccessor.NIO.unreserveMemory(capacity, capacity);
} catch (Throwable e) { } catch (Throwable e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -39,6 +39,6 @@ public class UninitializedByteBuffer {
} }
public static long getAddress(ByteBuffer buffer) { public static long getAddress(ByteBuffer buffer) {
return UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer); return UnsafeAccessor.NIO.getBufferAddress(buffer);
} }
} }

View File

@@ -6,36 +6,18 @@ import sun.misc.Unsafe;
import java.lang.reflect.Field; import java.lang.reflect.Field;
public class UnsafeAccessor { public abstract class UnsafeAccessor {
private static final UnsafeAccessor INSTANCE; public static final JavaNioAccess NIO;
public static final Unsafe UNSAFE;
static { static {
try { try {
INSTANCE = new UnsafeAccessor(); NIO = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
UNSAFE = (Unsafe) f.get(null);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public static UnsafeAccessor get() {
return INSTANCE;
}
private JavaNioAccess _nioAccess;
private Unsafe _unsafe;
private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
public Unsafe getUnsafe() {
return _unsafe;
}
} }

View File

@@ -38,7 +38,7 @@ java \
-Dquarkus.log.category.\"com.usatiuk\".level=INFO \ -Dquarkus.log.category.\"com.usatiuk\".level=INFO \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \ -Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \ -Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 & -jar "$SCRIPT_DIR"/"Server"/quarkus-run.jar >quarkus.log 2>&1 &
echo "Started $!" echo "Started $!"

View File

@@ -40,10 +40,10 @@ wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
unzip "Run wrapper.zip" unzip "Run wrapper.zip"
rm "Run wrapper.zip" rm "Run wrapper.zip"
tar xvf "run-wrapper.tar.gz" --strip-components=2 tar xvf "run-wrapper.tar.gz" --strip-components 2
rm "run-wrapper.tar.gz" rm "run-wrapper.tar.gz"
rm -rf "DHFS Package" rm -rf "Server"
rm -rf "Webui" rm -rf "Webui"
rm -rf "NativeLibs" rm -rf "NativeLibs"