From 51674bb39c07290ff0ba1b657d62db228399ced4 Mon Sep 17 00:00:00 2001 From: hdbg Date: Sat, 21 Mar 2026 13:10:18 +0100 Subject: [PATCH] refactor(actors): rename MessageRouter to FlowCoordinator --- AGENTS.md | 2 +- CLAUDE.md | 2 +- .../arbiter-server/src/actors/client/auth.rs | 8 +++---- .../src/actors/client/session.rs | 5 +++-- .../{router => flow_coordinator}/mod.rs | 22 +++++++++---------- .../crates/arbiter-server/src/actors/mod.rs | 11 ++++++---- .../src/actors/user_agent/session.rs | 8 +++---- .../arbiter-server/src/grpc/client/auth.rs | 2 +- 8 files changed, 32 insertions(+), 28 deletions(-) rename server/crates/arbiter-server/src/actors/{router => flow_coordinator}/mod.rs (88%) diff --git a/AGENTS.md b/AGENTS.md index bc166c4..fb2d230 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -67,7 +67,7 @@ The server is actor-based using the **kameo** crate. All long-lived state lives - **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run. - **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell. -- **`MessageRouter`** — Coordinates streaming messages between user agents and SDK clients. +- **`FlowCoordinator`** — Coordinates cross-connection flow between user agents and SDK clients. - **`EvmActor`** — Handles EVM transaction policy enforcement and signing. Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules. diff --git a/CLAUDE.md b/CLAUDE.md index 776eccc..c3c3357 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -67,7 +67,7 @@ The server is actor-based using the **kameo** crate. All long-lived state lives - **`Bootstrapper`** — Manages the one-time bootstrap token written to `~/.arbiter/bootstrap_token` on first run. - **`KeyHolder`** — Holds the encrypted root key and manages the Sealed/Unsealed vault state machine. On unseal, decrypts the root key into a `memsafe` hardened memory cell. -- **`MessageRouter`** — Coordinates streaming messages between user agents and SDK clients. +- **`FlowCoordinator`** — Coordinates cross-connection flow between user agents and SDK clients. - **`EvmActor`** — Handles EVM transaction policy enforcement and signing. Per-connection actors live under `actors/user_agent/` and `actors/client/`, each with `auth` (challenge-response authentication) and `session` (post-auth operations) sub-modules. diff --git a/server/crates/arbiter-server/src/actors/client/auth.rs b/server/crates/arbiter-server/src/actors/client/auth.rs index 0b13ed9..1864678 100644 --- a/server/crates/arbiter-server/src/actors/client/auth.rs +++ b/server/crates/arbiter-server/src/actors/client/auth.rs @@ -15,7 +15,7 @@ use tracing::error; use crate::{ actors::{ client::ClientConnection, - router::{self, RequestClientApproval}, + flow_coordinator::{self, RequestClientApproval}, }, db::{ self, @@ -52,7 +52,7 @@ pub enum ApproveError { #[error("Client connection denied by user agents")] Denied, #[error("Upstream error: {0}")] - Upstream(router::ApprovalError), + Upstream(flow_coordinator::ApprovalError), } #[derive(Debug, Clone)] @@ -116,7 +116,7 @@ async fn approve_new_client( pubkey: VerifyingKey, ) -> Result<(), Error> { let result = actors - .router + .flow_coordinator .ask(RequestClientApproval { client_pubkey: pubkey, }) @@ -130,7 +130,7 @@ async fn approve_new_client( Err(Error::ApproveError(ApproveError::Upstream(e))) } Err(e) => { - error!(error = ?e, "Approval request to router failed"); + error!(error = ?e, "Approval request to flow coordinator failed"); Err(Error::ApproveError(ApproveError::Internal)) } } diff --git a/server/crates/arbiter-server/src/actors/client/session.rs b/server/crates/arbiter-server/src/actors/client/session.rs index 93f2c6e..83e2e29 100644 --- a/server/crates/arbiter-server/src/actors/client/session.rs +++ b/server/crates/arbiter-server/src/actors/client/session.rs @@ -3,7 +3,8 @@ use tracing::error; use crate::{ actors::{ - GlobalActors, client::ClientConnection, keyholder::KeyHolderState, router::RegisterClient, + GlobalActors, client::ClientConnection, flow_coordinator::RegisterClient, + keyholder::KeyHolderState, }, db, }; @@ -47,7 +48,7 @@ impl Actor for ClientSession { ) -> Result { args.props .actors - .router + .flow_coordinator .ask(RegisterClient { actor: this }) .await .map_err(|_| Error::ConnectionRegistrationFailed)?; diff --git a/server/crates/arbiter-server/src/actors/router/mod.rs b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs similarity index 88% rename from server/crates/arbiter-server/src/actors/router/mod.rs rename to server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs index f1654b2..ccaeb56 100644 --- a/server/crates/arbiter-server/src/actors/router/mod.rs +++ b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs @@ -17,12 +17,12 @@ use crate::actors::{ }; #[derive(Default)] -pub struct MessageRouter { +pub struct FlowCoordinator { pub user_agents: HashMap>, pub clients: HashMap>, } -impl Actor for MessageRouter { +impl Actor for FlowCoordinator { type Args = Self; type Error = (); @@ -40,15 +40,15 @@ impl Actor for MessageRouter { if self.user_agents.remove(&id).is_some() { info!( ?id, - actor = "MessageRouter", + actor = "FlowCoordinator", event = "useragent.disconnected" ); } else if self.clients.remove(&id).is_some() { - info!(?id, actor = "MessageRouter", event = "client.disconnected"); + info!(?id, actor = "FlowCoordinator", event = "client.disconnected"); } else { info!( ?id, - actor = "MessageRouter", + actor = "FlowCoordinator", event = "unknown.actor.disconnected" ); } @@ -89,7 +89,7 @@ async fn request_client_approval( None => { warn!( id = weak_ref.id().to_string(), - actor = "MessageRouter", + actor = "FlowCoordinator", event = "useragent.disconnected_before_approval" ); } @@ -106,14 +106,14 @@ async fn request_client_approval( Ok(Err(err)) => { warn!( ?err, - actor = "MessageRouter", + actor = "FlowCoordinator", event = "useragent.approval_error" ); } Err(err) => { warn!( ?err, - actor = "MessageRouter", + actor = "FlowCoordinator", event = "useragent.approval_task_failed" ); } @@ -124,14 +124,14 @@ async fn request_client_approval( } #[messages] -impl MessageRouter { +impl FlowCoordinator { #[message(ctx)] pub async fn register_user_agent( &mut self, actor: ActorRef, ctx: &mut Context, ) { - info!(id = %actor.id(), actor = "MessageRouter", event = "useragent.connected"); + info!(id = %actor.id(), actor = "FlowCoordinator", event = "useragent.connected"); ctx.actor_ref().link(&actor).await; self.user_agents.insert(actor.id(), actor); } @@ -142,7 +142,7 @@ impl MessageRouter { actor: ActorRef, ctx: &mut Context, ) { - info!(id = %actor.id(), actor = "MessageRouter", event = "client.connected"); + info!(id = %actor.id(), actor = "FlowCoordinator", event = "client.connected"); ctx.actor_ref().link(&actor).await; self.clients.insert(actor.id(), actor); } diff --git a/server/crates/arbiter-server/src/actors/mod.rs b/server/crates/arbiter-server/src/actors/mod.rs index 4a678b1..1b70dd7 100644 --- a/server/crates/arbiter-server/src/actors/mod.rs +++ b/server/crates/arbiter-server/src/actors/mod.rs @@ -3,15 +3,18 @@ use miette::Diagnostic; use thiserror::Error; use crate::{ - actors::{bootstrap::Bootstrapper, evm::EvmActor, keyholder::KeyHolder, router::MessageRouter}, + actors::{ + bootstrap::Bootstrapper, evm::EvmActor, flow_coordinator::FlowCoordinator, + keyholder::KeyHolder, + }, db, }; pub mod bootstrap; pub mod client; mod evm; +pub mod flow_coordinator; pub mod keyholder; -pub mod router; pub mod user_agent; #[derive(Error, Debug, Diagnostic)] @@ -30,7 +33,7 @@ pub enum SpawnError { pub struct GlobalActors { pub key_holder: ActorRef, pub bootstrapper: ActorRef, - pub router: ActorRef, + pub flow_coordinator: ActorRef, pub evm: ActorRef, } @@ -41,7 +44,7 @@ impl GlobalActors { bootstrapper: Bootstrapper::spawn(Bootstrapper::new(&db).await?), evm: EvmActor::spawn(EvmActor::new(key_holder.clone(), db)), key_holder, - router: MessageRouter::spawn(MessageRouter::default()), + flow_coordinator: FlowCoordinator::spawn(FlowCoordinator::default()), }) } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/session.rs b/server/crates/arbiter-server/src/actors/user_agent/session.rs index d9a9337..ad9b266 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session.rs @@ -9,7 +9,7 @@ use tokio::sync::watch; use tracing::error; use crate::actors::{ - router::RegisterUserAgent, + flow_coordinator::RegisterUserAgent, user_agent::{OutOfBand, UserAgentConnection}, }; @@ -110,14 +110,14 @@ impl Actor for UserAgentSession { ) -> Result { args.props .actors - .router + .flow_coordinator .ask(RegisterUserAgent { actor: this.clone(), }) .await .map_err(|err| { - error!(?err, "Failed to register user agent connection with router"); - Error::internal("Failed to register user agent connection with router") + error!(?err, "Failed to register user agent connection with flow coordinator"); + Error::internal("Failed to register user agent connection with flow coordinator") })?; Ok(args) } diff --git a/server/crates/arbiter-server/src/grpc/client/auth.rs b/server/crates/arbiter-server/src/grpc/client/auth.rs index 8177810..6dd6de9 100644 --- a/server/crates/arbiter-server/src/grpc/client/auth.rs +++ b/server/crates/arbiter-server/src/grpc/client/auth.rs @@ -55,7 +55,7 @@ impl<'a> AuthTransportAdapter<'a> { ProtoAuthResult::ApprovalDenied } auth::Error::ApproveError(auth::ApproveError::Upstream( - crate::actors::router::ApprovalError::NoUserAgentsConnected, + crate::actors::flow_coordinator::ApprovalError::NoUserAgentsConnected, )) => ProtoAuthResult::NoUserAgentsOnline, auth::Error::ApproveError(auth::ApproveError::Internal) | auth::Error::DatabasePoolUnavailable