Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-28 12:37:48 +01:00

Compare commits: b84ef95703 ... eb5b0ae03c (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | eb5b0ae03c | |
| | c329c1f982 | |
| | 4e7b13227b | |
| | db51d7280c | |
| | 70fecb389b | |
| | 6e9a2b25f6 | |
.github/workflows/server.yml (vendored): 6 lines changed
```diff
@@ -21,7 +21,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
         with:
-          submodules: 'recursive'
+          submodules: "recursive"

       - name: Install sudo for ACT
         run: apt-get update && apt-get install -y sudo
@@ -32,7 +32,7 @@ jobs:

       - name: User allow other for fuse
         run: echo "user_allow_other" | sudo tee -a /etc/fuse.conf

       - name: Dump fuse.conf
         run: cat /etc/fuse.conf

@@ -212,7 +212,7 @@ jobs:
         run: mkdir -p run-wrapper-out/dhfs/data && mkdir -p run-wrapper-out/dhfs/fuse && mkdir -p run-wrapper-out/dhfs/app

       - name: Copy DHFS
-        run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/DHFS Package"
+        run: cp -r ./dhfs-package-downloaded "run-wrapper-out/dhfs/app/Server"

       - name: Copy Webui
         run: cp -r ./webui-dist-downloaded "run-wrapper-out/dhfs/app/Webui"
```
```diff
@@ -168,35 +168,6 @@ public class DhfsFuseIT {
                 "rewritten\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
     }

-    // TODO: How this fits with the tree?
-    @Test
-    @Disabled
-    void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
-        await().atMost(45, TimeUnit.SECONDS).until(() -> "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
-
-        var client = DockerClientFactory.instance().client();
-        client.pauseContainerCmd(container2.getContainerId()).exec();
-
-        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
-
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
-        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
-
-        client.unpauseContainerCmd(container2.getContainerId()).exec();
-
-        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
-
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container2.execInContainer("/bin/sh", "-c", "ls /dhfs_test/fuse").getExitCode());
-
-        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
-        waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
-
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container2.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
-        await().atMost(45, TimeUnit.SECONDS).until(() -> 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
-    }
-
     @Test
     void deleteTest() throws IOException, InterruptedException, TimeoutException {
         await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
@@ -221,6 +192,28 @@ public class DhfsFuseIT {
                 1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
     }

+    @Test
+    void deleteTestKickedOut() throws IOException, InterruptedException, TimeoutException {
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "echo tesempty > /dhfs_test/fuse/testf1").getExitCode());
+        await().atMost(45, TimeUnit.SECONDS).until(() ->
+                "tesempty\n".equals(container2.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
+        await().atMost(45, TimeUnit.SECONDS).until(() ->
+                "tesempty\n".equals(container1.execInContainer("/bin/sh", "-c", "cat /dhfs_test/fuse/testf1").getStdout()));
+
+        container2.stop();
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("kicked"), 60, TimeUnit.SECONDS, 1);
+
+        Log.info("Deleting");
+        await().atMost(45, TimeUnit.SECONDS).until(() -> 0 == container1.execInContainer("/bin/sh", "-c", "rm /dhfs_test/fuse/testf1").getExitCode());
+        Log.info("Deleted");
+
+        // FIXME?
+        waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
+
+        await().atMost(45, TimeUnit.SECONDS).until(() ->
+                1 == container1.execInContainer("/bin/sh", "-c", "test -f /dhfs_test/fuse/testf1").getExitCode());
+    }
+
     @Test
     void moveFileTest() throws IOException, InterruptedException, TimeoutException {
         Log.info("Creating");
```
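The new deleteTestKickedOut exercises the kick-out path introduced in this change set: unlike the removed deleteDelayedTest, which merely paused container2, it stops the peer outright, waits for container1's log to report that the dead peer was "kicked", and then expects the deletion to reach persistent storage using only the peers that are still synchronized.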
```diff
@@ -79,6 +79,8 @@ public class DhfsImage implements Future<String> {
                 "-Ddhfs.objects.sync.timeout=30",
                 "-Ddhfs.objects.sync.ping.timeout=5",
                 "-Ddhfs.objects.reconnect_interval=1s",
+                "-Ddhfs.objects.last-seen.timeout=30",
+                "-Ddhfs.objects.last-seen.update=10",
                 "-Ddhfs.sync.cert-check=false",
                 "-Dquarkus.log.category.\"com.usatiuk\".level=TRACE",
                 "-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=TRACE",
```
```diff
@@ -246,9 +246,9 @@ public class DhfsFuse extends FuseStubFS {
     @Override
     public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
         var buffer = UninitializedByteBuffer.allocate((int) size);
-        UnsafeAccessor.get().getUnsafe().copyMemory(
+        UnsafeAccessor.UNSAFE.copyMemory(
                 buf.address(),
-                UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
+                UnsafeAccessor.NIO.getBufferAddress(buffer),
                 size
         );
         return write(path, buffer, offset, fi);
```
```diff
@@ -46,9 +46,9 @@ public class JnrPtrByteOutput extends ByteOutput {
             if (value instanceof MappedByteBuffer mb) {
                 mb.load();
             }
-            long addr = UnsafeAccessor.get().getNioAccess().getBufferAddress(value) + value.position();
+            long addr = UnsafeAccessor.NIO.getBufferAddress(value) + value.position();
             var out = _backing.address() + _pos;
-            UnsafeAccessor.get().getUnsafe().copyMemory(addr, out, rem);
+            UnsafeAccessor.UNSAFE.copyMemory(addr, out, rem);
         } else {
             _backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
         }
```
```diff
@@ -3,9 +3,11 @@ package com.usatiuk.objects.stores;
 import com.usatiuk.objects.JDataVersionedWrapper;
 import com.usatiuk.objects.JDataVersionedWrapperImpl;
 import com.usatiuk.objects.JObjectKey;
-import com.usatiuk.objects.iterators.*;
+import com.usatiuk.objects.iterators.CloseableKvIterator;
+import com.usatiuk.objects.iterators.IteratorStart;
+import com.usatiuk.objects.iterators.MaybeTombstone;
+import com.usatiuk.objects.iterators.NavigableMapKvIterator;
 import com.usatiuk.objects.snapshot.Snapshot;
 import com.usatiuk.objects.transaction.TxCommitException;
 import com.usatiuk.objects.transaction.TxRecord;
 import com.usatiuk.utils.ListUtils;
 import io.quarkus.logging.Log;
@@ -25,7 +27,6 @@ import javax.annotation.Nonnull;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
@@ -218,17 +219,18 @@ public class WritebackObjectPersistentStore {
         var curPwMap = curPw.pendingWrites();

         for (var action : writes) {
+            var key = action.key();
             switch (action) {
                 case TxRecord.TxObjectRecordWrite<?> write -> {
                     // Log.tracev("Flushing object {0}", write.key());
                     var wrapper = new JDataVersionedWrapperImpl(write.data(), oursId);
-                    curPwMap = curPwMap.plus(write.key(), new PendingWrite(wrapper, oursId));
+                    curPwMap = curPwMap.plus(key, new PendingWrite(wrapper, oursId));
                     curBundle.commit(wrapper);
                 }
                 case TxRecord.TxObjectRecordDeleted deleted -> {
                     // Log.tracev("Deleting object {0}", deleted.key());
-                    curPwMap = curPwMap.plus(deleted.key(), new PendingDelete(deleted.key(), oursId));
-                    curBundle.delete(deleted.key());
+                    curPwMap = curPwMap.plus(key, new PendingDelete(key, oursId));
+                    curBundle.delete(key);
                 }
             }
         }
```
```diff
@@ -19,7 +19,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import java.util.*;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Stream;

 @ApplicationScoped
 public class JObjectManager {
@@ -75,11 +74,12 @@ public class JObjectManager {
         }

         for (var n : tx.drainNewWrites()) {
+            var key = n.key();
             for (var hookPut : hookIterationData) {
-                hookPut.pendingWrites().put(n.key(), n);
+                hookPut.pendingWrites().put(key, n);
                 pendingCount++;
             }
-            writes.put(n.key(), n);
+            writes.put(key, n);
         }

@@ -108,19 +108,20 @@ public class JObjectManager {
                 // Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());

                 for (var entry : curIteration.entrySet()) {
+                    var key = entry.getKey();
                     // Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
-                    var oldObj = getPrev.apply(entry.getKey());
-                    lastCurHookSeen.put(entry.getKey(), entry.getValue());
+                    var oldObj = getPrev.apply(key);
+                    lastCurHookSeen.put(key, entry.getValue());
                     switch (entry.getValue()) {
                         case TxRecord.TxObjectRecordWrite<?> write -> {
                             if (oldObj == null) {
-                                hook.onCreate(write.key(), write.data());
+                                hook.onCreate(key, write.data());
                             } else {
-                                hook.onChange(write.key(), oldObj, write.data());
+                                hook.onChange(key, oldObj, write.data());
                             }
                         }
                         case TxRecord.TxObjectRecordDeleted deleted -> {
-                            hook.onDelete(deleted.key(), oldObj);
+                            hook.onDelete(key, oldObj);
                         }
                         default -> throw new TxCommitException("Unexpected value: " + entry);
                     }
@@ -130,16 +131,17 @@ public class JObjectManager {
                 curIteration.clear();

                 for (var n : tx.drainNewWrites()) {
+                    var key = n.key();
                     for (var hookPut : hookIterationData) {
                         if (hookPut == hookId) {
-                            lastCurHookSeen.put(n.key(), n);
+                            lastCurHookSeen.put(key, n);
                             continue;
                         }
-                        var before = hookPut.pendingWrites().put(n.key(), n);
+                        var before = hookPut.pendingWrites().put(key, n);
                         if (before == null)
                             pendingCount++;
                     }
-                    writes.put(n.key(), n);
+                    writes.put(key, n);
                 }
             }
         }
```
```diff
@@ -63,6 +63,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
         private final HashSet<JObjectKey> _knownNew = new HashSet<>();
         private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
         private boolean _closed = false;

+        private boolean _writeTrack = false;
+        private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();

         private TransactionImpl() {
@@ -99,43 +101,29 @@ public class TransactionFactoryImpl implements TransactionFactory {
             if (_knownNew.contains(key)) {
                 return Optional.empty();
             }
-            var got = _readSet.computeIfAbsent(key, k -> {
-                var read = _snapshot.readObject(k);
-                return read;
-            });
-            if (got.isEmpty())
-                return Optional.empty();
-            var gotData = got.get();
-            return Optional.of(type.cast(gotData.data()));
+
+            return _readSet.computeIfAbsent(key, _snapshot::readObject)
+                    .map(JDataVersionedWrapper::data)
+                    .map(type::cast);
         }

         @Override
         public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
-            switch (_writes.get(key)) {
-                case TxRecord.TxObjectRecordWrite<?> write -> {
-                    return Optional.of(type.cast(write.data()));
-                }
-                case TxRecord.TxObjectRecordDeleted deleted -> {
-                    return Optional.empty();
-                }
-                case null, default -> {
-                }
-            }
-
-            return getFromSource(type, key);
+            return switch (_writes.get(key)) {
+                case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
+                case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
+                case null -> getFromSource(type, key);
+            };
         }

         @Override
         public void delete(JObjectKey key) {
-            var got = _writes.get(key);
-            if (got != null) {
-                if (got instanceof TxRecord.TxObjectRecordDeleted) {
-                    return;
-                }
-            }
-
-            _writes.put(key, new TxRecord.TxObjectRecordDeleted(key));
-            _newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
+            var record = new TxRecord.TxObjectRecordDeleted(key);
+            if (_writes.put(key, record) instanceof TxRecord.TxObjectRecordDeleted) {
+                return;
+            }
+
+            if (_writeTrack)
+                _newWrites.put(key, record);
         }

         @Override
@@ -163,25 +151,35 @@ public class TransactionFactoryImpl implements TransactionFactory {

         @Override
         public void put(JData obj) {
-            var read = _readSet.get(obj.key());
+            var key = obj.key();
+            var read = _readSet.get(key);
             if (read != null && (read.map(JDataVersionedWrapper::data).orElse(null) == obj)) {
                 return;
             }

-            _writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
-            _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
+            var record = new TxRecord.TxObjectRecordWrite<>(obj);
+            _writes.put(key, record);
+            if (_writeTrack)
+                _newWrites.put(key, record);
         }

         @Override
         public void putNew(JData obj) {
-            _knownNew.add(obj.key());
+            var key = obj.key();
+            _knownNew.add(key);

-            _writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
-            _newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
+            var record = new TxRecord.TxObjectRecordWrite<>(obj);
+            _writes.put(key, record);
+            if (_writeTrack)
+                _newWrites.put(key, record);
         }

         @Override
         public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
+            if (!_writeTrack) {
+                _writeTrack = true;
+                return Collections.unmodifiableCollection(_writes.values());
+            }
+            var ret = _newWrites;
+            _newWrites = new HashMap<>();
+            return ret.values();
```
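The drainNewWrites() change turns the transaction's write set into an incremental feed for the pre-commit hook loop in JObjectManager: the first call returns everything written so far and flips _writeTrack on, and every later call returns only the writes recorded since the previous drain. A minimal sketch of a consumer in that style follows; the tx and handle(...) names are illustrative, not the actual JObjectManager code:

```java
// Hypothetical fixed-point loop over a transaction's new writes:
// keep draining until the hooks stop producing more writes.
var seen = new java.util.HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
while (true) {
    var batch = tx.drainNewWrites(); // 1st call: all writes; later calls: only new ones
    if (batch.isEmpty()) break;
    for (var n : batch) {
        seen.put(n.key(), n);
        handle(n); // a hook here may call tx.put(...), producing more writes
    }
}
```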
```diff
@@ -12,8 +12,8 @@ public interface TransactionManager {

     void rollback();

-    default <T> T runTries(Supplier<T> supplier, int tries) {
-        if (current() != null) {
+    default <T> T runTries(Supplier<T> supplier, int tries, boolean nest) {
+        if (!nest && current() != null) {
             return supplier.get();
         }

@@ -41,8 +41,8 @@ public interface TransactionManager {
         }
     }

-    default TransactionHandle runTries(VoidFn fn, int tries) {
-        if (current() != null) {
+    default TransactionHandle runTries(VoidFn fn, int tries, boolean nest) {
+        if (!nest && current() != null) {
             fn.apply();
             return new TransactionHandle() {
                 @Override
@@ -74,23 +74,38 @@ public interface TransactionManager {
                 throw e;
             }
         }
     }

+    default <T> T runTries(Supplier<T> supplier, int tries) {
+        return runTries(supplier, tries, false);
+    }
+
+    default TransactionHandle runTries(VoidFn fn, int tries) {
+        return runTries(fn, tries, false);
+    }
+
+    default TransactionHandle run(VoidFn fn, boolean nest) {
+        return runTries(fn, 10, nest);
+    }
+
+    default <T> T run(Supplier<T> supplier, boolean nest) {
+        return runTries(supplier, 10, nest);
+    }
+
     default TransactionHandle run(VoidFn fn) {
-        return runTries(fn, 10);
+        return run(fn, false);
     }

     default <T> T run(Supplier<T> supplier) {
-        return runTries(supplier, 10);
+        return run(supplier, false);
     }

     default void executeTx(VoidFn fn) {
-        run(fn);
+        run(fn, false);
     }

     default <T> T executeTx(Supplier<T> supplier) {
-        return run(supplier);
+        return run(supplier, false);
     }

     Transaction current();
```
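The extra boolean nest is what makes the per-key inner transactions in AutosyncProcessor and SyncHandler (further down) possible: with nest = false an inner run() simply executes inside the caller's transaction, while with nest = true it opens a genuinely new transaction that commits on its own. A hedged usage sketch, with txm standing for an injected TransactionManager:

```java
txm.run(() -> {
    // Outer transaction, e.g. a long iteration over all keys.
    txm.run(() -> {
        // nest = false (the old behavior, kept by the no-arg overloads):
        // this body just joins the outer transaction and commits with it.
    });
    txm.run(() -> {
        // nest = true: a real child transaction, pushed on the
        // thread-local stack and committed before the outer one finishes.
    }, true);
});
```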
```diff
@@ -1,47 +1,48 @@
 package com.usatiuk.objects.transaction;

 import io.quarkus.logging.Log;
 import jakarta.annotation.Nullable;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 import org.apache.commons.lang3.tuple.Pair;

 import java.util.Collection;
-import java.util.concurrent.ExecutorService;
+import java.util.Stack;

 @Singleton
 public class TransactionManagerImpl implements TransactionManager {
-    private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
+    private static final ThreadLocal<Stack<TransactionPrivate>> _currentTransaction = ThreadLocal.withInitial(Stack::new);
     @Inject
     JObjectManager jObjectManager;

     @Override
     public void begin() {
-        if (_currentTransaction.get() != null) {
-            throw new IllegalStateException("Transaction already started");
-        }
-
         Log.trace("Starting transaction");
         var tx = jObjectManager.createTransaction();
-        _currentTransaction.set(tx);
+        _currentTransaction.get().push(tx);
     }

     @Override
     public TransactionHandle commit() {
-        if (_currentTransaction.get() == null) {
+        var stack = _currentTransaction.get();
+        if (stack.empty()) {
             throw new IllegalStateException("No transaction started");
         }
+        var peeked = stack.peek();

         Log.trace("Committing transaction");

         Pair<Collection<Runnable>, TransactionHandle> ret;
         try {
-            ret = jObjectManager.commit(_currentTransaction.get());
+            ret = jObjectManager.commit(peeked);
         } catch (Throwable e) {
             Log.trace("Transaction commit failed", e);
             throw e;
         } finally {
-            _currentTransaction.get().close();
-            _currentTransaction.remove();
+            peeked.close();
+            stack.pop();
+            if (stack.empty())
+                _currentTransaction.remove();
         }

         for (var r : ret.getLeft()) {
@@ -56,24 +57,33 @@ public class TransactionManagerImpl implements TransactionManager {

     @Override
     public void rollback() {
-        if (_currentTransaction.get() == null) {
+        var stack = _currentTransaction.get();
+        if (stack.empty()) {
             throw new IllegalStateException("No transaction started");
         }
+        var peeked = stack.peek();

         try {
-            jObjectManager.rollback(_currentTransaction.get());
+            jObjectManager.rollback(peeked);
         } catch (Throwable e) {
             Log.error("Transaction rollback failed", e);
             throw e;
         } finally {
-            _currentTransaction.get().close();
-            _currentTransaction.remove();
+            peeked.close();
+            stack.pop();
+            if (stack.empty())
+                _currentTransaction.remove();
         }
     }

     @Override
     @Nullable
     public Transaction current() {
-        return _currentTransaction.get();
+        var stack = _currentTransaction.get();
+        if (stack.empty()) {
+            return null;
+        }
+        return stack.peek();
     }
 }
```
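Swapping the single-slot ThreadLocal for a ThreadLocal<Stack<...>> is the whole nesting mechanism: begin() pushes, commit()/rollback() work on peek() and then pop, and the thread-local is only cleared once the stack empties so nothing leaks between requests. The bookkeeping in isolation, as a self-contained sketch with a placeholder Tx type (not the project's actual classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class NestedTxRegistry<Tx> {
    private final ThreadLocal<Deque<Tx>> stack = ThreadLocal.withInitial(ArrayDeque::new);

    void begin(Tx tx) {
        stack.get().push(tx); // nesting allowed: no "already started" check
    }

    Tx current() {
        var s = stack.get();
        return s.isEmpty() ? null : s.peek(); // innermost transaction wins
    }

    void end() { // shared by commit and rollback
        var s = stack.get();
        s.pop();
        if (s.isEmpty())
            stack.remove(); // drop the ThreadLocal entry once fully unwound
    }
}
```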
```diff
@@ -1,10 +1,12 @@
 dhfs.objects.persistence=lmdb
-dhfs.objects.writeback.limit=134217728
-dhfs.objects.lru.limit=134217728
+dhfs.objects.writeback.limit=16777216
+dhfs.objects.lru.limit=67108864
 dhfs.objects.lru.print-stats=false
 dhfs.objects.lock_timeout_secs=15
 dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
 dhfs.objects.persistence.snapshot-extra-checks=false
 dhfs.objects.transaction.never-lock=true
+dhfs.objects.last-seen.update=60
+dhfs.objects.last-seen.timeout=43200
 quarkus.log.category."com.usatiuk.objects.iterators".level=INFO
 quarkus.log.category."com.usatiuk.objects.iterators".min-level=INFO
```
```diff
@@ -19,8 +19,6 @@ import jakarta.inject.Inject;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.eclipse.microprofile.config.inject.ConfigProperty;

-import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -56,27 +54,21 @@ public class AutosyncProcessor {
         if (downloadAll)
             executorService.submit(() -> {
                 Log.info("Adding all to autosync");
-                List<JObjectKey> objs = new LinkedList<>();
                 txm.run(() -> {
                     try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
                         while (it.hasNext()) {
                             var key = it.peekNextKey();
-                            objs.add(key);
+                            // TODO: Nested transactions
+                            txm.run(() -> {
+                                var gotObj = curTx.get(JData.class, key).orElse(null);
+                                if (!(gotObj instanceof RemoteObjectMeta meta))
+                                    return;
+                                if (!meta.hasLocalData())
+                                    add(meta.key());
+                            }, true);
                             it.skip();
                         }
                     }
                 });
-
-                for (var obj : objs) {
-                    txm.run(() -> {
-                        var gotObj = curTx.get(JData.class, obj).orElse(null);
-                        if (!(gotObj instanceof RemoteObjectMeta meta))
-                            return;
-                        if (!meta.hasLocalData())
-                            add(meta.key());
-                    });
-                }
                 Log.info("Adding all to autosync: finished");
             });
     }
```
```diff
@@ -1,13 +1,14 @@
 package com.usatiuk.dhfs.invalidation;

 import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
 import com.usatiuk.objects.JObjectKey;
 import org.pcollections.PMap;

 import java.util.Collection;
 import java.util.List;

-public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog) implements Op {
+public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
     @Override
     public Collection<JObjectKey> getEscapedRefs() {
         return List.of(key);
```
```diff
@@ -1,10 +1,14 @@
 package com.usatiuk.dhfs.invalidation;

 import com.usatiuk.dhfs.peersync.PeerId;
+import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
+import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
 import com.usatiuk.dhfs.remoteobj.RemoteObjectMeta;
 import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
+import com.usatiuk.dhfs.syncmap.DtoMapperService;
 import com.usatiuk.objects.transaction.Transaction;
 import com.usatiuk.objects.transaction.TransactionManager;
+import io.quarkus.logging.Log;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import org.apache.commons.lang3.tuple.Pair;
@@ -19,11 +23,22 @@ public class RemoteObjectMetaOpExtractor implements OpExtractor<RemoteObjectMeta> {
     Transaction curTx;
     @Inject
     RemoteTransaction remoteTransaction;
+    @Inject
+    DtoMapperService dtoMapperService;

     @Override
     public Pair<List<Op>, Runnable> extractOps(RemoteObjectMeta data, PeerId peerId) {
         return txm.run(() -> {
-            return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog())), () -> {
+            JDataRemoteDto dto =
+                    data.knownType().isAnnotationPresent(JDataRemotePush.class)
+                            ? remoteTransaction.getData(data.knownType(), data.key())
+                            .map(d -> dtoMapperService.toDto(d, d.dtoClass())).orElse(null)
+                            : null;
+
+            if (data.knownType().isAnnotationPresent(JDataRemotePush.class) && dto == null) {
+                Log.warnv("Failed to get data for push {0} of type {1}", data.key(), data.knownType());
+            }
+            return Pair.of(List.of(new IndexUpdateOp(data.key(), data.changelog(), dto)), () -> {
             });
         });
     }
```
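Together with the widened IndexUpdateOp record above, this means remote object types annotated with the new @JDataRemotePush (PeerInfo below is the first such type) ship their DTO inside the invalidation op itself, so the receiving IndexUpdateOpHandler can apply the update without a separate fetch round-trip; unannotated types keep sending data = null as before.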
```diff
@@ -24,6 +24,6 @@ public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {

     @Override
     public Collection<PeerId> getAllPeers() {
-        return peerInfoService.getPeers().stream().map(PeerInfo::id).toList();
+        return peerInfoService.getSynchronizedPeers().stream().map(PeerInfo::id).toList();
     }
 }
```
```diff
@@ -4,24 +4,48 @@ import com.google.protobuf.ByteString;
 import com.usatiuk.dhfs.peertrust.CertificateTools;
 import com.usatiuk.dhfs.remoteobj.JDataRemote;
 import com.usatiuk.dhfs.remoteobj.JDataRemoteDto;
+import com.usatiuk.dhfs.remoteobj.JDataRemotePush;
 import com.usatiuk.objects.JObjectKey;
+import org.pcollections.HashTreePMap;
+import org.pcollections.PMap;

 import java.security.cert.X509Certificate;

-public record PeerInfo(JObjectKey key, PeerId id, ByteString cert) implements JDataRemote, JDataRemoteDto {
+@JDataRemotePush
+public record PeerInfo(JObjectKey key, PeerId id, ByteString cert,
+                       PMap<PeerId, Long> kickCounter,
+                       long lastSeenTimestamp) implements JDataRemote, JDataRemoteDto {
     public PeerInfo(PeerId id, byte[] cert) {
-        this(id.toJObjectKey(), id, ByteString.copyFrom(cert));
+        this(id.toJObjectKey(), id, ByteString.copyFrom(cert), HashTreePMap.empty(), System.currentTimeMillis());
     }

     public X509Certificate parsedCert() {
         return CertificateTools.certFromBytes(cert.toByteArray());
     }

+    public PeerInfo withKickCounter(PMap<PeerId, Long> kickCounter) {
+        return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
+    }
+
+    public PeerInfo withIncrementedKickCounter(PeerId peerId) {
+        return new PeerInfo(key, id, cert, kickCounter.plus(peerId, kickCounter.getOrDefault(peerId, 0L) + 1), lastSeenTimestamp);
+    }
+
+    public PeerInfo withLastSeenTimestamp(long lastSeenTimestamp) {
+        return new PeerInfo(key, id, cert, kickCounter, lastSeenTimestamp);
+    }
+
+    public long kickCounterSum() {
+        return kickCounter.values().stream().mapToLong(Long::longValue).sum();
+    }
+
     @Override
     public String toString() {
         return "PeerInfo{" +
                 "key=" + key +
                 ", id=" + id +
+                ", kickCounter=" + kickCounter +
+                ", lastSeenTimestamp=" + lastSeenTimestamp +
                 '}';
     }
 }
```
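kickCounter behaves like a grow-only per-peer counter map: each peer only ever raises its own slot (withIncrementedKickCounter), and two replicas reconcile by taking the per-key maximum, which is exactly what the PeerInfoSyncHandler hunk below does with HashPMap. The merge rule in isolation, as a self-contained sketch on plain HashMap rather than pcollections:

```java
import java.util.HashMap;
import java.util.Map;

final class KickCounterMergeSketch {
    // Per-key max is commutative, associative and idempotent, so both
    // sides converge to the same map regardless of delivery order.
    static Map<String, Long> merge(Map<String, Long> ours, Map<String, Long> theirs) {
        var merged = new HashMap<>(ours);
        theirs.forEach((peer, count) -> merged.merge(peer, count, Long::max));
        return merged;
    }

    public static void main(String[] args) {
        var a = Map.of("peer1", 2L, "peer2", 1L);
        var b = Map.of("peer1", 1L, "peer3", 5L);
        System.out.println(merge(a, b)); // peer1=2, peer2=1, peer3=5 (order unspecified)
    }
}
```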
```diff
@@ -87,17 +87,27 @@ public class PeerInfoService {

     public List<PeerInfo> getPeersNoSelf() {
         return jObjectTxManager.run(() -> {
-            var gotKey = getTreeR().traverse(List.of());
-            return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
-                    node -> node.children().keySet().stream()
-                            .map(JObjectKey::of).map(this::getPeerInfoImpl)
-                            .filter(o -> {
-                                if (o.isEmpty())
-                                    Log.warnv("Could not get peer info for peer {0}", o);
-                                return o.isPresent();
-                            }).map(Optional::get).filter(
-                                    peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
-                    .orElseThrow();
+            return getPeers().stream().filter(
+                    peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList();
         });
     }

+    public List<PeerInfo> getSynchronizedPeers() {
+        return jObjectTxManager.run(() -> {
+            return getPeers().stream().filter(pi -> {
+                if (pi.id().equals(persistentPeerDataService.getSelfUuid())) {
+                    return true;
+                }
+                return persistentPeerDataService.isInitialSyncDone(pi.id());
+            }).toList();
+        });
+    }
+
+    public List<PeerInfo> getSynchronizedPeersNoSelf() {
+        return jObjectTxManager.run(() -> {
+            return getPeersNoSelf().stream().filter(pi -> {
+                return persistentPeerDataService.isInitialSyncDone(pi.id());
+            }).toList();
+        });
+    }
```
```diff
@@ -89,10 +89,11 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
             if (oursCurData == null)
                 throw new StatusRuntimeException(Status.ABORTED.withDescription("Conflict but we don't have local copy"));

-            if (!receivedData.equals(oursCurData))
-                throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo data conflict"));
+            if (!receivedData.cert().equals(oursCurData.cert()))
+                throw new StatusRuntimeException(Status.ABORTED.withDescription("PeerInfo certificate conflict for " + key));

             HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(current.changelog());
+            HashPMap<PeerId, Long> newKickCounter = HashTreePMap.from(oursCurData.kickCounter());

             for (var entry : receivedChangelog.entrySet()) {
                 newChangelog = newChangelog.plus(entry.getKey(),
@@ -100,6 +101,20 @@ public class PeerInfoSyncHandler implements ObjSyncHandler<PeerInfo, PeerInfo> {
                 );
             }

+            for (var entry : receivedData.kickCounter().entrySet()) {
+                newKickCounter = newKickCounter.plus(entry.getKey(),
+                        Long.max(newKickCounter.getOrDefault(entry.getKey(), 0L), entry.getValue())
+                );
+            }
+
+            var newData = oursCurData.withKickCounter(newKickCounter)
+                    .withLastSeenTimestamp(Math.max(oursCurData.lastSeenTimestamp(), receivedData.lastSeenTimestamp()));
+
+            if (!newData.equals(oursCurData))
+                newChangelog = newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.getOrDefault(persistentPeerDataService.getSelfUuid(), 0L) + 1L);
+
+            remoteTx.putDataRaw(newData);
+
             current = current.withChangelog(newChangelog);
         }
     }
```
```diff
@@ -0,0 +1,60 @@
+package com.usatiuk.dhfs.peersync;
+
+import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
+import com.usatiuk.objects.transaction.Transaction;
+import com.usatiuk.objects.transaction.TransactionManager;
+import io.quarkus.scheduler.Scheduled;
+import io.smallrye.common.annotation.Blocking;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+public class PeerLastSeenUpdater {
+    @Inject
+    PeerManager peerManager;
+    @Inject
+    PeerInfoService peerInfoService;
+    @Inject
+    Transaction curTx;
+    @Inject
+    TransactionManager txm;
+    @Inject
+    RemoteTransaction remoteTransaction;
+
+    @ConfigProperty(name = "dhfs.objects.last-seen.timeout")
+    long lastSeenTimeout;
+    @Inject
+    PersistentPeerDataService persistentPeerDataService;
+
+    @Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
+    @Blocking
+    void update() {
+        var snapshot = peerManager.getHostStateSnapshot();
+        for (var a : snapshot.available()) {
+            txm.run(() -> {
+                var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);
+                if (curInfo == null) return;
+
+                var newInfo = curInfo.withLastSeenTimestamp(System.currentTimeMillis());
+                remoteTransaction.putData(newInfo);
+            });
+        }
+
+        for (var u : snapshot.unavailable()) {
+            txm.run(() -> {
+                if (!persistentPeerDataService.isInitialSyncDone(u))
+                    return;
+
+                var curInfo = remoteTransaction.getData(PeerInfo.class, u.id()).orElse(null);
+                if (curInfo == null) return;
+
+                var lastSeen = curInfo.lastSeenTimestamp();
+                if (System.currentTimeMillis() - lastSeen > (lastSeenTimeout * 1000)) {
+                    var kicked = curInfo.withIncrementedKickCounter(persistentPeerDataService.getSelfUuid());
+                    remoteTransaction.putData(kicked);
+                }
+            });
+        }
+    }
+}
```
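Note the units: lastSeenTimeout is configured in seconds but compared against System.currentTimeMillis(), hence the * 1000. With the new defaults in application.properties above (update every 60 s, timeout 43200 s, i.e. 12 hours), a peer that has been unreachable for over 12 hours after completing initial sync gets its kick counter bumped; the integration-test image shortens this to a 10 s update and 30 s timeout so deleteTestKickedOut can observe the kick quickly.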
```diff
@@ -125,19 +125,23 @@ public class PeerManager {
         }
     }

-    public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
+    public void handleConnectionError(PeerId host) {
         boolean wasReachable = isReachable(host);

         if (wasReachable)
             Log.infov("Lost connection to {0}", host);

-        _states.remove(host.id());
+        _states.remove(host);

         for (var l : _disconnectedListeners) {
-            l.handlePeerDisconnected(host.id());
+            l.handlePeerDisconnected(host);
         }
     }

+    public void handleConnectionError(com.usatiuk.dhfs.peersync.PeerInfo host) {
+        handleConnectionError(host.id());
+    }
+
     // FIXME:
     private boolean pingCheck(com.usatiuk.dhfs.peersync.PeerInfo host, PeerAddress address) {
         try {
```
```diff
@@ -51,6 +51,8 @@ public class PersistentPeerDataService {
     PeerInfoService peerInfoService;
     @Inject
     TransactionManager txm;
+    @Inject
+    PeerManager peerManager;

     @ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
     Optional<String> presetUuid;
@@ -89,21 +91,6 @@ public class PersistentPeerDataService {
         Files.write(Path.of(stuffRoot, "self_uuid"), _selfUuid.id().toString().getBytes(), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
     }

-//    private void pushPeerUpdates() {
-//        pushPeerUpdates(null);
-//    }
-
-//    private void pushPeerUpdates(@Nullable JObject<?> obj) {
-//        if (obj != null)
-//            Log.info("Scheduling certificate update after " + obj.getMeta().getName() + " was updated");
-//        executorService.submit(() -> {
-//            updateCerts();
-//            invalidationQueueService.pushInvalidationToAll(PeerDirectory.PeerDirectoryObjName);
-//            for (var p : peerDirectory.get().runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> d.getPeers().stream().toList()))
-//                invalidationQueueService.pushInvalidationToAll(PersistentPeerInfo.getNameFromUuid(p));
-//        });
-//    }
-
     public PeerId getSelfUuid() {
         return _selfUuid;
     }
@@ -148,6 +135,7 @@ public class PersistentPeerDataService {
             }
             curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
             Log.infov("Did reset sync state for {0}", peerId);
+            curTx.onCommit(() -> peerManager.handleConnectionError(peerId));
             return true;
         });
     }
@@ -160,7 +148,6 @@ public class PersistentPeerDataService {
         });
     }

-
     public List<IpPeerAddress> getPersistentPeerAddresses() {
         return txm.run(() -> {
             var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
```
```diff
@@ -1,7 +1,6 @@
 package com.usatiuk.dhfs.peertrust;

 import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
-import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
 import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
 import com.usatiuk.dhfs.peersync.PeerInfo;
 import com.usatiuk.dhfs.peersync.PeerInfoService;
@@ -43,7 +42,7 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
             for (var curRef : oldNode.node().children().entrySet()) {
                 if (!n.node().children().containsKey(curRef.getKey())) {
                     Log.infov("Will reset sync state for {0}", curRef.getValue());
-                    curTx.onCommit(() -> persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue())));
+                    persistentPeerDataService.resetInitialSyncDone(JKleppmannTreeNodeMetaPeer.nodeIdToPeerId(curRef.getValue()));
                 }
             }
             return;
@@ -54,9 +53,16 @@ public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
             return;
         }

-        if (!(remote.data() instanceof PeerInfo))
+        if (!(remote.data() instanceof PeerInfo curPi))
             return;

+        var oldPi = (PeerInfo) ((RemoteObjectDataWrapper) old).data();
+
+        if (oldPi.kickCounterSum() != curPi.kickCounterSum()) {
+            Log.warnv("Peer kicked out: {0} to {1}", key, cur);
+            persistentPeerDataService.resetInitialSyncDone(curPi.id());
+        }
+
         Log.infov("Changed peer info: {0} to {1}", key, cur);

         curTx.onCommit(() -> invalidationQueueService.pushInvalidationToAll(key));
```
```diff
@@ -20,7 +20,7 @@ public class IndexUpdateOpHandler implements OpHandler<IndexUpdateOp> {
     @Override
     public void handleOp(PeerId from, IndexUpdateOp op) {
         txm.run(() -> {
-            syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), null);
+            syncHandler.handleRemoteUpdate(from, op.key(), op.changelog(), op.data());
         });
     }
 }
```
```diff
@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.remoteobj;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface JDataRemotePush {
+}
```
```diff
@@ -132,7 +132,7 @@ public class RemoteObjectDeleter {
             return true;
         }

-        var knownHosts = peerInfoService.getPeersNoSelf();
+        var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
         RemoteObjectMeta finalTarget = target;
         List<PeerId> missing = knownHosts.stream()
                 .map(PeerInfo::id)
@@ -187,7 +187,7 @@ public class RemoteObjectDeleter {
         if (!obj.seen())
             return true;

-        var knownHosts = peerInfoService.getPeersNoSelf();
+        var knownHosts = peerInfoService.getSynchronizedPeersNoSelf();
         boolean missing = false;
         for (var x : knownHosts) {
             if (!obj.confirmedDeletes().contains(x.id())) {
```
```diff
@@ -23,7 +23,10 @@ import org.pcollections.PMap;

 import javax.annotation.Nullable;
 import java.lang.reflect.ParameterizedType;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Stream;

 @ApplicationScoped
@@ -124,52 +127,42 @@ public class SyncHandler {
     public void resyncAfterCrash(@Observes @Priority(100000) StartupEvent event) {
         if (shutdownChecker.lastShutdownClean())
             return;
-        List<JObjectKey> objs = new LinkedList<>();
         txm.run(() -> {
             try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
                 while (it.hasNext()) {
                     var key = it.peekNextKey();
-                    objs.add(key);
+                    // TODO: Nested transactions
+                    txm.run(() -> {
+                        var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
+                        if (proc != null) {
+                            proc.handleCrash(key);
+                        }
+                        Log.infov("Handled crash of {0}", key);
+                    }, true);
                     it.skip();
                 }
             }
         });
-
-        for (var obj : objs) {
-            txm.run(() -> {
-                var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
-                if (proc != null) {
-                    proc.handleCrash(obj);
-                }
-                Log.infov("Handled crash of {0}", obj);
-            });
-        }
     }

     public void doInitialSync(PeerId peer) {
-        List<JObjectKey> objs = new LinkedList<>();
         txm.run(() -> {
             Log.tracev("Will do initial sync for {0}", peer);
             try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
                 while (it.hasNext()) {
                     var key = it.peekNextKey();
-                    objs.add(key);
+                    // TODO: Nested transactions
+                    txm.run(() -> {
+                        var proc = curTx.get(JData.class, key).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
+                        if (proc != null) {
+                            proc.prepareForInitialSync(peer, key);
+                        }
+                        Log.infov("Adding to initial sync for peer {0}: {1}", peer, key);
+                        invalidationQueueService.pushInvalidationToOne(peer, key);
+                    }, true);
                     it.skip();
                 }
             }
         });
-
-        for (var obj : objs) {
-            txm.run(() -> {
-                var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
-                if (proc != null) {
-                    proc.prepareForInitialSync(peer, obj);
-                }
-                Log.infov("Adding to initial sync for peer {0}: {1}", peer, obj);
-                invalidationQueueService.pushInvalidationToOne(peer, obj);
-            });
-        }
     }
 }
```
```diff
@@ -17,7 +17,7 @@ public class UninitializedByteBuffer {
     );

     public static ByteBuffer allocate(int capacity) {
-        UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
+        UnsafeAccessor.NIO.reserveMemory(capacity, capacity);

         MemorySegment segment = null;
         try {
@@ -29,7 +29,7 @@ public class UninitializedByteBuffer {
         Consumer<MemorySegment> cleanup = s -> {
             try {
                 free.invokeExact(s);
-                UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity);
+                UnsafeAccessor.NIO.unreserveMemory(capacity, capacity);
             } catch (Throwable e) {
                 throw new RuntimeException(e);
             }
@@ -39,6 +39,6 @@ public class UninitializedByteBuffer {
     }

     public static long getAddress(ByteBuffer buffer) {
-        return UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer);
+        return UnsafeAccessor.NIO.getBufferAddress(buffer);
     }
 }
```
```diff
@@ -6,36 +6,18 @@ import sun.misc.Unsafe;

 import java.lang.reflect.Field;

-public class UnsafeAccessor {
-    private static final UnsafeAccessor INSTANCE;
+public abstract class UnsafeAccessor {
+    public static final JavaNioAccess NIO;
+    public static final Unsafe UNSAFE;

     static {
         try {
-            INSTANCE = new UnsafeAccessor();
+            NIO = SharedSecrets.getJavaNioAccess();
+            Field f = Unsafe.class.getDeclaredField("theUnsafe");
+            f.setAccessible(true);
+            UNSAFE = (Unsafe) f.get(null);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
-
-    public static UnsafeAccessor get() {
-        return INSTANCE;
-    }
-
-    private JavaNioAccess _nioAccess;
-    private Unsafe _unsafe;
-
-    private UnsafeAccessor() throws NoSuchFieldException, IllegalAccessException {
-        _nioAccess = SharedSecrets.getJavaNioAccess();
-        Field f = Unsafe.class.getDeclaredField("theUnsafe");
-        f.setAccessible(true);
-        _unsafe = (Unsafe) f.get(null);
-    }
-
-    public JavaNioAccess getNioAccess() {
-        return _nioAccess;
-    }
-
-    public Unsafe getUnsafe() {
-        return _unsafe;
-    }
 }
```
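This refactor replaces the lazily-built singleton (UnsafeAccessor.get().getUnsafe() and .getNioAccess()) with static final fields resolved once in the class initializer, so the call sites in DhfsFuse, JnrPtrByteOutput and UninitializedByteBuffer above shrink to UnsafeAccessor.UNSAFE and UnsafeAccessor.NIO; making the class abstract presumably just prevents pointless instantiation.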
```diff
@@ -38,7 +38,7 @@ java \
     -Dquarkus.log.category.\"com.usatiuk\".level=INFO \
     -Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
     -Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-    -jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &
+    -jar "$SCRIPT_DIR"/"Server"/quarkus-run.jar >quarkus.log 2>&1 &

 echo "Started $!"
```
```diff
@@ -40,10 +40,10 @@ wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip

 unzip "Run wrapper.zip"
 rm "Run wrapper.zip"
-tar xvf "run-wrapper.tar.gz" --strip-components=2
+tar xvf "run-wrapper.tar.gz" --strip-components 2
 rm "run-wrapper.tar.gz"

-rm -rf "DHFS Package"
+rm -rf "Server"
 rm -rf "Webui"
 rm -rf "NativeLibs"
```