refactor(server): now keeps track of useragents, instead of
This commit is contained in:
@@ -10,19 +10,27 @@ use kameo::{
|
|||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
actors::flow_coordinator::client_connect_approval::ClientApprovalController,
|
actors::{
|
||||||
peers::{
|
flow_coordinator::client_connect_approval::ClientApprovalController,
|
||||||
client::{ClientProfile, session::ClientSession},
|
useragent_registry::{GetConnected, UserAgentRegistry},
|
||||||
user_agent::UserAgentSession,
|
|
||||||
},
|
},
|
||||||
|
peers::client::{ClientProfile, session::ClientSession},
|
||||||
};
|
};
|
||||||
|
|
||||||
pub mod client_connect_approval;
|
pub mod client_connect_approval;
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct FlowCoordinator {
|
pub struct FlowCoordinator {
|
||||||
pub user_agents: HashMap<ActorId, ActorRef<UserAgentSession>>,
|
|
||||||
pub clients: HashMap<ActorId, ActorRef<ClientSession>>,
|
pub clients: HashMap<ActorId, ActorRef<ClientSession>>,
|
||||||
|
useragent_registry: ActorRef<UserAgentRegistry>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FlowCoordinator {
|
||||||
|
pub fn new(useragent_registry: ActorRef<UserAgentRegistry>) -> Self {
|
||||||
|
Self {
|
||||||
|
clients: HashMap::default(),
|
||||||
|
useragent_registry,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for FlowCoordinator {
|
impl Actor for FlowCoordinator {
|
||||||
@@ -40,13 +48,7 @@ impl Actor for FlowCoordinator {
|
|||||||
id: ActorId,
|
id: ActorId,
|
||||||
_: ActorStopReason,
|
_: ActorStopReason,
|
||||||
) -> Result<ControlFlow<ActorStopReason>, Self::Error> {
|
) -> Result<ControlFlow<ActorStopReason>, Self::Error> {
|
||||||
if self.user_agents.remove(&id).is_some() {
|
if self.clients.remove(&id).is_some() {
|
||||||
info!(
|
|
||||||
?id,
|
|
||||||
actor = "FlowCoordinator",
|
|
||||||
event = "useragent.disconnected"
|
|
||||||
);
|
|
||||||
} else if self.clients.remove(&id).is_some() {
|
|
||||||
info!(
|
info!(
|
||||||
?id,
|
?id,
|
||||||
actor = "FlowCoordinator",
|
actor = "FlowCoordinator",
|
||||||
@@ -71,17 +73,6 @@ pub enum ApprovalError {
|
|||||||
|
|
||||||
#[messages]
|
#[messages]
|
||||||
impl FlowCoordinator {
|
impl FlowCoordinator {
|
||||||
#[message(ctx)]
|
|
||||||
pub async fn register_user_agent(
|
|
||||||
&mut self,
|
|
||||||
actor: ActorRef<UserAgentSession>,
|
|
||||||
ctx: &mut Context<Self, ()>,
|
|
||||||
) {
|
|
||||||
info!(id = %actor.id(), actor = "FlowCoordinator", event = "useragent.connected");
|
|
||||||
ctx.actor_ref().link(&actor).await;
|
|
||||||
self.user_agents.insert(actor.id(), actor);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[message(ctx)]
|
#[message(ctx)]
|
||||||
pub async fn register_client(
|
pub async fn register_client(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -103,7 +94,14 @@ impl FlowCoordinator {
|
|||||||
unreachable!("Expected `request_client_approval` to have callback channel");
|
unreachable!("Expected `request_client_approval` to have callback channel");
|
||||||
};
|
};
|
||||||
|
|
||||||
let refs: Vec<_> = self.user_agents.values().cloned().collect();
|
let refs = match self.useragent_registry.ask(GetConnected).await {
|
||||||
|
Ok(refs) => refs,
|
||||||
|
Err(_) => {
|
||||||
|
reply_sender.send(Err(ApprovalError::NoUserAgentsConnected));
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
if refs.is_empty() {
|
if refs.is_empty() {
|
||||||
reply_sender.send(Err(ApprovalError::NoUserAgentsConnected));
|
reply_sender.send(Err(ApprovalError::NoUserAgentsConnected));
|
||||||
return reply;
|
return reply;
|
||||||
|
|||||||
@@ -4,7 +4,11 @@ use thiserror::Error;
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
actors::{
|
actors::{
|
||||||
bootstrap::Bootstrapper, evm::EvmActor, flow_coordinator::FlowCoordinator, vault::Vault,
|
bootstrap::Bootstrapper,
|
||||||
|
evm::EvmActor,
|
||||||
|
flow_coordinator::FlowCoordinator,
|
||||||
|
useragent_registry::UserAgentRegistry,
|
||||||
|
vault::Vault,
|
||||||
},
|
},
|
||||||
db,
|
db,
|
||||||
};
|
};
|
||||||
@@ -30,6 +34,7 @@ pub struct GlobalActors {
|
|||||||
pub vault: ActorRef<Vault>,
|
pub vault: ActorRef<Vault>,
|
||||||
pub bootstrapper: ActorRef<Bootstrapper>,
|
pub bootstrapper: ActorRef<Bootstrapper>,
|
||||||
pub flow_coordinator: ActorRef<FlowCoordinator>,
|
pub flow_coordinator: ActorRef<FlowCoordinator>,
|
||||||
|
pub useragent_registry: ActorRef<UserAgentRegistry>,
|
||||||
pub evm: ActorRef<EvmActor>,
|
pub evm: ActorRef<EvmActor>,
|
||||||
pub events: ActorRef<MessageBus>,
|
pub events: ActorRef<MessageBus>,
|
||||||
}
|
}
|
||||||
@@ -42,11 +47,15 @@ impl GlobalActors {
|
|||||||
pub async fn spawn(db: db::DatabasePool) -> Result<Self, SpawnError> {
|
pub async fn spawn(db: db::DatabasePool) -> Result<Self, SpawnError> {
|
||||||
let message_bus = Self::spawn_message_bus();
|
let message_bus = Self::spawn_message_bus();
|
||||||
let key_holder = Vault::spawn(Vault::new(db.clone(), message_bus.clone()).await?);
|
let key_holder = Vault::spawn(Vault::new(db.clone(), message_bus.clone()).await?);
|
||||||
|
let useragent_registry = UserAgentRegistry::spawn(UserAgentRegistry::default());
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?),
|
bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?),
|
||||||
evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)),
|
evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)),
|
||||||
vault: key_holder,
|
vault: key_holder,
|
||||||
flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::default()),
|
flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::new(
|
||||||
|
useragent_registry.clone(),
|
||||||
|
)),
|
||||||
|
useragent_registry,
|
||||||
events: message_bus,
|
events: message_bus,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,57 +1,58 @@
|
|||||||
use alloy::primitives::map::HashMap;
|
use std::{collections::HashMap, ops::ControlFlow};
|
||||||
use arbiter_crypto::authn;
|
|
||||||
use kameo::{error::Infallible, prelude::*};
|
|
||||||
|
|
||||||
use crate::{db::DatabasePool, peers::user_agent::{Credentials, UserAgentSession}};
|
use kameo::{
|
||||||
|
Actor,
|
||||||
|
actor::{ActorId, ActorRef},
|
||||||
|
error::Infallible,
|
||||||
|
messages,
|
||||||
|
prelude::{ActorStopReason, Context, WeakActorRef},
|
||||||
|
};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
use super::vault::{Vault, events as vault_events};
|
use crate::peers::user_agent::UserAgentSession;
|
||||||
|
|
||||||
pub struct Args {
|
|
||||||
pub vault: ActorRef<Vault>,
|
|
||||||
pub pool: DatabasePool,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
pub struct UserAgentRegistry {
|
pub struct UserAgentRegistry {
|
||||||
vault: ActorRef<Vault>,
|
connected: HashMap<ActorId, ActorRef<UserAgentSession>>,
|
||||||
pool: DatabasePool,
|
|
||||||
connected: HashMap<Credentials, ActorRef<UserAgentSession>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Message<vault_events::Bootstrapped> for UserAgentRegistry {
|
|
||||||
type Reply = ();
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
&mut self,
|
|
||||||
msg: vault_events::Bootstrapped,
|
|
||||||
ctx: &mut Context<Self, Self::Reply>,
|
|
||||||
) -> Self::Reply {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Message<vault_events::Unsealed> for UserAgentRegistry {
|
|
||||||
type Reply = ();
|
|
||||||
|
|
||||||
async fn handle(
|
|
||||||
&mut self,
|
|
||||||
msg: vault_events::Unsealed,
|
|
||||||
ctx: &mut Context<Self, Self::Reply>,
|
|
||||||
) -> Self::Reply {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl Actor for UserAgentRegistry {
|
impl Actor for UserAgentRegistry {
|
||||||
type Args = Args;
|
type Args = Self;
|
||||||
|
|
||||||
type Error = Infallible;
|
type Error = Infallible;
|
||||||
|
|
||||||
async fn on_start(args: Self::Args, actor_ref: ActorRef<Self>) -> Result<Self, Self::Error> {
|
async fn on_start(args: Self::Args, _: ActorRef<Self>) -> Result<Self, Self::Error> {
|
||||||
Ok(Self {
|
Ok(args)
|
||||||
vault: args.vault,
|
|
||||||
pool: args.pool,
|
|
||||||
connected: HashMap::default(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn on_link_died(
|
||||||
|
&mut self,
|
||||||
|
_: WeakActorRef<Self>,
|
||||||
|
id: ActorId,
|
||||||
|
_: ActorStopReason,
|
||||||
|
) -> Result<ControlFlow<ActorStopReason>, Self::Error> {
|
||||||
|
if self.connected.remove(&id).is_some() {
|
||||||
|
info!(?id, actor = "UserAgentRegistry", event = "useragent.disconnected");
|
||||||
|
}
|
||||||
|
Ok(ControlFlow::Continue(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[messages]
|
||||||
|
impl UserAgentRegistry {
|
||||||
|
#[message(ctx)]
|
||||||
|
pub async fn connect_useragent(
|
||||||
|
&mut self,
|
||||||
|
actor: ActorRef<UserAgentSession>,
|
||||||
|
ctx: &mut Context<Self, ()>,
|
||||||
|
) {
|
||||||
|
info!(id = %actor.id(), actor = "UserAgentRegistry", event = "useragent.connected");
|
||||||
|
ctx.actor_ref().link(&actor).await;
|
||||||
|
self.connected.insert(actor.id(), actor);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[message]
|
||||||
|
pub fn get_connected(&self) -> Vec<ActorRef<UserAgentSession>> {
|
||||||
|
self.connected.values().cloned().collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,7 +12,8 @@ use tracing::error;
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
actors::{
|
actors::{
|
||||||
flow_coordinator::{RegisterUserAgent, client_connect_approval::ClientApprovalController},
|
flow_coordinator::client_connect_approval::ClientApprovalController,
|
||||||
|
useragent_registry::ConnectUseragent,
|
||||||
vault::events,
|
vault::events,
|
||||||
}, crypto::integrity, db::schema::useragent_client, peers::{client::ClientProfile, user_agent::{AuthCredentials, Credentials}}
|
}, crypto::integrity, db::schema::useragent_client, peers::{client::ClientProfile, user_agent::{AuthCredentials, Credentials}}
|
||||||
};
|
};
|
||||||
@@ -123,17 +124,17 @@ impl Actor for UserAgentSession {
|
|||||||
) -> Result<Self, Self::Error> {
|
) -> Result<Self, Self::Error> {
|
||||||
args.props
|
args.props
|
||||||
.actors
|
.actors
|
||||||
.flow_coordinator
|
.useragent_registry
|
||||||
.ask(RegisterUserAgent {
|
.ask(ConnectUseragent {
|
||||||
actor: this.clone(),
|
actor: this.clone(),
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
error!(
|
error!(
|
||||||
?err,
|
?err,
|
||||||
"Failed to register user agent connection with flow coordinator"
|
"Failed to register user agent connection with user agent registry"
|
||||||
);
|
);
|
||||||
Error::internal("Failed to register user agent connection with flow coordinator")
|
Error::internal("Failed to register user agent connection with user agent registry")
|
||||||
})?;
|
})?;
|
||||||
Ok(args)
|
Ok(args)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user