refactor(server): separated UserAgentActor gRPC transport related things into separate module

This commit is contained in:
hdbg
2026-02-14 17:54:23 +01:00
parent 069a997691
commit 345a967c13
2 changed files with 124 additions and 77 deletions

View File

@@ -24,7 +24,12 @@ use tokio::sync::mpsc::Sender;
use tonic::Status; use tonic::Status;
use tracing::{error, info}; use tracing::{error, info};
use crate::{ServerContext, context::bootstrap::ConsumeToken, db::schema, errors::GrpcStatusExt}; use crate::{
ServerContext,
context::bootstrap::{BootstrapActor, ConsumeToken},
db::{self, schema},
errors::GrpcStatusExt,
};
/// Context for state machine with validated key and sent challenge /// Context for state machine with validated key and sent challenge
/// Challenge is then transformed to bytes using shared function and verified /// Challenge is then transformed to bytes using shared function and verified
@@ -81,7 +86,8 @@ impl UserAgentStateMachineContext for ServerContext {
#[derive(Actor)] #[derive(Actor)]
pub struct UserAgentActor { pub struct UserAgentActor {
context: ServerContext, db: db::DatabasePool,
bootstapper: ActorRef<BootstrapActor>,
state: UserAgentStateMachine<ServerContext>, state: UserAgentStateMachine<ServerContext>,
tx: Sender<Result<UserAgentResponse, Status>>, tx: Sender<Result<UserAgentResponse, Status>>,
} }
@@ -92,12 +98,27 @@ impl UserAgentActor {
tx: Sender<Result<UserAgentResponse, Status>>, tx: Sender<Result<UserAgentResponse, Status>>,
) -> Self { ) -> Self {
Self { Self {
context: context.clone(), db: context.db.clone(),
bootstapper: context.bootstrapper.clone(),
state: UserAgentStateMachine::new(context), state: UserAgentStateMachine::new(context),
tx, tx,
} }
} }
pub(crate) fn new_manual(
db: db::DatabasePool,
bootstapper: ActorRef<BootstrapActor>,
state: UserAgentStateMachine<ServerContext>,
tx: Sender<Result<UserAgentResponse, Status>>,
) -> Self {
Self {
db,
bootstapper,
state,
tx,
}
}
fn transition(&mut self, event: UserAgentEvents) -> Result<(), Status> { fn transition(&mut self, event: UserAgentEvents) -> Result<(), Status> {
self.state.process_event(event).map_err(|e| { self.state.process_event(event).map_err(|e| {
error!(?e, "State transition failed"); error!(?e, "State transition failed");
@@ -112,8 +133,7 @@ impl UserAgentActor {
token: String, token: String,
) -> Result<UserAgentResponse, Status> { ) -> Result<UserAgentResponse, Status> {
let token_ok: bool = self let token_ok: bool = self
.context .bootstapper
.bootstrapper
.ask(ConsumeToken { token }) .ask(ConsumeToken { token })
.await .await
.map_err(|e| { .map_err(|e| {
@@ -127,7 +147,7 @@ impl UserAgentActor {
} }
{ {
let mut conn = self.context.db.get().await.to_status()?; let mut conn = self.db.get().await.to_status()?;
diesel::insert_into(schema::useragent_client::table) diesel::insert_into(schema::useragent_client::table)
.values(( .values((
@@ -146,7 +166,7 @@ impl UserAgentActor {
async fn auth_with_challenge(&mut self, pubkey: VerifyingKey, pubkey_bytes: Vec<u8>) -> Output { async fn auth_with_challenge(&mut self, pubkey: VerifyingKey, pubkey_bytes: Vec<u8>) -> Output {
let nonce: Option<i32> = { let nonce: Option<i32> = {
let mut db_conn = self.context.db.get().await.to_status()?; let mut db_conn = self.db.get().await.to_status()?;
db_conn db_conn
.transaction(|conn| { .transaction(|conn| {
Box::pin(async move { Box::pin(async move {
@@ -285,73 +305,5 @@ impl UserAgentActor {
} }
} }
pub(crate) async fn handle_user_agent( mod transport;
context: ServerContext, pub(crate) use transport::handle_user_agent;
mut req_stream: tonic::Streaming<UserAgentRequest>,
tx: mpsc::Sender<Result<UserAgentResponse, Status>>,
) {
let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone()));
while let Some(Ok(req)) = req_stream.next().await
&& actor.is_alive()
{
match process_message(&actor, req).await {
Ok(resp) => {
if tx.send(Ok(resp)).await.is_err() {
error!(actor = "useragent", "Failed to send response to client");
break;
}
}
Err(status) => {
let _ = tx.send(Err(status)).await;
break;
}
}
}
actor.kill();
}
async fn process_message(
actor: &ActorRef<UserAgentActor>,
req: UserAgentRequest,
) -> Result<UserAgentResponse, Status> {
let msg = req.payload.ok_or_else(|| {
error!(actor = "useragent", "Received message with no payload");
Status::invalid_argument("Expected message with payload")
})?;
let UserAgentRequestPayload::AuthMessage(ClientMessage {
payload: Some(client_message),
}) = msg
else {
error!(
actor = "useragent",
"Received unexpected message type during authentication"
);
return Err(Status::invalid_argument(
"Expected AuthMessage with ClientMessage payload",
));
};
match client_message {
ClientAuthPayload::AuthChallengeRequest(req) => actor
.ask(HandleAuthChallengeRequest { req })
.await
.map_err(into_status),
ClientAuthPayload::AuthChallengeSolution(solution) => actor
.ask(HandleAuthChallengeSolution { solution })
.await
.map_err(into_status),
}
}
fn into_status<M>(e: SendError<M, Status>) -> Status {
match e {
SendError::HandlerError(status) => status,
_ => {
error!(actor = "useragent", "Failed to send message to actor");
Status::internal("session failure")
}
}
}

View File

@@ -0,0 +1,95 @@
use super::UserAgentActor;
use arbiter_proto::proto::{
UserAgentRequest, UserAgentResponse,
auth::{
self, AuthChallenge, AuthChallengeRequest, AuthOk, ClientMessage,
ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload,
server_message::Payload as ServerAuthPayload,
},
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
};
use futures::StreamExt;
use kameo::{
actor::{ActorRef, Spawn as _},
error::SendError,
};
use tokio::sync::mpsc;
use tonic::Status;
use tracing::error;
use crate::{
actors::user_agent::{HandleAuthChallengeRequest, HandleAuthChallengeSolution},
context::ServerContext,
};
pub(crate) async fn handle_user_agent(
context: ServerContext,
mut req_stream: tonic::Streaming<UserAgentRequest>,
tx: mpsc::Sender<Result<UserAgentResponse, Status>>,
) {
let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone()));
while let Some(Ok(req)) = req_stream.next().await
&& actor.is_alive()
{
match process_message(&actor, req).await {
Ok(resp) => {
if tx.send(Ok(resp)).await.is_err() {
error!(actor = "useragent", "Failed to send response to client");
break;
}
}
Err(status) => {
let _ = tx.send(Err(status)).await;
break;
}
}
}
actor.kill();
}
async fn process_message(
actor: &ActorRef<UserAgentActor>,
req: UserAgentRequest,
) -> Result<UserAgentResponse, Status> {
let msg = req.payload.ok_or_else(|| {
error!(actor = "useragent", "Received message with no payload");
Status::invalid_argument("Expected message with payload")
})?;
let UserAgentRequestPayload::AuthMessage(ClientMessage {
payload: Some(client_message),
}) = msg
else {
error!(
actor = "useragent",
"Received unexpected message type during authentication"
);
return Err(Status::invalid_argument(
"Expected AuthMessage with ClientMessage payload",
));
};
match client_message {
ClientAuthPayload::AuthChallengeRequest(req) => actor
.ask(HandleAuthChallengeRequest { req })
.await
.map_err(into_status),
ClientAuthPayload::AuthChallengeSolution(solution) => actor
.ask(HandleAuthChallengeSolution { solution })
.await
.map_err(into_status),
}
}
fn into_status<M>(e: SendError<M, Status>) -> Status {
match e {
SendError::HandlerError(status) => status,
_ => {
error!(actor = "useragent", "Failed to send message to actor");
Status::internal("session failure")
}
}
}