refactor(actors): rename MessageRouter to FlowCoordinator

This commit is contained in:
hdbg
2026-03-21 13:10:18 +01:00
parent cd07ab7a78
commit 51674bb39c
8 changed files with 32 additions and 28 deletions

View File

@@ -67,7 +67,7 @@ The server is actor-based using the **kameo** crate. All long-lived state lives
- **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run. - **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run.
- **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell. - **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell.
- **`MessageRouter`** — Coordinates streaming messages between user agents and SDK clients. - **`FlowCoordinator`** — Coordinates cross-connection flow between user agents and SDK clients.
- **`EvmActor`** — Handles EVM transaction policy enforcement and signing. - **`EvmActor`** — Handles EVM transaction policy enforcement and signing.
Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules. Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules.

View File

@@ -67,7 +67,7 @@ The server is actor-based using the **kameo** crate. All long-lived state lives
- **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run. - **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run.
- **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell. - **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell.
- **`MessageRouter`** — Coordinates streaming messages between user agents and SDK clients. - **`FlowCoordinator`** — Coordinates cross-connection flow between user agents and SDK clients.
- **`EvmActor`** — Handles EVM transaction policy enforcement and signing. - **`EvmActor`** — Handles EVM transaction policy enforcement and signing.
Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules. Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules.

View File

@@ -15,7 +15,7 @@ use tracing::error;
use crate::{ use crate::{
actors::{ actors::{
client::ClientConnection, client::ClientConnection,
router::{self, RequestClientApproval}, flow_coordinator::{self, RequestClientApproval},
}, },
db::{ db::{
self, self,
@@ -52,7 +52,7 @@ pub enum ApproveError {
#[error("Client connection denied by user agents")] #[error("Client connection denied by user agents")]
Denied, Denied,
#[error("Upstream error: {0}")] #[error("Upstream error: {0}")]
Upstream(router::ApprovalError), Upstream(flow_coordinator::ApprovalError),
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@@ -116,7 +116,7 @@ async fn approve_new_client(
pubkey: VerifyingKey, pubkey: VerifyingKey,
) -> Result<(), Error> { ) -> Result<(), Error> {
let result = actors let result = actors
.router .flow_coordinator
.ask(RequestClientApproval { .ask(RequestClientApproval {
client_pubkey: pubkey, client_pubkey: pubkey,
}) })
@@ -130,7 +130,7 @@ async fn approve_new_client(
Err(Error::ApproveError(ApproveError::Upstream(e))) Err(Error::ApproveError(ApproveError::Upstream(e)))
} }
Err(e) => { Err(e) => {
error!(error = ?e, "Approval request to router failed"); error!(error = ?e, "Approval request to flow coordinator failed");
Err(Error::ApproveError(ApproveError::Internal)) Err(Error::ApproveError(ApproveError::Internal))
} }
} }

View File

@@ -3,7 +3,8 @@ use tracing::error;
use crate::{ use crate::{
actors::{ actors::{
GlobalActors, client::ClientConnection, keyholder::KeyHolderState, router::RegisterClient, GlobalActors, client::ClientConnection, flow_coordinator::RegisterClient,
keyholder::KeyHolderState,
}, },
db, db,
}; };
@@ -47,7 +48,7 @@ impl Actor for ClientSession {
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
args.props args.props
.actors .actors
.router .flow_coordinator
.ask(RegisterClient { actor: this }) .ask(RegisterClient { actor: this })
.await .await
.map_err(|_| Error::ConnectionRegistrationFailed)?; .map_err(|_| Error::ConnectionRegistrationFailed)?;

View File

@@ -17,12 +17,12 @@ use crate::actors::{
}; };
#[derive(Default)] #[derive(Default)]
pub struct MessageRouter { pub struct FlowCoordinator {
pub user_agents: HashMap<ActorId, ActorRef<UserAgentSession>>, pub user_agents: HashMap<ActorId, ActorRef<UserAgentSession>>,
pub clients: HashMap<ActorId, ActorRef<ClientSession>>, pub clients: HashMap<ActorId, ActorRef<ClientSession>>,
} }
impl Actor for MessageRouter { impl Actor for FlowCoordinator {
type Args = Self; type Args = Self;
type Error = (); type Error = ();
@@ -40,15 +40,15 @@ impl Actor for MessageRouter {
if self.user_agents.remove(&id).is_some() { if self.user_agents.remove(&id).is_some() {
info!( info!(
?id, ?id,
actor = "MessageRouter", actor = "FlowCoordinator",
event = "useragent.disconnected" event = "useragent.disconnected"
); );
} else if self.clients.remove(&id).is_some() { } else if self.clients.remove(&id).is_some() {
info!(?id, actor = "MessageRouter", event = "client.disconnected"); info!(?id, actor = "FlowCoordinator", event = "client.disconnected");
} else { } else {
info!( info!(
?id, ?id,
actor = "MessageRouter", actor = "FlowCoordinator",
event = "unknown.actor.disconnected" event = "unknown.actor.disconnected"
); );
} }
@@ -89,7 +89,7 @@ async fn request_client_approval(
None => { None => {
warn!( warn!(
id = weak_ref.id().to_string(), id = weak_ref.id().to_string(),
actor = "MessageRouter", actor = "FlowCoordinator",
event = "useragent.disconnected_before_approval" event = "useragent.disconnected_before_approval"
); );
} }
@@ -106,14 +106,14 @@ async fn request_client_approval(
Ok(Err(err)) => { Ok(Err(err)) => {
warn!( warn!(
?err, ?err,
actor = "MessageRouter", actor = "FlowCoordinator",
event = "useragent.approval_error" event = "useragent.approval_error"
); );
} }
Err(err) => { Err(err) => {
warn!( warn!(
?err, ?err,
actor = "MessageRouter", actor = "FlowCoordinator",
event = "useragent.approval_task_failed" event = "useragent.approval_task_failed"
); );
} }
@@ -124,14 +124,14 @@ async fn request_client_approval(
} }
#[messages] #[messages]
impl MessageRouter { impl FlowCoordinator {
#[message(ctx)] #[message(ctx)]
pub async fn register_user_agent( pub async fn register_user_agent(
&mut self, &mut self,
actor: ActorRef<UserAgentSession>, actor: ActorRef<UserAgentSession>,
ctx: &mut Context<Self, ()>, ctx: &mut Context<Self, ()>,
) { ) {
info!(id = %actor.id(), actor = "MessageRouter", event = "useragent.connected"); info!(id = %actor.id(), actor = "FlowCoordinator", event = "useragent.connected");
ctx.actor_ref().link(&actor).await; ctx.actor_ref().link(&actor).await;
self.user_agents.insert(actor.id(), actor); self.user_agents.insert(actor.id(), actor);
} }
@@ -142,7 +142,7 @@ impl MessageRouter {
actor: ActorRef<ClientSession>, actor: ActorRef<ClientSession>,
ctx: &mut Context<Self, ()>, ctx: &mut Context<Self, ()>,
) { ) {
info!(id = %actor.id(), actor = "MessageRouter", event = "client.connected"); info!(id = %actor.id(), actor = "FlowCoordinator", event = "client.connected");
ctx.actor_ref().link(&actor).await; ctx.actor_ref().link(&actor).await;
self.clients.insert(actor.id(), actor); self.clients.insert(actor.id(), actor);
} }

View File

@@ -3,15 +3,18 @@ use miette::Diagnostic;
use thiserror::Error; use thiserror::Error;
use crate::{ use crate::{
actors::{bootstrap::Bootstrapper, evm::EvmActor, keyholder::KeyHolder, router::MessageRouter}, actors::{
bootstrap::Bootstrapper, evm::EvmActor, flow_coordinator::FlowCoordinator,
keyholder::KeyHolder,
},
db, db,
}; };
pub mod bootstrap; pub mod bootstrap;
pub mod client; pub mod client;
mod evm; mod evm;
pub mod flow_coordinator;
pub mod keyholder; pub mod keyholder;
pub mod router;
pub mod user_agent; pub mod user_agent;
#[derive(Error, Debug, Diagnostic)] #[derive(Error, Debug, Diagnostic)]
@@ -30,7 +33,7 @@ pub enum SpawnError {
pub struct GlobalActors { pub struct GlobalActors {
pub key_holder: ActorRef<KeyHolder>, pub key_holder: ActorRef<KeyHolder>,
pub bootstrapper: ActorRef<Bootstrapper>, pub bootstrapper: ActorRef<Bootstrapper>,
pub router: ActorRef<MessageRouter>, pub flow_coordinator: ActorRef<FlowCoordinator>,
pub evm: ActorRef<EvmActor>, pub evm: ActorRef<EvmActor>,
} }
@@ -41,7 +44,7 @@ impl GlobalActors {
bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?), bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?),
evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)), evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)),
key_holder, key_holder,
router: MessageRouter::spawn(MessageRouter::default()), flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::default()),
}) })
} }
} }

