mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
Compare commits
3 Commits
289a2b880e
...
52ccbb99bc
| Author | SHA1 | Date | |
|---|---|---|---|
| 52ccbb99bc | |||
| d972cd1562 | |||
| 80151bcca5 |
@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.remoteobj.*;
|
|||||||
import com.usatiuk.dhfsfs.service.DhfsFileService;
|
import com.usatiuk.dhfsfs.service.DhfsFileService;
|
||||||
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
import com.usatiuk.kleppmanntree.AlreadyExistsException;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.StatusRuntimeException;
|
import io.grpc.StatusRuntimeException;
|
||||||
@@ -42,14 +41,10 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
|||||||
@Inject
|
@Inject
|
||||||
DhfsFileService fileService;
|
DhfsFileService fileService;
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
|
return jKleppmannTreeManager.getTree(JObjectKey.of("fs")).orElseThrow();
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
|
||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC).orElseThrow();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
private void resolveConflict(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog,
|
||||||
@Nullable FileDto receivedData) {
|
@Nullable FileDto receivedData) {
|
||||||
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
var oursCurMeta = curTx.get(RemoteObjectMeta.class, key).orElse(null);
|
||||||
@@ -131,12 +126,12 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
|
|||||||
|
|
||||||
do {
|
do {
|
||||||
try {
|
try {
|
||||||
getTreeW().move(parent.getRight(),
|
getTree().move(parent.getRight(),
|
||||||
new JKleppmannTreeNodeMetaFile(
|
new JKleppmannTreeNodeMetaFile(
|
||||||
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
|
parent.getLeft() + ".fconflict." + persistentPeerDataService.getSelfUuid() + "." + otherHostname.toString() + "." + i,
|
||||||
newFile.key()
|
newFile.key()
|
||||||
),
|
),
|
||||||
getTreeW().getNewNodeId()
|
getTree().getNewNodeId()
|
||||||
);
|
);
|
||||||
} catch (AlreadyExistsException aex) {
|
} catch (AlreadyExistsException aex) {
|
||||||
i++;
|
i++;
|
||||||
|
|||||||
@@ -19,7 +19,6 @@ import com.usatiuk.dhfsfs.objects.JKleppmannTreeNodeMetaFile;
|
|||||||
import com.usatiuk.objects.JData;
|
import com.usatiuk.objects.JData;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
import com.usatiuk.objects.iterators.IteratorStart;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import com.usatiuk.objects.transaction.TransactionManager;
|
import com.usatiuk.objects.transaction.TransactionManager;
|
||||||
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
||||||
@@ -68,14 +67,10 @@ public class DhfsFileService {
|
|||||||
@Inject
|
@Inject
|
||||||
JMapHelper jMapHelper;
|
JMapHelper jMapHelper;
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
|
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), () -> new JKleppmannTreeNodeMetaDirectory(""));
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
|
||||||
return jKleppmannTreeManager.getTree(JObjectKey.of("fs"), LockingStrategy.OPTIMISTIC, () -> new JKleppmannTreeNodeMetaDirectory(""));
|
|
||||||
}
|
|
||||||
|
|
||||||
private ChunkData createChunk(ByteString bytes) {
|
private ChunkData createChunk(ByteString bytes) {
|
||||||
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
|
||||||
remoteTx.putDataNew(newChunk);
|
remoteTx.putDataNew(newChunk);
|
||||||
@@ -84,25 +79,25 @@ public class DhfsFileService {
|
|||||||
|
|
||||||
void init(@Observes @Priority(500) StartupEvent event) {
|
void init(@Observes @Priority(500) StartupEvent event) {
|
||||||
Log.info("Initializing file service");
|
Log.info("Initializing file service");
|
||||||
getTreeW();
|
getTree();
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeNode getDirEntryW(String name) {
|
private JKleppmannTreeNode getDirEntryW(String name) {
|
||||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeNode getDirEntryR(String name) {
|
private JKleppmannTreeNode getDirEntryR(String name) {
|
||||||
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||||
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
|
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
|
||||||
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
|
||||||
if (res == null) return Optional.empty();
|
if (res == null) return Optional.empty();
|
||||||
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
|
var ret = curTx.get(JKleppmannTreeNodeHolder.class, res).map(JKleppmannTreeNodeHolder::node);
|
||||||
return ret;
|
return ret;
|
||||||
@@ -167,7 +162,7 @@ public class DhfsFileService {
|
|||||||
remoteTx.putData(f);
|
remoteTx.putData(f);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
|
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// fobj.getMeta().removeRef(newNodeId);
|
// fobj.getMeta().removeRef(newNodeId);
|
||||||
throw e;
|
throw e;
|
||||||
@@ -179,7 +174,7 @@ public class DhfsFileService {
|
|||||||
//FIXME: Slow..
|
//FIXME: Slow..
|
||||||
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
|
||||||
return jObjectTxManager.executeTx(() -> {
|
return jObjectTxManager.executeTx(() -> {
|
||||||
return getTreeW().findParent(w -> {
|
return getTree().findParent(w -> {
|
||||||
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
|
||||||
return f.fileIno().equals(ino);
|
return f.fileIno().equals(ino);
|
||||||
return false;
|
return false;
|
||||||
@@ -197,7 +192,7 @@ public class DhfsFileService {
|
|||||||
|
|
||||||
Log.debug("Creating directory " + name);
|
Log.debug("Creating directory " + name);
|
||||||
|
|
||||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
|
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -210,7 +205,7 @@ public class DhfsFileService {
|
|||||||
if (!allowRecursiveDelete && !node.children().isEmpty())
|
if (!allowRecursiveDelete && !node.children().isEmpty())
|
||||||
throw new DirectoryNotEmptyException();
|
throw new DirectoryNotEmptyException();
|
||||||
}
|
}
|
||||||
getTreeW().trash(node.meta(), node.key());
|
getTree().trash(node.meta(), node.key());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,7 +218,7 @@ public class DhfsFileService {
|
|||||||
var toDentry = getDirEntryW(toPath.getParent().toString());
|
var toDentry = getDirEntryW(toPath.getParent().toString());
|
||||||
ensureDir(toDentry);
|
ensureDir(toDentry);
|
||||||
|
|
||||||
getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -344,7 +339,7 @@ public class DhfsFileService {
|
|||||||
if (offset < 0)
|
if (offset < 0)
|
||||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
|
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
|
||||||
|
|
||||||
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
|
var file = remoteTx.getData(File.class, fileUuid).orElse(null);
|
||||||
if (file == null) {
|
if (file == null) {
|
||||||
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
|
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
|
||||||
}
|
}
|
||||||
@@ -595,7 +590,7 @@ public class DhfsFileService {
|
|||||||
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
|
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
|
||||||
|
|
||||||
remoteTx.putData(f);
|
remoteTx.putData(f);
|
||||||
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
|
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
|
||||||
return f.key();
|
return f.key();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,7 +139,7 @@
|
|||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-failsafe-plugin</artifactId>
|
<artifactId>maven-failsafe-plugin</artifactId>
|
||||||
<configuration>
|
<configuration>
|
||||||
<forkCount>1C</forkCount>
|
<forkCount>0.5C</forkCount>
|
||||||
<reuseForks>false</reuseForks>
|
<reuseForks>false</reuseForks>
|
||||||
<parallel>classes</parallel>
|
<parallel>classes</parallel>
|
||||||
<systemPropertyVariables>
|
<systemPropertyVariables>
|
||||||
|
|||||||
@@ -25,8 +25,8 @@ public class CurrentTransaction implements Transaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||||
return transactionManager.current().get(type, key, strategy);
|
return transactionManager.current().get(type, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -1,6 +0,0 @@
|
|||||||
package com.usatiuk.objects.transaction;
|
|
||||||
|
|
||||||
public enum LockingStrategy {
|
|
||||||
OPTIMISTIC, // Optimistic write, no blocking other possible writers/readers
|
|
||||||
WRITE, // Write lock, blocks all other writers
|
|
||||||
}
|
|
||||||
@@ -11,17 +11,13 @@ import java.util.Optional;
|
|||||||
public interface Transaction extends TransactionHandle {
|
public interface Transaction extends TransactionHandle {
|
||||||
void onCommit(Runnable runnable);
|
void onCommit(Runnable runnable);
|
||||||
|
|
||||||
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
|
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key);
|
||||||
|
|
||||||
<T extends JData> void put(JData obj);
|
<T extends JData> void put(JData obj);
|
||||||
<T extends JData> void putNew(JData obj);
|
<T extends JData> void putNew(JData obj);
|
||||||
|
|
||||||
void delete(JObjectKey key);
|
void delete(JObjectKey key);
|
||||||
|
|
||||||
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
|
||||||
return get(type, key, LockingStrategy.OPTIMISTIC);
|
|
||||||
}
|
|
||||||
|
|
||||||
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
|
CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key);
|
||||||
|
|
||||||
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
|
default CloseableKvIterator<JObjectKey, JData> getIterator(JObjectKey key) {
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
|
||||||
return switch (_writes.get(key)) {
|
return switch (_writes.get(key)) {
|
||||||
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
case TxRecord.TxObjectRecordWrite<?> write -> Optional.of(type.cast(write.data()));
|
||||||
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
case TxRecord.TxObjectRecordDeleted deleted -> Optional.empty();
|
||||||
|
|||||||
@@ -2,14 +2,11 @@ package com.usatiuk.objects;
|
|||||||
|
|
||||||
import com.usatiuk.objects.data.Parent;
|
import com.usatiuk.objects.data.Parent;
|
||||||
import com.usatiuk.objects.iterators.IteratorStart;
|
import com.usatiuk.objects.iterators.IteratorStart;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import com.usatiuk.objects.transaction.TransactionManager;
|
import com.usatiuk.objects.transaction.TransactionManager;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
|
||||||
import org.junit.jupiter.params.provider.EnumSource;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@@ -151,12 +148,12 @@ public abstract class ObjectsTestImpl {
|
|||||||
});
|
});
|
||||||
|
|
||||||
txm.run(() -> {
|
txm.run(() -> {
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.OPTIMISTIC).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
|
||||||
Assertions.assertEquals("John", parent.name());
|
Assertions.assertEquals("John", parent.name());
|
||||||
curTx.put(parent.withName("John2"));
|
curTx.put(parent.withName("John2"));
|
||||||
});
|
});
|
||||||
txm.run(() -> {
|
txm.run(() -> {
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3"), LockingStrategy.WRITE).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(testInfo.getDisplayName() + "Parent3")).orElse(null);
|
||||||
Assertions.assertEquals("John2", parent.name());
|
Assertions.assertEquals("John2", parent.name());
|
||||||
curTx.put(parent.withName("John3"));
|
curTx.put(parent.withName("John3"));
|
||||||
});
|
});
|
||||||
@@ -236,10 +233,9 @@ public abstract class ObjectsTestImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@EnumSource(LockingStrategy.class)
|
void editConflict(TestInfo testInfo) {
|
||||||
void editConflict(LockingStrategy strategy, TestInfo testInfo) {
|
String key = testInfo.getDisplayName() + "Parent4";
|
||||||
String key = testInfo.getDisplayName() + "Parent4" + strategy.name();
|
|
||||||
txm.run(() -> {
|
txm.run(() -> {
|
||||||
var newParent = new Parent(JObjectKey.of(key), "John3");
|
var newParent = new Parent(JObjectKey.of(key), "John3");
|
||||||
curTx.put(newParent);
|
curTx.put(newParent);
|
||||||
@@ -260,7 +256,7 @@ public abstract class ObjectsTestImpl {
|
|||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||||
curTx.put(parent.withName("John"));
|
curTx.put(parent.withName("John"));
|
||||||
Log.warn("Thread 1 commit");
|
Log.warn("Thread 1 commit");
|
||||||
}, 0);
|
}, 0);
|
||||||
@@ -276,7 +272,7 @@ public abstract class ObjectsTestImpl {
|
|||||||
Log.warn("Thread 2");
|
Log.warn("Thread 2");
|
||||||
barrier.await(); // Ensure thread 2 tx id is larger than thread 1
|
barrier.await(); // Ensure thread 2 tx id is larger than thread 1
|
||||||
txm.runTries(() -> {
|
txm.runTries(() -> {
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||||
curTx.put(parent.withName("John2"));
|
curTx.put(parent.withName("John2"));
|
||||||
Log.warn("Thread 2 commit");
|
Log.warn("Thread 2 commit");
|
||||||
}, 0);
|
}, 0);
|
||||||
@@ -317,10 +313,9 @@ public abstract class ObjectsTestImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@Test
|
||||||
@EnumSource(LockingStrategy.class)
|
void editConflict2(TestInfo testInfo) {
|
||||||
void editConflict2(LockingStrategy strategy, TestInfo testInfo) {
|
String key = testInfo.getDisplayName() + "EditConflict2";
|
||||||
String key = testInfo.getDisplayName() + "EditConflict2" + strategy.name();
|
|
||||||
txm.run(() -> {
|
txm.run(() -> {
|
||||||
var newParent = new Parent(JObjectKey.of(key), "John3");
|
var newParent = new Parent(JObjectKey.of(key), "John3");
|
||||||
curTx.put(newParent);
|
curTx.put(newParent);
|
||||||
@@ -341,7 +336,7 @@ public abstract class ObjectsTestImpl {
|
|||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||||
curTx.put(parent.withName("John"));
|
curTx.put(parent.withName("John"));
|
||||||
Log.warn("Thread 1 commit");
|
Log.warn("Thread 1 commit");
|
||||||
}, 0);
|
}, 0);
|
||||||
@@ -362,7 +357,7 @@ public abstract class ObjectsTestImpl {
|
|||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
var parent = curTx.get(Parent.class, JObjectKey.of(key), strategy).orElse(null);
|
var parent = curTx.get(Parent.class, JObjectKey.of(key)).orElse(null);
|
||||||
curTx.put(parent.withName("John2"));
|
curTx.put(parent.withName("John2"));
|
||||||
Log.warn("Thread 2 commit");
|
Log.warn("Thread 2 commit");
|
||||||
}, 0);
|
}, 0);
|
||||||
@@ -922,10 +917,8 @@ public abstract class ObjectsTestImpl {
|
|||||||
() -> createGetObject(testInfo),
|
() -> createGetObject(testInfo),
|
||||||
() -> createDeleteObject(testInfo),
|
() -> createDeleteObject(testInfo),
|
||||||
() -> createCreateObject(testInfo),
|
() -> createCreateObject(testInfo),
|
||||||
() -> editConflict(LockingStrategy.WRITE, testInfo),
|
() -> editConflict(testInfo),
|
||||||
() -> editConflict(LockingStrategy.OPTIMISTIC, testInfo),
|
() -> editConflict2(testInfo),
|
||||||
() -> editConflict2(LockingStrategy.WRITE, testInfo),
|
|
||||||
() -> editConflict2(LockingStrategy.OPTIMISTIC, testInfo),
|
|
||||||
() -> snapshotTest1(testInfo),
|
() -> snapshotTest1(testInfo),
|
||||||
() -> snapshotTest2(testInfo),
|
() -> snapshotTest2(testInfo),
|
||||||
() -> snapshotTest3(testInfo),
|
() -> snapshotTest3(testInfo),
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
|
|||||||
|
|
||||||
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
|
import com.usatiuk.dhfs.peersync.PeerConnectedEventListener;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.utils.SerializationHelper;
|
import com.usatiuk.utils.SerializationHelper;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import io.quarkus.runtime.ShutdownEvent;
|
import io.quarkus.runtime.ShutdownEvent;
|
||||||
@@ -24,7 +24,7 @@ import java.nio.file.Paths;
|
|||||||
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
|
public class DeferredInvalidationQueueService implements PeerConnectedEventListener {
|
||||||
private static final String dataFileName = "invqueue";
|
private static final String dataFileName = "invqueue";
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager remoteHostManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
InvalidationQueueService invalidationQueueService;
|
InvalidationQueueService invalidationQueueService;
|
||||||
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
|
@ConfigProperty(name = "dhfs.objects.persistence.files.root")
|
||||||
@@ -63,7 +63,7 @@ public class DeferredInvalidationQueueService implements PeerConnectedEventListe
|
|||||||
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
@Scheduled(every = "15s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||||
@Blocking
|
@Blocking
|
||||||
void periodicReturn() {
|
void periodicReturn() {
|
||||||
for (var reachable : remoteHostManager.getAvailableHosts())
|
for (var reachable : reachablePeerManager.getAvailableHosts())
|
||||||
returnForHost(reachable);
|
returnForHost(reachable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.invalidation;
|
|||||||
|
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
|
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
|
||||||
import com.usatiuk.objects.JData;
|
import com.usatiuk.objects.JData;
|
||||||
@@ -37,7 +37,7 @@ public class InvalidationQueueService {
|
|||||||
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
|
private final AtomicReference<ConcurrentHashSet<JObjectKey>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
|
||||||
private final DataLocker _locker = new DataLocker();
|
private final DataLocker _locker = new DataLocker();
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager remoteHostManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
DeferredInvalidationQueueService deferredInvalidationQueueService;
|
DeferredInvalidationQueueService deferredInvalidationQueueService;
|
||||||
@Inject
|
@Inject
|
||||||
@@ -103,7 +103,7 @@ public class InvalidationQueueService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (toAllQueue != null) {
|
if (toAllQueue != null) {
|
||||||
var hostInfo = remoteHostManager.getHostStateSnapshot();
|
var hostInfo = reachablePeerManager.getHostStateSnapshot();
|
||||||
for (var o : toAllQueue) {
|
for (var o : toAllQueue) {
|
||||||
for (var h : hostInfo.available())
|
for (var h : hostInfo.available())
|
||||||
_queue.add(new InvalidationQueueEntry(h, o));
|
_queue.add(new InvalidationQueueEntry(h, o));
|
||||||
@@ -129,7 +129,7 @@ public class InvalidationQueueService {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!remoteHostManager.isReachable(e.peer())) {
|
if (!reachablePeerManager.isReachable(e.peer())) {
|
||||||
deferredInvalidationQueueService.defer(e);
|
deferredInvalidationQueueService.defer(e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -210,14 +210,14 @@ public class InvalidationQueueService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void pushInvalidationToOne(InvalidationQueueEntry entry) {
|
void pushInvalidationToOne(InvalidationQueueEntry entry) {
|
||||||
if (remoteHostManager.isReachable(entry.peer()))
|
if (reachablePeerManager.isReachable(entry.peer()))
|
||||||
_queue.add(entry);
|
_queue.add(entry);
|
||||||
else
|
else
|
||||||
deferredInvalidationQueueService.defer(entry);
|
deferredInvalidationQueueService.defer(entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
|
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
|
||||||
if (remoteHostManager.isReachable(entry.peer()))
|
if (reachablePeerManager.isReachable(entry.peer()))
|
||||||
_queue.addNoDelay(entry);
|
_queue.addNoDelay(entry);
|
||||||
else
|
else
|
||||||
deferredInvalidationQueueService.defer(entry);
|
deferredInvalidationQueueService.defer(entry);
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ import com.usatiuk.dhfs.peersync.PeerInfoService;
|
|||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import com.usatiuk.kleppmanntree.*;
|
import com.usatiuk.kleppmanntree.*;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import com.usatiuk.objects.transaction.TransactionManager;
|
import com.usatiuk.objects.transaction.TransactionManager;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -39,9 +38,9 @@ public class JKleppmannTreeManager {
|
|||||||
@Inject
|
@Inject
|
||||||
PersistentPeerDataService persistentPeerDataService;
|
PersistentPeerDataService persistentPeerDataService;
|
||||||
|
|
||||||
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
||||||
return txManager.executeTx(() -> {
|
return txManager.executeTx(() -> {
|
||||||
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
|
var data = curTx.get(JKleppmannTreePersistentData.class, name).orElse(null);
|
||||||
if (data == null) {
|
if (data == null) {
|
||||||
data = new JKleppmannTreePersistentData(
|
data = new JKleppmannTreePersistentData(
|
||||||
name,
|
name,
|
||||||
@@ -66,18 +65,11 @@ public class JKleppmannTreeManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Optional<JKleppmannTree> getTree(JObjectKey name) {
|
public Optional<JKleppmannTree> getTree(JObjectKey name) {
|
||||||
return getTree(name, LockingStrategy.WRITE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<JKleppmannTree> getTree(JObjectKey name, LockingStrategy lockingStrategy) {
|
|
||||||
return txManager.executeTx(() -> {
|
return txManager.executeTx(() -> {
|
||||||
return curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).map(JKleppmannTree::new);
|
return curTx.get(JKleppmannTreePersistentData.class, name).map(JKleppmannTree::new);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public JKleppmannTree getTree(JObjectKey name, Supplier<JKleppmannTreeNodeMeta> rootNodeSupplier) {
|
|
||||||
return getTree(name, LockingStrategy.WRITE, rootNodeSupplier);
|
|
||||||
}
|
|
||||||
|
|
||||||
public class JKleppmannTree {
|
public class JKleppmannTree {
|
||||||
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
|
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeHolder;
|
|||||||
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
import com.usatiuk.dhfs.peersync.structs.JKleppmannTreeNodeMetaPeer;
|
||||||
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
import com.usatiuk.dhfs.remoteobj.RemoteTransaction;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import com.usatiuk.objects.transaction.TransactionManager;
|
import com.usatiuk.objects.transaction.TransactionManager;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
@@ -29,14 +28,10 @@ public class PeerInfoService {
|
|||||||
@Inject
|
@Inject
|
||||||
RemoteTransaction remoteTx;
|
RemoteTransaction remoteTx;
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
|
private JKleppmannTreeManager.JKleppmannTree getTree() {
|
||||||
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
|
return jKleppmannTreeManager.getTree(TREE_KEY, () -> null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
|
|
||||||
return jKleppmannTreeManager.getTree(TREE_KEY, LockingStrategy.OPTIMISTIC, () -> null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
|
public Optional<PeerInfo> getPeerInfoImpl(JObjectKey key) {
|
||||||
return jObjectTxManager.run(() -> {
|
return jObjectTxManager.run(() -> {
|
||||||
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
|
return curTx.get(JKleppmannTreeNodeHolder.class, key).map(JKleppmannTreeNodeHolder::node).flatMap(node -> {
|
||||||
@@ -49,7 +44,7 @@ public class PeerInfoService {
|
|||||||
|
|
||||||
public boolean existsPeer(PeerId peer) {
|
public boolean existsPeer(PeerId peer) {
|
||||||
return jObjectTxManager.run(() -> {
|
return jObjectTxManager.run(() -> {
|
||||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||||
if (gotKey == null) {
|
if (gotKey == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -59,7 +54,7 @@ public class PeerInfoService {
|
|||||||
|
|
||||||
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
|
public Optional<PeerInfo> getPeerInfo(PeerId peer) {
|
||||||
return jObjectTxManager.run(() -> {
|
return jObjectTxManager.run(() -> {
|
||||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(peer).value()));
|
||||||
if (gotKey == null) {
|
if (gotKey == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
@@ -72,7 +67,7 @@ public class PeerInfoService {
|
|||||||
|
|
||||||
public List<PeerInfo> getPeers() {
|
public List<PeerInfo> getPeers() {
|
||||||
return jObjectTxManager.run(() -> {
|
return jObjectTxManager.run(() -> {
|
||||||
var gotKey = getTreeR().traverse(List.of());
|
var gotKey = getTree().traverse(List.of());
|
||||||
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
|
return curTx.get(JKleppmannTreeNodeHolder.class, gotKey).map(JKleppmannTreeNodeHolder::node).map(
|
||||||
node -> node.children().keySet().stream()
|
node -> node.children().keySet().stream()
|
||||||
.map(JObjectKey::of).map(this::getPeerInfoImpl)
|
.map(JObjectKey::of).map(this::getPeerInfoImpl)
|
||||||
@@ -113,16 +108,16 @@ public class PeerInfoService {
|
|||||||
|
|
||||||
public void putPeer(PeerId id, byte[] cert) {
|
public void putPeer(PeerId id, byte[] cert) {
|
||||||
jObjectTxManager.run(() -> {
|
jObjectTxManager.run(() -> {
|
||||||
var parent = getTreeW().traverse(List.of());
|
var parent = getTree().traverse(List.of());
|
||||||
var newPeerInfo = new PeerInfo(id, cert);
|
var newPeerInfo = new PeerInfo(id, cert);
|
||||||
remoteTx.putData(newPeerInfo);
|
remoteTx.putData(newPeerInfo);
|
||||||
getTreeW().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
|
getTree().move(parent, new JKleppmannTreeNodeMetaPeer(newPeerInfo.id()), JKleppmannTreeNodeMetaPeer.peerIdToNodeId(newPeerInfo.id()));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removePeer(PeerId id) {
|
public void removePeer(PeerId id) {
|
||||||
jObjectTxManager.run(() -> {
|
jObjectTxManager.run(() -> {
|
||||||
var gotKey = getTreeR().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
|
var gotKey = getTree().traverse(List.of(JKleppmannTreeNodeMetaPeer.peerIdToNodeId(id).value()));
|
||||||
if (gotKey == null) {
|
if (gotKey == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -131,7 +126,7 @@ public class PeerInfoService {
|
|||||||
Log.warn("Peer " + id + " not found in the tree");
|
Log.warn("Peer " + id + " not found in the tree");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
getTreeW().trash(node.meta(), node.key());
|
getTree().trash(node.meta(), node.key());
|
||||||
curTx.onCommit(persistentPeerDataService::updateCerts);
|
curTx.onCommit(persistentPeerDataService::updateCerts);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
|||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class PeerLastSeenUpdater {
|
public class PeerLastSeenUpdater {
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
PeerInfoService peerInfoService;
|
PeerInfoService peerInfoService;
|
||||||
@Inject
|
@Inject
|
||||||
@@ -30,7 +30,7 @@ public class PeerLastSeenUpdater {
|
|||||||
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
@Scheduled(every = "${dhfs.objects.last-seen.update}", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
|
||||||
@Blocking
|
@Blocking
|
||||||
void update() {
|
void update() {
|
||||||
var snapshot = connectedPeerManager.getHostStateSnapshot();
|
var snapshot = reachablePeerManager.getHostStateSnapshot();
|
||||||
for (var a : snapshot.available()) {
|
for (var a : snapshot.available()) {
|
||||||
txm.run(() -> {
|
txm.run(() -> {
|
||||||
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);
|
var curInfo = remoteTransaction.getData(PeerInfo.class, a.id()).orElse(null);
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ public class PersistentPeerDataService {
|
|||||||
@Inject
|
@Inject
|
||||||
TransactionManager txm;
|
TransactionManager txm;
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
|
|
||||||
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
|
@ConfigProperty(name = "dhfs.peerdiscovery.preset-uuid")
|
||||||
Optional<String> presetUuid;
|
Optional<String> presetUuid;
|
||||||
@@ -135,7 +135,7 @@ public class PersistentPeerDataService {
|
|||||||
}
|
}
|
||||||
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
|
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
|
||||||
Log.infov("Did reset sync state for {0}", peerId);
|
Log.infov("Did reset sync state for {0}", peerId);
|
||||||
curTx.onCommit(() -> connectedPeerManager.handleConnectionError(peerId));
|
curTx.onCommit(() -> reachablePeerManager.handleConnectionError(peerId));
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class ConnectedPeerManager {
|
public class ReachablePeerManager {
|
||||||
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
|
private final ConcurrentMap<PeerId, PeerAddress> _states = new ConcurrentHashMap<>();
|
||||||
private final Collection<PeerConnectedEventListener> _connectedListeners;
|
private final Collection<PeerConnectedEventListener> _connectedListeners;
|
||||||
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
|
private final Collection<PeerDisconnectedEventListener> _disconnectedListeners;
|
||||||
@@ -58,7 +58,7 @@ public class ConnectedPeerManager {
|
|||||||
SyncHandler syncHandler;
|
SyncHandler syncHandler;
|
||||||
private ExecutorService _heartbeatExecutor;
|
private ExecutorService _heartbeatExecutor;
|
||||||
|
|
||||||
public ConnectedPeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
|
public ReachablePeerManager(Instance<PeerConnectedEventListener> connectedListeners, Instance<PeerDisconnectedEventListener> disconnectedListeners) {
|
||||||
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
|
_connectedListeners = List.copyOf(connectedListeners.stream().toList());
|
||||||
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
|
_disconnectedListeners = List.copyOf(disconnectedListeners.stream().toList());
|
||||||
}
|
}
|
||||||
@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.remoteobj;
|
|||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
|
import com.usatiuk.dhfs.rpc.RemoteObjectServiceClient;
|
||||||
import com.usatiuk.objects.JObjectKey;
|
import com.usatiuk.objects.JObjectKey;
|
||||||
import com.usatiuk.objects.transaction.LockingStrategy;
|
|
||||||
import com.usatiuk.objects.transaction.Transaction;
|
import com.usatiuk.objects.transaction.Transaction;
|
||||||
import io.quarkus.logging.Log;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
@@ -55,11 +54,11 @@ public class RemoteTransaction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy, boolean tryRequest) {
|
private <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, boolean tryRequest) {
|
||||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy)
|
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key))
|
||||||
.flatMap(obj -> {
|
.flatMap(obj -> {
|
||||||
if (obj.hasLocalData()) {
|
if (obj.hasLocalData()) {
|
||||||
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key), strategy).orElse(null);
|
var realData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(key)).orElse(null);
|
||||||
if (realData == null)
|
if (realData == null)
|
||||||
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
|
throw new IllegalStateException("Local data not found for " + key); // TODO: Race
|
||||||
if (!type.isInstance(realData.data()))
|
if (!type.isInstance(realData.data()))
|
||||||
@@ -72,8 +71,8 @@ public class RemoteTransaction {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key, LockingStrategy strategy) {
|
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
||||||
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key), strategy);
|
return curTx.get(RemoteObjectMeta.class, RemoteObjectMeta.ofMetaKey(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T extends JDataRemote> void putDataRaw(T obj) {
|
public <T extends JDataRemote> void putDataRaw(T obj) {
|
||||||
@@ -127,23 +126,12 @@ public class RemoteTransaction {
|
|||||||
curTx.put(newData);
|
curTx.put(newData);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
|
|
||||||
return getMeta(key, LockingStrategy.OPTIMISTIC);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
|
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key) {
|
||||||
return getData(type, key, LockingStrategy.OPTIMISTIC, true);
|
return getData(type, key, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
|
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key) {
|
||||||
return getData(type, key, LockingStrategy.OPTIMISTIC, false);
|
return getData(type, key, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T extends JDataRemote> Optional<T> getData(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
|
||||||
return getData(type, key, strategy, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T extends JDataRemote> Optional<T> getDataLocal(Class<T> type, JObjectKey key, LockingStrategy strategy) {
|
|
||||||
return getData(type, key, strategy, false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.ProtoSerializer;
|
|||||||
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
|
import com.usatiuk.dhfs.invalidation.InvalidationQueueService;
|
||||||
import com.usatiuk.dhfs.invalidation.Op;
|
import com.usatiuk.dhfs.invalidation.Op;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
||||||
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
|
import com.usatiuk.dhfs.remoteobj.ReceivedObject;
|
||||||
@@ -51,7 +51,7 @@ public class RemoteObjectServiceClient {
|
|||||||
@Inject
|
@Inject
|
||||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
|
|
||||||
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
|
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
|
||||||
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
|
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
|
||||||
@@ -70,7 +70,7 @@ public class RemoteObjectServiceClient {
|
|||||||
|
|
||||||
var targetVersion = objMeta.versionSum();
|
var targetVersion = objMeta.versionSum();
|
||||||
var targets = objMeta.knownRemoteVersions().isEmpty()
|
var targets = objMeta.knownRemoteVersions().isEmpty()
|
||||||
? connectedPeerManager.getAvailableHosts()
|
? reachablePeerManager.getAvailableHosts()
|
||||||
: objMeta.knownRemoteVersions().entrySet().stream()
|
: objMeta.knownRemoteVersions().entrySet().stream()
|
||||||
.filter(entry -> entry.getValue().equals(targetVersion))
|
.filter(entry -> entry.getValue().equals(targetVersion))
|
||||||
.map(Map.Entry::getKey).toList();
|
.map(Map.Entry::getKey).toList();
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import com.usatiuk.dhfs.autosync.AutosyncProcessor;
|
|||||||
import com.usatiuk.dhfs.invalidation.Op;
|
import com.usatiuk.dhfs.invalidation.Op;
|
||||||
import com.usatiuk.dhfs.invalidation.OpHandlerService;
|
import com.usatiuk.dhfs.invalidation.OpHandlerService;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
import com.usatiuk.dhfs.persistence.JObjectKeyP;
|
||||||
import com.usatiuk.dhfs.remoteobj.*;
|
import com.usatiuk.dhfs.remoteobj.*;
|
||||||
import com.usatiuk.dhfs.repository.*;
|
import com.usatiuk.dhfs.repository.*;
|
||||||
@@ -31,7 +31,7 @@ public class RemoteObjectServiceServerImpl {
|
|||||||
@Inject
|
@Inject
|
||||||
TransactionManager txm;
|
TransactionManager txm;
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
Transaction curTx;
|
Transaction curTx;
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.peerdiscovery.IpPeerAddress;
|
|||||||
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
|
import com.usatiuk.dhfs.peerdiscovery.PeerAddress;
|
||||||
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
|
import com.usatiuk.dhfs.peersync.PeerDisconnectedEventListener;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
|
import com.usatiuk.dhfs.repository.DhfsObjectSyncGrpcGrpc;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
@@ -29,7 +29,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
|
|||||||
long syncTimeout;
|
long syncTimeout;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
RpcChannelFactory rpcChannelFactory;
|
RpcChannelFactory rpcChannelFactory;
|
||||||
@@ -56,7 +56,7 @@ public class RpcClientFactory implements PeerDisconnectedEventListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
|
public <R> R withObjSyncClient(PeerId target, ObjectSyncClientFunction<R> fn) {
|
||||||
var hostinfo = connectedPeerManager.getAddress(target);
|
var hostinfo = reachablePeerManager.getAddress(target);
|
||||||
|
|
||||||
if (hostinfo == null)
|
if (hostinfo == null)
|
||||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
|
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Not known to be reachable: " + target));
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.webapi;
|
|||||||
|
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.ws.rs.*;
|
import jakarta.ws.rs.*;
|
||||||
@@ -14,7 +14,7 @@ public class PeerManagementApi {
|
|||||||
@Inject
|
@Inject
|
||||||
PeerInfoService peerInfoService;
|
PeerInfoService peerInfoService;
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
PersistentPeerDataService persistentPeerDataService;
|
PersistentPeerDataService persistentPeerDataService;
|
||||||
|
|
||||||
@@ -23,27 +23,27 @@ public class PeerManagementApi {
|
|||||||
public List<PeerInfo> knownPeers() {
|
public List<PeerInfo> knownPeers() {
|
||||||
return peerInfoService.getPeers().stream().map(
|
return peerInfoService.getPeers().stream().map(
|
||||||
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
|
peerInfo -> new PeerInfo(peerInfo.id().toString(), Base64.getEncoder().encodeToString(peerInfo.cert().toByteArray()),
|
||||||
Optional.ofNullable(connectedPeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
|
Optional.ofNullable(reachablePeerManager.getAddress(peerInfo.id())).map(Objects::toString).orElse(null))).toList();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Path("known-peers/{peerId}")
|
@Path("known-peers/{peerId}")
|
||||||
@PUT
|
@PUT
|
||||||
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
|
public void addPeer(@PathParam("peerId") String peerId, KnownPeerPut knownPeerPut) {
|
||||||
connectedPeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
|
reachablePeerManager.addRemoteHost(PeerId.of(peerId), knownPeerPut.cert());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Path("known-peers/{peerId}")
|
@Path("known-peers/{peerId}")
|
||||||
@DELETE
|
@DELETE
|
||||||
public void deletePeer(@PathParam("peerId") String peerId) {
|
public void deletePeer(@PathParam("peerId") String peerId) {
|
||||||
connectedPeerManager.removeRemoteHost(PeerId.of(peerId));
|
reachablePeerManager.removeRemoteHost(PeerId.of(peerId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Path("available-peers")
|
@Path("available-peers")
|
||||||
@GET
|
@GET
|
||||||
public Collection<PeerInfo> availablePeers() {
|
public Collection<PeerInfo> availablePeers() {
|
||||||
return connectedPeerManager.getSeenButNotAddedHosts().stream()
|
return reachablePeerManager.getSeenButNotAddedHosts().stream()
|
||||||
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
|
.map(p -> new PeerInfo(p.getLeft().toString(), p.getRight().cert(),
|
||||||
connectedPeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
|
reachablePeerManager.selectBestAddress(p.getLeft()).map(Objects::toString).orElse(null)))
|
||||||
.toList();
|
.toList();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.webapi;
|
|||||||
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
|
import com.usatiuk.dhfs.peerdiscovery.PeerAddrStringHelper;
|
||||||
import com.usatiuk.dhfs.peersync.PeerId;
|
import com.usatiuk.dhfs.peersync.PeerId;
|
||||||
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
import com.usatiuk.dhfs.peersync.PeerInfoService;
|
||||||
import com.usatiuk.dhfs.peersync.ConnectedPeerManager;
|
import com.usatiuk.dhfs.peersync.ReachablePeerManager;
|
||||||
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
import com.usatiuk.dhfs.peersync.PersistentPeerDataService;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.ws.rs.*;
|
import jakarta.ws.rs.*;
|
||||||
@@ -15,7 +15,7 @@ public class PersistentPeerAddressApi {
|
|||||||
@Inject
|
@Inject
|
||||||
PeerInfoService peerInfoService;
|
PeerInfoService peerInfoService;
|
||||||
@Inject
|
@Inject
|
||||||
ConnectedPeerManager connectedPeerManager;
|
ReachablePeerManager reachablePeerManager;
|
||||||
@Inject
|
@Inject
|
||||||
PersistentPeerDataService persistentPeerDataService;
|
PersistentPeerDataService persistentPeerDataService;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user