diff --git a/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs index 51a9bac..20fee53 100644 --- a/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs +++ b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs @@ -10,19 +10,27 @@ use kameo::{ use tracing::info; use crate::{ - actors::flow_coordinator::client_connect_approval::ClientApprovalController, - peers::{ - client::{ClientProfile, session::ClientSession}, - user_agent::UserAgentSession, + actors::{ + flow_coordinator::client_connect_approval::ClientApprovalController, + useragent_registry::{GetConnected, UserAgentRegistry}, }, + peers::client::{ClientProfile, session::ClientSession}, }; pub mod client_connect_approval; -#[derive(Default)] pub struct FlowCoordinator { - pub user_agents: HashMap>, pub clients: HashMap>, + useragent_registry: ActorRef, +} + +impl FlowCoordinator { + pub fn new(useragent_registry: ActorRef) -> Self { + Self { + clients: HashMap::default(), + useragent_registry, + } + } } impl Actor for FlowCoordinator { @@ -40,13 +48,7 @@ impl Actor for FlowCoordinator { id: ActorId, _: ActorStopReason, ) -> Result, Self::Error> { - if self.user_agents.remove(&id).is_some() { - info!( - ?id, - actor = "FlowCoordinator", - event = "useragent.disconnected" - ); - } else if self.clients.remove(&id).is_some() { + if self.clients.remove(&id).is_some() { info!( ?id, actor = "FlowCoordinator", @@ -71,17 +73,6 @@ pub enum ApprovalError { #[messages] impl FlowCoordinator { - #[message(ctx)] - pub async fn register_user_agent( - &mut self, - actor: ActorRef, - ctx: &mut Context, - ) { - info!(id = %actor.id(), actor = "FlowCoordinator", event = "useragent.connected"); - ctx.actor_ref().link(&actor).await; - self.user_agents.insert(actor.id(), actor); - } - #[message(ctx)] pub async fn register_client( &mut self, @@ -103,7 +94,14 @@ impl FlowCoordinator { unreachable!("Expected `request_client_approval` to have callback channel"); }; - let refs: Vec<_> = self.user_agents.values().cloned().collect(); + let refs = match self.useragent_registry.ask(GetConnected).await { + Ok(refs) => refs, + Err(_) => { + reply_sender.send(Err(ApprovalError::NoUserAgentsConnected)); + return reply; + } + }; + if refs.is_empty() { reply_sender.send(Err(ApprovalError::NoUserAgentsConnected)); return reply; diff --git a/server/crates/arbiter-server/src/actors/mod.rs b/server/crates/arbiter-server/src/actors/mod.rs index adb51f7..6b01dd8 100644 --- a/server/crates/arbiter-server/src/actors/mod.rs +++ b/server/crates/arbiter-server/src/actors/mod.rs @@ -4,7 +4,11 @@ use thiserror::Error; use crate::{ actors::{ - bootstrap::Bootstrapper, evm::EvmActor, flow_coordinator::FlowCoordinator, vault::Vault, + bootstrap::Bootstrapper, + evm::EvmActor, + flow_coordinator::FlowCoordinator, + useragent_registry::UserAgentRegistry, + vault::Vault, }, db, }; @@ -30,6 +34,7 @@ pub struct GlobalActors { pub vault: ActorRef, pub bootstrapper: ActorRef, pub flow_coordinator: ActorRef, + pub useragent_registry: ActorRef, pub evm: ActorRef, pub events: ActorRef, } @@ -42,11 +47,15 @@ impl GlobalActors { pub async fn spawn(db: db::DatabasePool) -> Result { let message_bus = Self::spawn_message_bus(); let key_holder = Vault::spawn(Vault::new(db.clone(), message_bus.clone()).await?); + let useragent_registry = UserAgentRegistry::spawn(UserAgentRegistry::default()); Ok(Self { bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?), evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)), vault: key_holder, - flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::default()), + flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::new( + useragent_registry.clone(), + )), + useragent_registry, events: message_bus, }) } diff --git a/server/crates/arbiter-server/src/actors/useragent_registry.rs b/server/crates/arbiter-server/src/actors/useragent_registry.rs index 3c80735..74e616c 100644 --- a/server/crates/arbiter-server/src/actors/useragent_registry.rs +++ b/server/crates/arbiter-server/src/actors/useragent_registry.rs @@ -1,57 +1,58 @@ -use alloy::primitives::map::HashMap; -use arbiter_crypto::authn; -use kameo::{error::Infallible, prelude::*}; +use std::{collections::HashMap, ops::ControlFlow}; -use crate::{db::DatabasePool, peers::user_agent::{Credentials, UserAgentSession}}; +use kameo::{ + Actor, + actor::{ActorId, ActorRef}, + error::Infallible, + messages, + prelude::{ActorStopReason, Context, WeakActorRef}, +}; +use tracing::info; -use super::vault::{Vault, events as vault_events}; - -pub struct Args { - pub vault: ActorRef, - pub pool: DatabasePool, -} +use crate::peers::user_agent::UserAgentSession; +#[derive(Default)] pub struct UserAgentRegistry { - vault: ActorRef, - pool: DatabasePool, - connected: HashMap>, + connected: HashMap>, } -impl Message for UserAgentRegistry { - type Reply = (); - - async fn handle( - &mut self, - msg: vault_events::Bootstrapped, - ctx: &mut Context, - ) -> Self::Reply { - todo!() - } -} - -impl Message for UserAgentRegistry { - type Reply = (); - - async fn handle( - &mut self, - msg: vault_events::Unsealed, - ctx: &mut Context, - ) -> Self::Reply { - todo!() - } -} impl Actor for UserAgentRegistry { - type Args = Args; + type Args = Self; type Error = Infallible; - async fn on_start(args: Self::Args, actor_ref: ActorRef) -> Result { - Ok(Self { - vault: args.vault, - pool: args.pool, - connected: HashMap::default(), - }) + async fn on_start(args: Self::Args, _: ActorRef) -> Result { + Ok(args) } - + async fn on_link_died( + &mut self, + _: WeakActorRef, + id: ActorId, + _: ActorStopReason, + ) -> Result, Self::Error> { + if self.connected.remove(&id).is_some() { + info!(?id, actor = "UserAgentRegistry", event = "useragent.disconnected"); + } + Ok(ControlFlow::Continue(())) + } +} + +#[messages] +impl UserAgentRegistry { + #[message(ctx)] + pub async fn connect_useragent( + &mut self, + actor: ActorRef, + ctx: &mut Context, + ) { + info!(id = %actor.id(), actor = "UserAgentRegistry", event = "useragent.connected"); + ctx.actor_ref().link(&actor).await; + self.connected.insert(actor.id(), actor); + } + + #[message] + pub fn get_connected(&self) -> Vec> { + self.connected.values().cloned().collect() + } } diff --git a/server/crates/arbiter-server/src/peers/user_agent/session/mod.rs b/server/crates/arbiter-server/src/peers/user_agent/session/mod.rs index 55cd9e5..7356d04 100644 --- a/server/crates/arbiter-server/src/peers/user_agent/session/mod.rs +++ b/server/crates/arbiter-server/src/peers/user_agent/session/mod.rs @@ -12,7 +12,8 @@ use tracing::error; use crate::{ actors::{ - flow_coordinator::{RegisterUserAgent, client_connect_approval::ClientApprovalController}, + flow_coordinator::client_connect_approval::ClientApprovalController, + useragent_registry::ConnectUseragent, vault::events, }, crypto::integrity, db::schema::useragent_client, peers::{client::ClientProfile, user_agent::{AuthCredentials, Credentials}} }; @@ -123,17 +124,17 @@ impl Actor for UserAgentSession { ) -> Result { args.props .actors - .flow_coordinator - .ask(RegisterUserAgent { + .useragent_registry + .ask(ConnectUseragent { actor: this.clone(), }) .await .map_err(|err| { error!( ?err, - "Failed to register user agent connection with flow coordinator" + "Failed to register user agent connection with user agent registry" ); - Error::internal("Failed to register user agent connection with flow coordinator") + Error::internal("Failed to register user agent connection with user agent registry") })?; Ok(args) }