mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
removing peers
This commit is contained in:
@@ -151,7 +151,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
throw new IllegalStateException("Data is not null when recording external resolution of " + getName());
|
||||
_metaPart.narrowClass(data.getClass());
|
||||
_dataPart.set(data);
|
||||
if (!_metaPart.isLocked())
|
||||
if (!_metaPart.isLocked() && _metaPart.getRefcount() == 0)
|
||||
_metaPart.lock();
|
||||
hydrateRefs();
|
||||
verifyRefs();
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
|
||||
import com.usatiuk.dhfs.SerializationHelper;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
@@ -143,12 +143,18 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
_map.put(object.getName(), new NamedSoftReference(ret, _refQueue));
|
||||
}
|
||||
}
|
||||
JObject<D> finalRet = (JObject<D>) ret;
|
||||
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
if (parent.isPresent()) {
|
||||
m.addRef(parent.get());
|
||||
} else {
|
||||
m.lock();
|
||||
}
|
||||
|
||||
if (object.pushResolution() && finalRet.getData() == null) {
|
||||
finalRet.externalResolution(object);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
return (JObject<D>) ret;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.usatiuk.dhfs.objects.repository;
|
||||
|
||||
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.Startup;
|
||||
@@ -21,7 +20,7 @@ public class InvalidationQueueService {
|
||||
RemoteObjectServiceClient remoteObjectServiceClient;
|
||||
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.invalidation.batch_size")
|
||||
Integer batchSize;
|
||||
@@ -66,9 +65,10 @@ public class InvalidationQueueService {
|
||||
while (!Thread.interrupted()) {
|
||||
var data = pullAll();
|
||||
Thread.sleep(delay);
|
||||
String stats = "Sent invalidation: ";
|
||||
for (var forHost : data.entrySet()) {
|
||||
data.entrySet().stream().filter(e -> persistentRemoteHostsService.existsHost(e.getKey())).forEach(forHost -> {
|
||||
String stats = "Sent invalidation: ";
|
||||
long sent = 0;
|
||||
|
||||
while (!forHost.getValue().isEmpty()) {
|
||||
ArrayList<String> chunk = new ArrayList<>();
|
||||
|
||||
@@ -99,7 +99,7 @@ public class InvalidationQueueService {
|
||||
}
|
||||
stats += forHost.getKey() + ": " + sent + " ";
|
||||
Log.info(stats);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
|
||||
@@ -191,6 +191,23 @@ public class PersistentRemoteHostsService {
|
||||
return added;
|
||||
}
|
||||
|
||||
public boolean removeHost(UUID host) {
|
||||
boolean removed = getPeerDirectory().runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d, b, v) -> {
|
||||
boolean removedInner = d.getPeers().remove(host);
|
||||
if (removedInner) {
|
||||
getPeer(host).runWriteLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (mp, dp, bp, vp) -> {
|
||||
mp.removeRef(m.getName());
|
||||
return null;
|
||||
});
|
||||
b.apply();
|
||||
}
|
||||
return removedInner;
|
||||
});
|
||||
if (removed)
|
||||
updateCerts();
|
||||
return removed;
|
||||
}
|
||||
|
||||
private void updateCerts() {
|
||||
getPeerDirectory().runReadLocked(JObject.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
|
||||
peerTrustManager.reloadTrustManagerHosts(getHostsNoNulls());
|
||||
|
||||
@@ -169,6 +169,15 @@ public class RemoteHostManager {
|
||||
});
|
||||
}
|
||||
|
||||
public void removeRemoteHost(UUID host) {
|
||||
persistentRemoteHostsService.removeHost(host);
|
||||
// Race?
|
||||
_transientPeersState.runWriteLocked(d -> {
|
||||
d.getStates().remove(host);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void addRemoteHost(UUID host) {
|
||||
if (!_seenHostsButNotAdded.containsKey(host)) {
|
||||
throw new IllegalStateException("Host " + host + " is not seen");
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.dhfs.objects.repository.webapi;
|
||||
|
||||
public record KnownPeerDelete(String uuid) {
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository.webapi;
|
||||
import com.usatiuk.dhfs.objects.repository.PersistentRemoteHostsService;
|
||||
import com.usatiuk.dhfs.objects.repository.RemoteHostManager;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.ws.rs.DELETE;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.PUT;
|
||||
import jakarta.ws.rs.Path;
|
||||
@@ -31,6 +32,12 @@ public class ManagementApi {
|
||||
remoteHostManager.addRemoteHost(UUID.fromString(knownPeerPut.uuid()));
|
||||
}
|
||||
|
||||
@Path("known-peers")
|
||||
@DELETE
|
||||
public void DeletePeer(KnownPeerDelete knownPeerDelete) {
|
||||
remoteHostManager.removeRemoteHost(UUID.fromString(knownPeerDelete.uuid()));
|
||||
}
|
||||
|
||||
@Path("available-peers")
|
||||
@GET
|
||||
public Collection<AvailablePeerInfo> availablePeers() {
|
||||
|
||||
@@ -28,6 +28,9 @@ public class DhfsFuseIT {
|
||||
WaitingConsumer waitingConsumer1;
|
||||
WaitingConsumer waitingConsumer2;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
|
||||
@BeforeEach
|
||||
void setup() throws IOException, InterruptedException, TimeoutException {
|
||||
String buildPath = System.getProperty("buildDirectory");
|
||||
@@ -67,8 +70,8 @@ public class DhfsFuseIT {
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2");
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
var c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
var c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -106,6 +109,33 @@ public class DhfsFuseIT {
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request DELETE " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
|
||||
container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request PUT " +
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
@@ -120,7 +150,7 @@ public class DhfsFuseIT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
Log.info(ls);
|
||||
@@ -143,7 +173,7 @@ public class DhfsFuseIT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
Log.info(ls);
|
||||
@@ -163,7 +193,7 @@ public class DhfsFuseIT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
Log.info(ls);
|
||||
@@ -177,7 +207,7 @@ public class DhfsFuseIT {
|
||||
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0)
|
||||
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0);
|
||||
Assumptions.assumeTrue(createdOk, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
Log.info(ls);
|
||||
|
||||
@@ -31,6 +31,10 @@ public class DhfsFusex3IT {
|
||||
WaitingConsumer waitingConsumer2;
|
||||
WaitingConsumer waitingConsumer3;
|
||||
|
||||
String c1uuid;
|
||||
String c2uuid;
|
||||
String c3uuid;
|
||||
|
||||
@BeforeEach
|
||||
void setup() throws IOException, InterruptedException, TimeoutException {
|
||||
String buildPath = System.getProperty("buildDirectory");
|
||||
@@ -70,9 +74,9 @@ public class DhfsFusex3IT {
|
||||
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
|
||||
|
||||
var c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
var c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
var c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class)).withPrefix(c1uuid);
|
||||
@@ -136,6 +140,27 @@ public class DhfsFusex3IT {
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
" --request DELETE " +
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
@@ -148,7 +173,7 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
@@ -186,7 +211,7 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
@@ -223,7 +248,7 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!writeFail, "Failed creating one or more files");
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
|
||||
@@ -2,11 +2,4 @@
|
||||
|
||||
.peerAvailableCard {
|
||||
@extend .peerCard;
|
||||
|
||||
@include card-with-actions;
|
||||
|
||||
.peerInfo {
|
||||
flex-grow: 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,4 +6,10 @@
|
||||
> * {
|
||||
padding: 0.5rem;
|
||||
}
|
||||
|
||||
@include card-with-actions;
|
||||
|
||||
.peerInfo {
|
||||
flex-grow: 1;
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,30 @@
|
||||
import { TKnownPeerInfoTo } from "./api/dto";
|
||||
|
||||
import "./PeerKnownCard.scss";
|
||||
import { useFetcher } from "react-router-dom";
|
||||
|
||||
export interface TPeerKnownCardProps {
|
||||
peerInfo: TKnownPeerInfoTo;
|
||||
}
|
||||
|
||||
export function PeerKnownCard({ peerInfo }: TPeerKnownCardProps) {
|
||||
return <div className="peerKnownCard">{peerInfo.uuid}</div>;
|
||||
const fetcher = useFetcher();
|
||||
|
||||
return (
|
||||
<div className="peerKnownCard">
|
||||
<div className={"peerInfo"}>
|
||||
<span>UUID: </span>
|
||||
<span>{peerInfo.uuid}</span>
|
||||
</div>
|
||||
<fetcher.Form
|
||||
className="actions"
|
||||
method="put"
|
||||
action={"/home/peers"}
|
||||
>
|
||||
<button type="submit">remove</button>
|
||||
<input name="intent" hidden={true} value={"remove_peer"} />
|
||||
<input name="uuid" hidden={true} value={peerInfo.uuid} />
|
||||
</fetcher.Form>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import {
|
||||
getAvailablePeers,
|
||||
getKnownPeers,
|
||||
putKnownPeer,
|
||||
removeKnownPeer,
|
||||
} from "./api/PeerState";
|
||||
import { ActionFunctionArgs } from "react-router-dom";
|
||||
|
||||
@@ -12,13 +13,15 @@ export async function peerStateLoader() {
|
||||
};
|
||||
}
|
||||
|
||||
export type PeerStateActionType = "add_peer" | unknown;
|
||||
export type PeerStateActionType = "add_peer" | "remove_peer" | unknown;
|
||||
|
||||
export async function peerStateAction({ request }: ActionFunctionArgs) {
|
||||
const formData = await request.formData();
|
||||
const intent = formData.get("intent") as PeerStateActionType;
|
||||
if (intent === "add_peer") {
|
||||
return await putKnownPeer(formData.get("uuid") as string);
|
||||
} else if (intent === "remove_peer") {
|
||||
return await removeKnownPeer(formData.get("uuid") as string);
|
||||
} else {
|
||||
throw new Error("Malformed action: " + JSON.stringify(request));
|
||||
}
|
||||
|
||||
@@ -30,3 +30,9 @@ export async function putKnownPeer(uuid: string): Promise<TNoContentToResp> {
|
||||
uuid,
|
||||
});
|
||||
}
|
||||
|
||||
export async function removeKnownPeer(uuid: string): Promise<TNoContentToResp> {
|
||||
return fetchJSON("/objects-manage/known-peers", "DELETE", NoContentToResp, {
|
||||
uuid,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -67,3 +67,7 @@ export type TKnownPeerInfoToResp = z.infer<typeof KnownPeerInfoToResp>;
|
||||
// KnownPeerPut
|
||||
export const KnownPeerPutTo = z.object({ uuid: z.string() });
|
||||
export type TKnownPeerPutTo = z.infer<typeof KnownPeerPutTo>;
|
||||
|
||||
// KnownPeerDelete
|
||||
export const KnownPeerDeleteTo = z.object({ uuid: z.string() });
|
||||
export type TKnownPeerDeleteTo = z.infer<typeof KnownPeerDeleteTo>;
|
||||
|
||||
Reference in New Issue
Block a user