View File

@@ -9,7 +9,7 @@ use tokio::sync::watch;
use tracing::error; use tracing::error;
use crate::actors::{ use crate::actors::{
router::RegisterUserAgent, flow_coordinator::RegisterUserAgent,
user_agent::{OutOfBand, UserAgentConnection}, user_agent::{OutOfBand, UserAgentConnection},
}; };
@@ -110,14 +110,14 @@ impl Actor for UserAgentSession {
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
args.props args.props
.actors .actors
.router .flow_coordinator
.ask(RegisterUserAgent { .ask(RegisterUserAgent {
actor: this.clone(), actor: this.clone(),
}) })
.await .await
.map_err(|err| { .map_err(|err| {
error!(?err, "Failed to register user agent connection with router"); error!(?err, "Failed to register user agent connection with flow coordinator");
Error::internal("Failed to register user agent connection with router") Error::internal("Failed to register user agent connection with flow coordinator")
})?; })?;
Ok(args) Ok(args)
} }

View File

@@ -55,7 +55,7 @@ impl<'a> AuthTransportAdapter<'a> {
ProtoAuthResult::ApprovalDenied ProtoAuthResult::ApprovalDenied
} }
auth::Error::ApproveError(auth::ApproveError::Upstream( auth::Error::ApproveError(auth::ApproveError::Upstream(
crate::actors::router::ApprovalError::NoUserAgentsConnected, crate::actors::flow_coordinator::ApprovalError::NoUserAgentsConnected,
)) => ProtoAuthResult::NoUserAgentsOnline, )) => ProtoAuthResult::NoUserAgentsOnline,
auth::Error::ApproveError(auth::ApproveError::Internal) auth::Error::ApproveError(auth::ApproveError::Internal)
| auth::Error::DatabasePoolUnavailable | auth::Error::DatabasePoolUnavailable