diff --git a/protobufs/user_agent.proto b/protobufs/user_agent.proto index 2a7e3c0..cb14466 100644 --- a/protobufs/user_agent.proto +++ b/protobufs/user_agent.proto @@ -106,6 +106,16 @@ enum VaultState { VAULT_STATE_ERROR = 4; } +message ClientConnectionRequest { + bytes pubkey = 1; +} + +message ClientConnectionResponse { + bool approved = 1; +} + +message ClientConnectionCancel {} + message UserAgentRequest { oneof payload { AuthChallengeRequest auth_challenge_request = 1; @@ -118,7 +128,7 @@ message UserAgentRequest { arbiter.evm.EvmGrantCreateRequest evm_grant_create = 8; arbiter.evm.EvmGrantDeleteRequest evm_grant_delete = 9; arbiter.evm.EvmGrantListRequest evm_grant_list = 10; - // field 11 reserved: was client_connection_response (online approval removed) + ClientConnectionResponse client_connection_response = 11; SdkClientApproveRequest sdk_client_approve = 12; SdkClientRevokeRequest sdk_client_revoke = 13; google.protobuf.Empty sdk_client_list = 14; @@ -136,7 +146,8 @@ message UserAgentResponse { arbiter.evm.EvmGrantCreateResponse evm_grant_create = 8; arbiter.evm.EvmGrantDeleteResponse evm_grant_delete = 9; arbiter.evm.EvmGrantListResponse evm_grant_list = 10; - // fields 11, 12 reserved: were client_connection_request, client_connection_cancel (online approval removed) + ClientConnectionRequest client_connection_request = 11; + ClientConnectionCancel client_connection_cancel = 12; SdkClientApproveResponse sdk_client_approve = 13; SdkClientRevokeResponse sdk_client_revoke = 14; SdkClientListResponse sdk_client_list = 15; diff --git a/server/crates/arbiter-server/src/actors/client/auth.rs b/server/crates/arbiter-server/src/actors/client/auth.rs index 649b987..55acb4c 100644 --- a/server/crates/arbiter-server/src/actors/client/auth.rs +++ b/server/crates/arbiter-server/src/actors/client/auth.rs @@ -8,13 +8,19 @@ use arbiter_proto::{ }, transport::expect_message, }; -use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, update}; +use diesel::{ + ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update, +}; use diesel_async::RunQueryDsl as _; use ed25519_dalek::VerifyingKey; +use kameo::error::SendError; use tracing::error; use crate::{ - actors::client::ClientConnection, + actors::{ + client::ClientConnection, + router::{self, RequestClientApproval}, + }, db::{self, schema::program_client}, }; @@ -34,14 +40,24 @@ pub enum Error { DatabaseOperationFailed, #[error("Invalid challenge solution")] InvalidChallengeSolution, - #[error("Client not registered")] - NotRegistered, + #[error("Client approval request failed")] + ApproveError(#[from] ApproveError), #[error("Internal error")] InternalError, #[error("Transport error")] Transport, } +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum ApproveError { + #[error("Internal error")] + Internal, + #[error("Client connection denied by user agents")] + Denied, + #[error("Upstream error: {0}")] + Upstream(router::ApprovalError), +} + /// Atomically reads and increments the nonce for a known client. /// Returns `None` if the pubkey is not registered. async fn get_nonce( @@ -84,6 +100,85 @@ async fn get_nonce( }) } +async fn approve_new_client( + actors: &crate::actors::GlobalActors, + pubkey: VerifyingKey, +) -> Result<(), Error> { + let result = actors + .router + .ask(RequestClientApproval { + client_pubkey: pubkey, + }) + .await; + + match result { + Ok(true) => Ok(()), + Ok(false) => Err(Error::ApproveError(ApproveError::Denied)), + Err(SendError::HandlerError(e)) => { + error!(error = ?e, "Approval upstream error"); + Err(Error::ApproveError(ApproveError::Upstream(e))) + } + Err(e) => { + error!(error = ?e, "Approval request to router failed"); + Err(Error::ApproveError(ApproveError::Internal)) + } + } +} + +enum InsertClientResult { + Inserted(i32), + AlreadyExists, +} + +async fn insert_client( + db: &db::DatabasePool, + pubkey: &VerifyingKey, +) -> Result { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i32; + + let mut conn = db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + Error::DatabasePoolUnavailable + })?; + + match insert_into(program_client::table) + .values(( + program_client::public_key.eq(pubkey.as_bytes().to_vec()), + program_client::nonce.eq(1), // pre-incremented; challenge uses 0 + program_client::created_at.eq(now), + program_client::updated_at.eq(now), + )) + .execute(&mut conn) + .await + { + Ok(_) => {} + Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::UniqueViolation, + _, + )) => return Ok(InsertClientResult::AlreadyExists), + Err(e) => { + error!(error = ?e, "Failed to insert new client"); + return Err(Error::DatabaseOperationFailed); + } + } + + let client_id = program_client::table + .filter(program_client::public_key.eq(pubkey.as_bytes().to_vec())) + .order(program_client::id.desc()) + .select(program_client::id) + .first::(&mut conn) + .await + .map_err(|e| { + error!(error = ?e, "Failed to load inserted client id"); + Error::DatabaseOperationFailed + })?; + + Ok(InsertClientResult::Inserted(client_id)) +} + async fn challenge_client( props: &mut ClientConnection, pubkey: VerifyingKey, @@ -134,7 +229,10 @@ async fn challenge_client( fn connect_error_code(err: &Error) -> ConnectErrorCode { match err { - Error::NotRegistered => ConnectErrorCode::ApprovalDenied, + Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied, + Error::ApproveError(ApproveError::Upstream( + router::ApprovalError::NoUserAgentsConnected, + )) => ConnectErrorCode::NoUserAgentsOnline, _ => ConnectErrorCode::Unknown, } } @@ -156,7 +254,16 @@ async fn authenticate(props: &mut ClientConnection) -> Result<(VerifyingKey, i32 let (client_id, nonce) = match get_nonce(&props.db, &pubkey).await? { Some((client_id, nonce)) => (client_id, nonce), - None => return Err(Error::NotRegistered), + None => { + approve_new_client(&props.actors, pubkey).await?; + match insert_client(&props.db, &pubkey).await? { + InsertClientResult::Inserted(client_id) => (client_id, 0), + InsertClientResult::AlreadyExists => match get_nonce(&props.db, &pubkey).await? { + Some((client_id, nonce)) => (client_id, nonce), + None => return Err(Error::InternalError), + }, + } + } }; challenge_client(props, pubkey, nonce).await?; diff --git a/server/crates/arbiter-server/src/actors/router/mod.rs b/server/crates/arbiter-server/src/actors/router/mod.rs index 8d06152..b7303de 100644 --- a/server/crates/arbiter-server/src/actors/router/mod.rs +++ b/server/crates/arbiter-server/src/actors/router/mod.rs @@ -1,14 +1,20 @@ use std::{collections::HashMap, ops::ControlFlow}; +use ed25519_dalek::VerifyingKey; use kameo::{ Actor, actor::{ActorId, ActorRef}, messages, prelude::{ActorStopReason, Context, WeakActorRef}, + reply::DelegatedReply, }; -use tracing::info; +use tokio::{sync::watch, task::JoinSet}; +use tracing::{info, warn}; -use crate::actors::{client::session::ClientSession, user_agent::session::UserAgentSession}; +use crate::actors::{ + client::session::ClientSession, + user_agent::session::{RequestNewClientApproval, UserAgentSession}, +}; #[derive(Default)] pub struct MessageRouter { @@ -50,6 +56,72 @@ impl Actor for MessageRouter { } } +#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq, Hash)] +pub enum ApprovalError { + #[error("No user agents connected")] + NoUserAgentsConnected, +} + +async fn request_client_approval( + user_agents: &[WeakActorRef], + client_pubkey: VerifyingKey, +) -> Result { + if user_agents.is_empty() { + return Err(ApprovalError::NoUserAgentsConnected); + } + + let mut pool = JoinSet::new(); + let (cancel_tx, cancel_rx) = watch::channel(()); + + for weak_ref in user_agents { + match weak_ref.upgrade() { + Some(agent) => { + let cancel_rx = cancel_rx.clone(); + pool.spawn(async move { + agent + .ask(RequestNewClientApproval { + client_pubkey, + cancel_flag: cancel_rx.clone(), + }) + .await + }); + } + None => { + warn!( + id = weak_ref.id().to_string(), + actor = "MessageRouter", + event = "useragent.disconnected_before_approval" + ); + } + } + } + + while let Some(result) = pool.join_next().await { + match result { + Ok(Ok(approved)) => { + let _ = cancel_tx.send(()); + return Ok(approved); + } + Ok(Err(err)) => { + warn!( + ?err, + actor = "MessageRouter", + event = "useragent.approval_error" + ); + } + Err(err) => { + warn!( + ?err, + actor = "MessageRouter", + event = "useragent.approval_task_failed" + ); + } + } + } + + Err(ApprovalError::NoUserAgentsConnected) +} + #[messages] impl MessageRouter { #[message(ctx)] @@ -73,4 +145,28 @@ impl MessageRouter { ctx.actor_ref().link(&actor).await; self.clients.insert(actor.id(), actor); } + + #[message(ctx)] + pub async fn request_client_approval( + &mut self, + client_pubkey: VerifyingKey, + ctx: &mut Context>>, + ) -> DelegatedReply> { + let (reply, Some(reply_sender)) = ctx.reply_sender() else { + panic!("Exptected `request_client_approval` to have callback channel"); + }; + + let weak_refs = self + .user_agents + .values() + .map(|agent| agent.downgrade()) + .collect::>(); + + tokio::task::spawn(async move { + let result = request_client_approval(&weak_refs, client_pubkey).await; + reply_sender.send(result); + }); + + reply + } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/session.rs b/server/crates/arbiter-server/src/actors/user_agent/session.rs index da53f3a..7edd38e 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session.rs @@ -3,21 +3,22 @@ use std::{ops::DerefMut, sync::Mutex}; use arbiter_proto::proto::{ evm as evm_proto, user_agent::{ - SdkClientApproveRequest, SdkClientApproveResponse, SdkClientEntry, - SdkClientError as ProtoSdkClientError, SdkClientList, SdkClientListResponse, - SdkClientRevokeRequest, SdkClientRevokeResponse, UnsealEncryptedKey, UnsealResult, - UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse, - sdk_client_approve_response, sdk_client_list_response, sdk_client_revoke_response, - user_agent_request::Payload as UserAgentRequestPayload, + ClientConnectionCancel, ClientConnectionRequest, SdkClientApproveRequest, + SdkClientApproveResponse, SdkClientEntry, SdkClientError as ProtoSdkClientError, + SdkClientList, SdkClientListResponse, SdkClientRevokeRequest, SdkClientRevokeResponse, + UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, + UserAgentResponse, sdk_client_approve_response, sdk_client_list_response, + sdk_client_revoke_response, user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }, }; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; use diesel::{ExpressionMethods as _, QueryDsl as _, dsl::insert_into}; use diesel_async::RunQueryDsl as _; -use kameo::{Actor, error::SendError, prelude::Context}; +use ed25519_dalek::VerifyingKey; +use kameo::{Actor, error::SendError, messages, prelude::Context}; use memsafe::MemSafe; -use tokio::select; +use tokio::{select, sync::watch}; use tracing::{error, info}; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -115,6 +116,52 @@ impl UserAgentSession { } } +#[messages] +impl UserAgentSession { + // TODO: Think about refactoring it to state-machine based flow, as we already have one + #[message(ctx)] + pub async fn request_new_client_approval( + &mut self, + client_pubkey: VerifyingKey, + mut cancel_flag: watch::Receiver<()>, + ctx: &mut Context>, + ) -> Result { + self.send_msg( + UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest { + pubkey: client_pubkey.as_bytes().to_vec(), + }), + ctx, + ) + .await?; + + let extractor = |msg| { + if let UserAgentRequestPayload::ClientConnectionResponse(client_connection_response) = + msg + { + Some(client_connection_response) + } else { + None + } + }; + + tokio::select! { + _ = cancel_flag.changed() => { + info!(actor = "useragent", "client connection approval cancelled"); + self.send_msg( + UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {}), + ctx, + ).await?; + Ok(false) + } + result = self.expect_msg(extractor, ctx) => { + let result = result?; + info!(actor = "useragent", "received client connection approval result: approved={}", result.approved); + Ok(result.approved) + } + } + } +} + impl UserAgentSession { pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output { let msg = req.payload.ok_or_else(|| { diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 12eb13e..ed2a2e5 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -105,7 +105,7 @@ fn client_auth_error_status(value: &client::auth::Error) -> Status { Status::invalid_argument("Failed to convert pubkey to VerifyingKey") } Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()), - Error::NotRegistered => Status::permission_denied(value.to_string()), + Error::ApproveError(_) => Status::permission_denied(value.to_string()), Error::Transport => Status::internal("Transport error"), Error::DatabasePoolUnavailable => Status::internal("Database pool error"), Error::DatabaseOperationFailed => Status::internal("Database error"),