From 345a967c1336a6fb555009ded765203a8df00c5e Mon Sep 17 00:00:00 2001 From: hdbg Date: Sat, 14 Feb 2026 17:54:23 +0100 Subject: [PATCH] refactor(server): separated `UserAgentActor` gRPC transport related things into separate module --- .../arbiter-server/src/actors/user_agent.rs | 106 +++++------------- .../src/actors/user_agent/transport.rs | 95 ++++++++++++++++ 2 files changed, 124 insertions(+), 77 deletions(-) create mode 100644 server/crates/arbiter-server/src/actors/user_agent/transport.rs diff --git a/server/crates/arbiter-server/src/actors/user_agent.rs b/server/crates/arbiter-server/src/actors/user_agent.rs index d44b087..af1d040 100644 --- a/server/crates/arbiter-server/src/actors/user_agent.rs +++ b/server/crates/arbiter-server/src/actors/user_agent.rs @@ -24,7 +24,12 @@ use tokio::sync::mpsc::Sender; use tonic::Status; use tracing::{error, info}; -use crate::{ServerContext, context::bootstrap::ConsumeToken, db::schema, errors::GrpcStatusExt}; +use crate::{ + ServerContext, + context::bootstrap::{BootstrapActor, ConsumeToken}, + db::{self, schema}, + errors::GrpcStatusExt, +}; /// Context for state machine with validated key and sent challenge /// Challenge is then transformed to bytes using shared function and verified @@ -81,7 +86,8 @@ impl UserAgentStateMachineContext for ServerContext { #[derive(Actor)] pub struct UserAgentActor { - context: ServerContext, + db: db::DatabasePool, + bootstapper: ActorRef, state: UserAgentStateMachine, tx: Sender>, } @@ -92,12 +98,27 @@ impl UserAgentActor { tx: Sender>, ) -> Self { Self { - context: context.clone(), + db: context.db.clone(), + bootstapper: context.bootstrapper.clone(), state: UserAgentStateMachine::new(context), tx, } } + pub(crate) fn new_manual( + db: db::DatabasePool, + bootstapper: ActorRef, + state: UserAgentStateMachine, + tx: Sender>, + ) -> Self { + Self { + db, + bootstapper, + state, + tx, + } + } + fn transition(&mut self, event: UserAgentEvents) -> Result<(), Status> { self.state.process_event(event).map_err(|e| { error!(?e, "State transition failed"); @@ -112,8 +133,7 @@ impl UserAgentActor { token: String, ) -> Result { let token_ok: bool = self - .context - .bootstrapper + .bootstapper .ask(ConsumeToken { token }) .await .map_err(|e| { @@ -127,7 +147,7 @@ impl UserAgentActor { } { - let mut conn = self.context.db.get().await.to_status()?; + let mut conn = self.db.get().await.to_status()?; diesel::insert_into(schema::useragent_client::table) .values(( @@ -146,7 +166,7 @@ impl UserAgentActor { async fn auth_with_challenge(&mut self, pubkey: VerifyingKey, pubkey_bytes: Vec) -> Output { let nonce: Option = { - let mut db_conn = self.context.db.get().await.to_status()?; + let mut db_conn = self.db.get().await.to_status()?; db_conn .transaction(|conn| { Box::pin(async move { @@ -285,73 +305,5 @@ impl UserAgentActor { } } -pub(crate) async fn handle_user_agent( - context: ServerContext, - mut req_stream: tonic::Streaming, - tx: mpsc::Sender>, -) { - let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone())); - - while let Some(Ok(req)) = req_stream.next().await - && actor.is_alive() - { - match process_message(&actor, req).await { - Ok(resp) => { - if tx.send(Ok(resp)).await.is_err() { - error!(actor = "useragent", "Failed to send response to client"); - break; - } - } - Err(status) => { - let _ = tx.send(Err(status)).await; - break; - } - } - } - - actor.kill(); -} - -async fn process_message( - actor: &ActorRef, - req: UserAgentRequest, -) -> Result { - let msg = req.payload.ok_or_else(|| { - error!(actor = "useragent", "Received message with no payload"); - Status::invalid_argument("Expected message with payload") - })?; - - let UserAgentRequestPayload::AuthMessage(ClientMessage { - payload: Some(client_message), - }) = msg - else { - error!( - actor = "useragent", - "Received unexpected message type during authentication" - ); - return Err(Status::invalid_argument( - "Expected AuthMessage with ClientMessage payload", - )); - }; - - match client_message { - ClientAuthPayload::AuthChallengeRequest(req) => actor - .ask(HandleAuthChallengeRequest { req }) - .await - .map_err(into_status), - ClientAuthPayload::AuthChallengeSolution(solution) => actor - .ask(HandleAuthChallengeSolution { solution }) - .await - .map_err(into_status), - } -} - -fn into_status(e: SendError) -> Status { - match e { - SendError::HandlerError(status) => status, - _ => { - error!(actor = "useragent", "Failed to send message to actor"); - Status::internal("session failure") - } - } -} +mod transport; +pub(crate) use transport::handle_user_agent; diff --git a/server/crates/arbiter-server/src/actors/user_agent/transport.rs b/server/crates/arbiter-server/src/actors/user_agent/transport.rs new file mode 100644 index 0000000..2114d56 --- /dev/null +++ b/server/crates/arbiter-server/src/actors/user_agent/transport.rs @@ -0,0 +1,95 @@ +use super::UserAgentActor; +use arbiter_proto::proto::{ + UserAgentRequest, UserAgentResponse, + auth::{ + self, AuthChallenge, AuthChallengeRequest, AuthOk, ClientMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, +}; +use futures::StreamExt; +use kameo::{ + actor::{ActorRef, Spawn as _}, + error::SendError, +}; +use tokio::sync::mpsc; +use tonic::Status; +use tracing::error; + +use crate::{ + actors::user_agent::{HandleAuthChallengeRequest, HandleAuthChallengeSolution}, + context::ServerContext, +}; + +pub(crate) async fn handle_user_agent( + context: ServerContext, + mut req_stream: tonic::Streaming, + tx: mpsc::Sender>, +) { + let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone())); + + while let Some(Ok(req)) = req_stream.next().await + && actor.is_alive() + { + match process_message(&actor, req).await { + Ok(resp) => { + if tx.send(Ok(resp)).await.is_err() { + error!(actor = "useragent", "Failed to send response to client"); + break; + } + } + Err(status) => { + let _ = tx.send(Err(status)).await; + break; + } + } + } + + actor.kill(); +} + +async fn process_message( + actor: &ActorRef, + req: UserAgentRequest, +) -> Result { + let msg = req.payload.ok_or_else(|| { + error!(actor = "useragent", "Received message with no payload"); + Status::invalid_argument("Expected message with payload") + })?; + + let UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(client_message), + }) = msg + else { + error!( + actor = "useragent", + "Received unexpected message type during authentication" + ); + return Err(Status::invalid_argument( + "Expected AuthMessage with ClientMessage payload", + )); + }; + + match client_message { + ClientAuthPayload::AuthChallengeRequest(req) => actor + .ask(HandleAuthChallengeRequest { req }) + .await + .map_err(into_status), + ClientAuthPayload::AuthChallengeSolution(solution) => actor + .ask(HandleAuthChallengeSolution { solution }) + .await + .map_err(into_status), + } +} + +fn into_status(e: SendError) -> Status { + match e { + SendError::HandlerError(status) => status, + _ => { + error!(actor = "useragent", "Failed to send message to actor"); + Status::internal("session failure") + } + } +}