diff --git a/IMPLEMENTATION.md b/IMPLEMENTATION.md index 84973a5..f71de5a 100644 --- a/IMPLEMENTATION.md +++ b/IMPLEMENTATION.md @@ -4,6 +4,52 @@ This document covers concrete technology choices and dependencies. For the archi --- +## Client Connection Flow + +### New Client Approval + +When a client whose public key is not yet in the database connects, all connected user agents are asked to approve the connection. The first agent to respond determines the outcome; remaining requests are cancelled via a watch channel. + +```mermaid +flowchart TD + A([Client connects]) --> B[Receive AuthChallengeRequest] + B --> C{pubkey in DB?} + + C -- yes --> D[Read nonce\nIncrement nonce in DB] + D --> G + + C -- no --> E[Ask all UserAgents:\nClientConnectionRequest] + E --> F{First response} + F -- denied --> Z([Reject connection]) + F -- approved --> F2[Cancel remaining\nUserAgent requests] + F2 --> F3[INSERT client\nnonce = 1] + F3 --> G[Send AuthChallenge\nwith nonce] + + G --> H[Receive AuthChallengeSolution] + H --> I{Signature valid?} + I -- no --> Z + I -- yes --> J([Session started]) +``` + +### Known Issue: Concurrent Registration Race (TOCTOU) + +Two connections presenting the same previously-unknown public key can race through the approval flow simultaneously: + +1. Both check the DB → neither is registered. +2. Both request approval from user agents → both receive approval. +3. Both `INSERT` the client record → the second insert silently overwrites the first, resetting the nonce. + +This means the first connection's nonce is invalidated by the second, causing its challenge verification to fail. A fix requires either serialising new-client registration (e.g. an in-memory lock keyed on pubkey) or replacing the separate check + insert with an `INSERT OR IGNORE` / upsert guarded by a unique constraint on `public_key`. + +### Nonce Semantics + +The `program_client.nonce` column stores the **next usable nonce** — i.e. it is always one ahead of the nonce last issued in a challenge. + +- **New client:** inserted with `nonce = 1`; the first challenge is issued with `nonce = 0`. +- **Existing client:** the current DB value is read and used as the challenge nonce, then immediately incremented within the same exclusive transaction, preventing replay. + +--- + ## Cryptography ### Authentication diff --git a/protobufs/client.proto b/protobufs/client.proto index f548f69..62761c3 100644 --- a/protobufs/client.proto +++ b/protobufs/client.proto @@ -23,15 +23,23 @@ message ClientRequest { oneof payload { AuthChallengeRequest auth_challenge_request = 1; AuthChallengeSolution auth_challenge_solution = 2; - arbiter.evm.EvmSignTransactionRequest evm_sign_transaction = 3; - arbiter.evm.EvmAnalyzeTransactionRequest evm_analyze_transaction = 4; } } +message ClientConnectError { + enum Code { + UNKNOWN = 0; + APPROVAL_DENIED = 1; + NO_USER_AGENTS_ONLINE = 2; + } + Code code = 1; +} + message ClientResponse { oneof payload { AuthChallenge auth_challenge = 1; AuthOk auth_ok = 2; + ClientConnectError client_connect_error = 5; arbiter.evm.EvmSignTransactionResponse evm_sign_transaction = 3; arbiter.evm.EvmAnalyzeTransactionResponse evm_analyze_transaction = 4; } diff --git a/protobufs/user_agent.proto b/protobufs/user_agent.proto index 09a2c2a..cfc00ae 100644 --- a/protobufs/user_agent.proto +++ b/protobufs/user_agent.proto @@ -49,6 +49,16 @@ enum VaultState { VAULT_STATE_ERROR = 4; } +message ClientConnectionRequest { + bytes pubkey = 1; +} + +message ClientConnectionResponse { + bool approved = 1; +} + +message ClientConnectionCancel {} + message UserAgentRequest { oneof payload { AuthChallengeRequest auth_challenge_request = 1; @@ -61,6 +71,7 @@ message UserAgentRequest { arbiter.evm.EvmGrantCreateRequest evm_grant_create = 8; arbiter.evm.EvmGrantDeleteRequest evm_grant_delete = 9; arbiter.evm.EvmGrantListRequest evm_grant_list = 10; + ClientConnectionResponse client_connection_response = 11; } } message UserAgentResponse { @@ -75,5 +86,7 @@ message UserAgentResponse { arbiter.evm.EvmGrantCreateResponse evm_grant_create = 8; arbiter.evm.EvmGrantDeleteResponse evm_grant_delete = 9; arbiter.evm.EvmGrantListResponse evm_grant_list = 10; + ClientConnectionRequest client_connection_request = 11; + ClientConnectionCancel client_connection_cancel = 12; } } diff --git a/server/crates/arbiter-proto/src/transport.rs b/server/crates/arbiter-proto/src/transport.rs index a38b892..f5acaf9 100644 --- a/server/crates/arbiter-proto/src/transport.rs +++ b/server/crates/arbiter-proto/src/transport.rs @@ -83,6 +83,23 @@ use async_trait::async_trait; pub enum Error { #[error("Transport channel is closed")] ChannelClosed, + #[error("Unexpected message received")] + UnexpectedMessage, +} + +/// Receives one message from `transport` and extracts a value from it using +/// `extractor`. Returns [`Error::ChannelClosed`] if the transport closes and +/// [`Error::UnexpectedMessage`] if `extractor` returns `None`. +pub async fn expect_message( + transport: &mut T, + extractor: F, +) -> Result +where + T: Bi + ?Sized, + F: FnOnce(Inbound) -> Option, +{ + let msg = transport.recv().await.ok_or(Error::ChannelClosed)?; + extractor(msg).ok_or(Error::UnexpectedMessage) } /// Minimal bidirectional transport abstraction used by protocol code. diff --git a/server/crates/arbiter-server/src/actors/client/auth.rs b/server/crates/arbiter-server/src/actors/client/auth.rs new file mode 100644 index 0000000..3f9c7a8 --- /dev/null +++ b/server/crates/arbiter-server/src/actors/client/auth.rs @@ -0,0 +1,251 @@ +use arbiter_proto::{ + format_challenge, + proto::client::{ + AuthChallenge, AuthChallengeSolution, ClientConnectError, ClientRequest, ClientResponse, + client_connect_error::Code as ConnectErrorCode, + client_request::Payload as ClientRequestPayload, + client_response::Payload as ClientResponsePayload, + }, + transport::expect_message, +}; +use diesel::{ + ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update, +}; +use diesel_async::RunQueryDsl as _; +use ed25519_dalek::VerifyingKey; +use kameo::error::SendError; +use tracing::error; + +use crate::{ + actors::{client::ClientConnection, router::{self, RequestClientApproval}}, + db::{self, schema::program_client}, +}; + +use super::session::ClientSession; + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum Error { + #[error("Unexpected message payload")] + UnexpectedMessagePayload, + #[error("Invalid client public key length")] + InvalidClientPubkeyLength, + #[error("Invalid client public key encoding")] + InvalidAuthPubkeyEncoding, + #[error("Database pool unavailable")] + DatabasePoolUnavailable, + #[error("Database operation failed")] + DatabaseOperationFailed, + #[error("Invalid challenge solution")] + InvalidChallengeSolution, + #[error("Client approval request failed")] + ApproveError(#[from] ApproveError), + #[error("Internal error")] + InternalError, + #[error("Transport error")] + Transport, +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum ApproveError { + #[error("Internal error")] + Internal, + #[error("Client connection denied by user agents")] + Denied, + #[error("Upstream error: {0}")] + Upstream(router::ApprovalError), +} + +/// Atomically reads and increments the nonce for a known client. +/// Returns `None` if the pubkey is not registered. +async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result, Error> { + let pubkey_bytes = pubkey.as_bytes().to_vec(); + + let mut conn = db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + Error::DatabasePoolUnavailable + })?; + + conn.exclusive_transaction(|conn| { + let pubkey_bytes = pubkey_bytes.clone(); + Box::pin(async move { + let Some(current_nonce) = program_client::table + .filter(program_client::public_key.eq(&pubkey_bytes)) + .select(program_client::nonce) + .first::(conn) + .await + .optional()? + else { + return Result::<_, diesel::result::Error>::Ok(None); + }; + + update(program_client::table) + .filter(program_client::public_key.eq(&pubkey_bytes)) + .set(program_client::nonce.eq(current_nonce + 1)) + .execute(conn) + .await?; + + Ok(Some(current_nonce)) + }) + }) + .await + .map_err(|e| { + error!(error = ?e, "Database error"); + Error::DatabaseOperationFailed + }) +} + +async fn approve_new_client( + actors: &crate::actors::GlobalActors, + pubkey: VerifyingKey, +) -> Result<(), Error> { + let result = actors + .router + .ask(RequestClientApproval { client_pubkey: pubkey }) + .await; + + match result { + Ok(true) => Ok(()), + Ok(false) => Err(Error::ApproveError(ApproveError::Denied)), + Err(SendError::HandlerError(e)) => { + error!(error = ?e, "Approval upstream error"); + Err(Error::ApproveError(ApproveError::Upstream(e))) + } + Err(e) => { + error!(error = ?e, "Approval request to router failed"); + Err(Error::ApproveError(ApproveError::Internal)) + } + } +} + +async fn insert_client(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<(), Error> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i32; + + let mut conn = db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + Error::DatabasePoolUnavailable + })?; + + insert_into(program_client::table) + .values(( + program_client::public_key.eq(pubkey.as_bytes().to_vec()), + program_client::nonce.eq(1), // pre-incremented; challenge uses 0 + program_client::created_at.eq(now), + program_client::updated_at.eq(now), + )) + .execute(&mut conn) + .await + .map_err(|e| { + error!(error = ?e, "Failed to insert new client"); + Error::DatabaseOperationFailed + })?; + + Ok(()) +} + +async fn challenge_client( + props: &mut ClientConnection, + pubkey: VerifyingKey, + nonce: i32, +) -> Result<(), Error> { + let challenge = AuthChallenge { + pubkey: pubkey.as_bytes().to_vec(), + nonce, + }; + + props + .transport + .send(Ok(ClientResponse { + payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())), + })) + .await + .map_err(|e| { + error!(error = ?e, "Failed to send auth challenge"); + Error::Transport + })?; + + let AuthChallengeSolution { signature } = expect_message( + &mut *props.transport, + |req: ClientRequest| match req.payload? { + ClientRequestPayload::AuthChallengeSolution(s) => Some(s), + _ => None, + }, + ) + .await + .map_err(|e| { + error!(error = ?e, "Failed to receive challenge solution"); + Error::Transport + })?; + + let formatted = format_challenge(nonce, &challenge.pubkey); + let sig = signature.as_slice().try_into().map_err(|_| { + error!("Invalid signature length"); + Error::InvalidChallengeSolution + })?; + + pubkey.verify_strict(&formatted, &sig).map_err(|_| { + error!("Challenge solution verification failed"); + Error::InvalidChallengeSolution + })?; + + Ok(()) +} + +fn connect_error_code(err: &Error) -> ConnectErrorCode { + match err { + Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied, + Error::ApproveError(ApproveError::Upstream(router::ApprovalError::NoUserAgentsConnected)) => { + ConnectErrorCode::NoUserAgentsOnline + } + _ => ConnectErrorCode::Unknown, + } +} + +async fn authenticate(props: &mut ClientConnection) -> Result { + let Some(ClientRequest { + payload: Some(ClientRequestPayload::AuthChallengeRequest(challenge)), + }) = props.transport.recv().await + else { + return Err(Error::Transport); + }; + + let pubkey_bytes = challenge + .pubkey + .as_array() + .ok_or(Error::InvalidClientPubkeyLength)?; + let pubkey = + VerifyingKey::from_bytes(pubkey_bytes).map_err(|_| Error::InvalidAuthPubkeyEncoding)?; + + let nonce = match get_nonce(&props.db, &pubkey).await? { + Some(nonce) => nonce, + None => { + approve_new_client(&props.actors, pubkey).await?; + insert_client(&props.db, &pubkey).await?; + 0 + } + }; + + challenge_client(props, pubkey, nonce).await?; + + Ok(pubkey) +} + +pub async fn authenticate_and_create(mut props: ClientConnection) -> Result { + match authenticate(&mut props).await { + Ok(pubkey) => Ok(ClientSession::new(props, pubkey)), + Err(err) => { + let code = connect_error_code(&err); + let _ = props + .transport + .send(Ok(ClientResponse { + payload: Some(ClientResponsePayload::ClientConnectError( + ClientConnectError { code: code.into() }, + )), + })) + .await; + Err(err) + } + } +} diff --git a/server/crates/arbiter-server/src/actors/client/auth/mod.rs b/server/crates/arbiter-server/src/actors/client/auth/mod.rs deleted file mode 100644 index 8ff0600..0000000 --- a/server/crates/arbiter-server/src/actors/client/auth/mod.rs +++ /dev/null @@ -1,102 +0,0 @@ -use arbiter_proto::proto::client::{ - AuthChallengeRequest, AuthChallengeSolution, ClientRequest, - client_request::Payload as ClientRequestPayload, -}; -use ed25519_dalek::VerifyingKey; -use tracing::error; - -use crate::actors::client::{ - ClientConnection, - auth::state::{AuthContext, AuthStateMachine}, - session::ClientSession, -}; - -#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] -pub enum Error { - #[error("Unexpected message payload")] - UnexpectedMessagePayload, - #[error("Invalid client public key length")] - InvalidClientPubkeyLength, - #[error("Invalid client public key encoding")] - InvalidAuthPubkeyEncoding, - #[error("Database pool unavailable")] - DatabasePoolUnavailable, - #[error("Database operation failed")] - DatabaseOperationFailed, - #[error("Public key not registered")] - PublicKeyNotRegistered, - #[error("Invalid signature length")] - InvalidSignatureLength, - #[error("Invalid challenge solution")] - InvalidChallengeSolution, - #[error("Transport error")] - Transport, -} - -mod state; -use state::*; - -fn parse_auth_event(payload: ClientRequestPayload) -> Result { - match payload { - ClientRequestPayload::AuthChallengeRequest(AuthChallengeRequest { pubkey }) => { - let pubkey_bytes = pubkey.as_array().ok_or(Error::InvalidClientPubkeyLength)?; - let pubkey = VerifyingKey::from_bytes(pubkey_bytes) - .map_err(|_| Error::InvalidAuthPubkeyEncoding)?; - Ok(AuthEvents::AuthRequest(ChallengeRequest { - pubkey: pubkey.into(), - })) - } - ClientRequestPayload::AuthChallengeSolution(AuthChallengeSolution { signature }) => { - Ok(AuthEvents::ReceivedSolution(ChallengeSolution { - solution: signature, - })) - } - _ => Err(Error::UnexpectedMessagePayload) , - } -} - -pub async fn authenticate(props: &mut ClientConnection) -> Result { - let mut state = AuthStateMachine::new(AuthContext::new(props)); - - loop { - let transport = state.context_mut().conn.transport.as_mut(); - let Some(ClientRequest { - payload: Some(payload), - }) = transport.recv().await - else { - return Err(Error::Transport); - }; - - let event = parse_auth_event(payload)?; - - match state.process_event(event).await { - Ok(AuthStates::AuthOk(key)) => return Ok(key.clone()), - Err(AuthError::ActionFailed(err)) => { - error!(?err, "State machine action failed"); - return Err(err); - } - Err(AuthError::GuardFailed(err)) => { - error!(?err, "State machine guard failed"); - return Err(err); - } - Err(AuthError::InvalidEvent) => { - error!("Invalid event for current state"); - return Err(Error::InvalidChallengeSolution); - } - Err(AuthError::TransitionsFailed) => { - error!("Invalid state transition"); - return Err(Error::InvalidChallengeSolution); - } - - _ => (), - } - } -} - -pub async fn authenticate_and_create( - mut props: ClientConnection, -) -> Result { - let key = authenticate(&mut props).await?; - let session = ClientSession::new(props, key); - Ok(session) -} diff --git a/server/crates/arbiter-server/src/actors/client/auth/state.rs b/server/crates/arbiter-server/src/actors/client/auth/state.rs deleted file mode 100644 index bfa2dc3..0000000 --- a/server/crates/arbiter-server/src/actors/client/auth/state.rs +++ /dev/null @@ -1,136 +0,0 @@ -use arbiter_proto::proto::client::{ - AuthChallenge, ClientResponse, - client_response::Payload as ClientResponsePayload, -}; -use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl, update}; -use diesel_async::RunQueryDsl; -use ed25519_dalek::VerifyingKey; -use tracing::error; - -use super::Error; -use crate::{actors::client::ClientConnection, db::schema}; - -pub struct ChallengeRequest { - pub pubkey: VerifyingKey, -} - -pub struct ChallengeContext { - pub challenge: AuthChallenge, - pub key: VerifyingKey, -} - -pub struct ChallengeSolution { - pub solution: Vec, -} - -smlang::statemachine!( - name: Auth, - custom_error: true, - transitions: { - *Init + AuthRequest(ChallengeRequest) / async prepare_challenge = SentChallenge(ChallengeContext), - SentChallenge(ChallengeContext) + ReceivedSolution(ChallengeSolution) [async verify_solution] / provide_key = AuthOk(VerifyingKey), - } -); - -async fn create_nonce(db: &crate::db::DatabasePool, pubkey_bytes: &[u8]) -> Result { - let mut db_conn = db.get().await.map_err(|e| { - error!(error = ?e, "Database pool error"); - Error::DatabasePoolUnavailable - })?; - db_conn - .exclusive_transaction(|conn| { - Box::pin(async move { - let current_nonce = schema::program_client::table - .filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec())) - .select(schema::program_client::nonce) - .first::(conn) - .await?; - - update(schema::program_client::table) - .filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec())) - .set(schema::program_client::nonce.eq(current_nonce + 1)) - .execute(conn) - .await?; - - Result::<_, diesel::result::Error>::Ok(current_nonce) - }) - }) - .await - .optional() - .map_err(|e| { - error!(error = ?e, "Database error"); - Error::DatabaseOperationFailed - })? - .ok_or_else(|| { - error!(?pubkey_bytes, "Public key not found in database"); - Error::PublicKeyNotRegistered - }) -} - -pub struct AuthContext<'a> { - pub(super) conn: &'a mut ClientConnection, -} - -impl<'a> AuthContext<'a> { - pub fn new(conn: &'a mut ClientConnection) -> Self { - Self { conn } - } -} - -impl AuthStateMachineContext for AuthContext<'_> { - type Error = Error; - - async fn verify_solution( - &self, - ChallengeContext { challenge, key }: &ChallengeContext, - ChallengeSolution { solution }: &ChallengeSolution, - ) -> Result { - let formatted_challenge = - arbiter_proto::format_challenge(challenge.nonce, &challenge.pubkey); - - let signature = solution.as_slice().try_into().map_err(|_| { - error!(?solution, "Invalid signature length"); - Error::InvalidChallengeSolution - })?; - - let valid = key.verify_strict(&formatted_challenge, &signature).is_ok(); - - Ok(valid) - } - - async fn prepare_challenge( - &mut self, - ChallengeRequest { pubkey }: ChallengeRequest, - ) -> Result { - let nonce = create_nonce(&self.conn.db, pubkey.as_bytes()).await?; - - let challenge = AuthChallenge { - pubkey: pubkey.as_bytes().to_vec(), - nonce, - }; - - self.conn - .transport - .send(Ok(ClientResponse { - payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())), - })) - .await - .map_err(|e| { - error!(?e, "Failed to send auth challenge"); - Error::Transport - })?; - - Ok(ChallengeContext { - challenge, - key: pubkey, - }) - } - - fn provide_key( - &mut self, - state_data: &ChallengeContext, - _: ChallengeSolution, - ) -> Result { - Ok(state_data.key) - } -} diff --git a/server/crates/arbiter-server/src/actors/router/mod.rs b/server/crates/arbiter-server/src/actors/router/mod.rs index 966e1ce..ac1d720 100644 --- a/server/crates/arbiter-server/src/actors/router/mod.rs +++ b/server/crates/arbiter-server/src/actors/router/mod.rs @@ -1,17 +1,20 @@ -use std::{ - collections::{HashMap}, - ops::ControlFlow, -}; +use std::{collections::HashMap, ops::ControlFlow}; +use ed25519_dalek::VerifyingKey; use kameo::{ Actor, actor::{ActorId, ActorRef}, messages, prelude::{ActorStopReason, Context, WeakActorRef}, + reply::DelegatedReply, }; -use tracing::info; +use tokio::{sync::watch, task::JoinSet}; +use tracing::{info, warn}; -use crate::actors::{client::session::ClientSession, user_agent::session::UserAgentSession}; +use crate::actors::{ + client::session::ClientSession, + user_agent::session::{RequestNewClientApproval, UserAgentSession}, +}; #[derive(Default)] pub struct MessageRouter { @@ -53,6 +56,74 @@ impl Actor for MessageRouter { } } +#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq, Hash)] +pub enum ApprovalError { + #[error("No user agents connected")] + NoUserAgentsConnected, +} + +async fn request_client_approval( + user_agents: &[WeakActorRef], + client_pubkey: VerifyingKey, +) -> Result { + if user_agents.is_empty() { + return Err(ApprovalError::NoUserAgentsConnected).into(); + } + + let mut pool = JoinSet::new(); + let (cancel_tx, cancel_rx) = watch::channel(()); + + for weak_ref in user_agents { + match weak_ref.upgrade() { + Some(agent) => { + let client_pubkey = client_pubkey.clone(); + let cancel_rx = cancel_rx.clone(); + pool.spawn(async move { + agent + .ask(RequestNewClientApproval { + client_pubkey, + cancel_flag: cancel_rx.clone(), + }) + .await + }); + } + None => { + warn!( + id = weak_ref.id().to_string(), + actor = "MessageRouter", + event = "useragent.disconnected_before_approval" + ); + } + } + } + + while let Some(result) = pool.join_next().await { + match result { + Ok(Ok(approved)) => { + // cancel other pending requests + let _ = cancel_tx.send(()); + return Ok(approved); + } + Ok(Err(err)) => { + warn!( + ?err, + actor = "MessageRouter", + event = "useragent.approval_error" + ); + } + Err(err) => { + warn!( + ?err, + actor = "MessageRouter", + event = "useragent.approval_task_failed" + ); + } + } + } + + Err(ApprovalError::NoUserAgentsConnected) +} + #[messages] impl MessageRouter { #[message(ctx)] @@ -76,4 +147,29 @@ impl MessageRouter { ctx.actor_ref().link(&actor).await; self.clients.insert(actor.id(), actor); } + + #[message(ctx)] + pub async fn request_client_approval( + &mut self, + client_pubkey: VerifyingKey, + ctx: &mut Context>>, + ) -> DelegatedReply> { + let (reply, Some(reply_sender)) = ctx.reply_sender() else { + panic!("Exptected `request_client_approval` to have callback channel"); + }; + + let weak_refs = self + .user_agents + .values() + .map(|agent| agent.downgrade()) + .collect::>(); + + // handle in subtask to not to lock the actor + tokio::task::spawn(async move { + let result = request_client_approval(&weak_refs, client_pubkey).await; + let _ = reply_sender.send(result); + }); + + reply + } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/mod.rs b/server/crates/arbiter-server/src/actors/user_agent/mod.rs index 2043b27..4380b72 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/mod.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/mod.rs @@ -11,7 +11,7 @@ use crate::{ }; #[derive(Debug, thiserror::Error, PartialEq)] -pub enum UserAgentError { +pub enum TransportResponseError { #[error("Expected message with payload")] MissingRequestPayload, #[error("Unexpected request payload")] @@ -31,7 +31,7 @@ pub enum UserAgentError { } pub type Transport = - Box> + Send>; + Box> + Send>; pub struct UserAgentConnection { db: db::DatabasePool, diff --git a/server/crates/arbiter-server/src/actors/user_agent/session.rs b/server/crates/arbiter-server/src/actors/user_agent/session.rs index de04f70..ae4cde8 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session.rs @@ -3,7 +3,7 @@ use std::{ops::DerefMut, sync::Mutex}; use arbiter_proto::proto::{ evm as evm_proto, user_agent::{ - UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, + ClientConnectionCancel, ClientConnectionRequest, UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse, user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }, @@ -12,10 +12,10 @@ use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; use ed25519_dalek::VerifyingKey; use kameo::{ Actor, - error::SendError, + error::SendError, messages, prelude::Context, }; use memsafe::MemSafe; -use tokio::select; +use tokio::{select, sync::watch}; use tracing::{error, info}; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -23,12 +23,22 @@ use crate::actors::{ evm::{Generate, ListWallets}, keyholder::{self, TryUnseal}, router::RegisterUserAgent, - user_agent::{UserAgentConnection, UserAgentError}, + user_agent::{TransportResponseError, UserAgentConnection}, }; mod state; use state::{DummyContext, UnsealContext, UserAgentEvents, UserAgentStateMachine, UserAgentStates}; +// Error for consumption by other actors +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum Error { + #[error("User agent session ended due to connection loss")] + ConnectionLost, + + #[error("User agent session ended due to unexpected message")] + UnexpectedMessage, +} + pub struct UserAgentSession { props: UserAgentConnection, key: VerifyingKey, @@ -36,7 +46,7 @@ pub struct UserAgentSession { } impl UserAgentSession { - pub(crate) fn new(props: UserAgentConnection, key: VerifyingKey) -> Self { + pub(crate) fn new(props: UserAgentConnection, key: VerifyingKey) -> Self { Self { props, key, @@ -44,18 +54,118 @@ impl UserAgentSession { } } - fn transition(&mut self, event: UserAgentEvents) -> Result<(), UserAgentError> { + fn transition(&mut self, event: UserAgentEvents) -> Result<(), TransportResponseError> { self.state.process_event(event).map_err(|e| { error!(?e, "State transition failed"); - UserAgentError::StateTransitionFailed + TransportResponseError::StateTransitionFailed })?; Ok(()) } + async fn send_msg( + &mut self, + msg: UserAgentResponsePayload, + _ctx: &mut Context, + ) -> Result<(), Error> { + self.props + .transport + .send(Ok(response(msg))) + .await + .map_err(|_| { + error!( + actor = "useragent", + reason = "channel closed", + "send.failed" + ); + Error::ConnectionLost + }) + } + + async fn expect_msg( + &mut self, + extractor: Extractor, + ctx: &mut Context, + ) -> Result + where + Extractor: FnOnce(UserAgentRequestPayload) -> Option, + Reply: kameo::Reply, + { + let msg = self.props.transport.recv().await.ok_or_else(|| { + error!( + actor = "useragent", + reason = "channel closed", + "recv.failed" + ); + ctx.stop(); + Error::ConnectionLost + })?; + + msg.payload.and_then(extractor).ok_or_else(|| { + error!( + actor = "useragent", + reason = "unexpected message", + "recv.failed" + ); + ctx.stop(); + Error::UnexpectedMessage + }) + } +} + +#[messages] +impl UserAgentSession { + // TODO: Think about refactoring it to state-machine based flow, as we already have one + #[message(ctx)] + pub async fn request_new_client_approval( + &mut self, + client_pubkey: VerifyingKey, + mut cancel_flag: watch::Receiver<()>, + ctx: &mut Context>, + ) -> Result { + self.send_msg( + UserAgentResponsePayload::ClientConnectionRequest( + ClientConnectionRequest { + pubkey: client_pubkey.as_bytes().to_vec(), + } + .into(), + ), + ctx, + ) + .await?; + + let extractor = |msg| { + if let UserAgentRequestPayload::ClientConnectionResponse(client_connection_response) = + msg + { + Some(client_connection_response) + } else { + None + } + }; + + tokio::select! { + _ = cancel_flag.changed() => { + info!(actor = "useragent", "client connection approval cancelled"); + self.send_msg( + UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {}), + ctx, + ).await?; + return Ok(false); + } + result = self.expect_msg(extractor, ctx) => { + let result = result?; + info!(actor = "useragent", "received client connection approval result: approved={}", result.approved); + return Ok(result.approved); + } + } + } +} + +impl UserAgentSession { pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output { let msg = req.payload.ok_or_else(|| { error!(actor = "useragent", "Received message with no payload"); - UserAgentError::MissingRequestPayload + TransportResponseError::MissingRequestPayload })?; match msg { @@ -67,12 +177,12 @@ impl UserAgentSession { } UserAgentRequestPayload::EvmWalletCreate(_) => self.handle_evm_wallet_create().await, UserAgentRequestPayload::EvmWalletList(_) => self.handle_evm_wallet_list().await, - _ => Err(UserAgentError::UnexpectedRequestPayload), + _ => Err(TransportResponseError::UnexpectedRequestPayload), } } } -type Output = Result; +type Output = Result; fn response(payload: UserAgentResponsePayload) -> UserAgentResponse { UserAgentResponse { @@ -88,7 +198,7 @@ impl UserAgentSession { let client_pubkey_bytes: [u8; 32] = req .client_pubkey .try_into() - .map_err(|_| UserAgentError::InvalidClientPubkeyLength)?; + .map_err(|_| TransportResponseError::InvalidClientPubkeyLength)?; let client_public_key = PublicKey::from(client_pubkey_bytes); @@ -107,7 +217,7 @@ impl UserAgentSession { async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { let UserAgentStates::WaitingForUnsealKey(unseal_context) = self.state.state() else { error!("Received unseal encrypted key in invalid state"); - return Err(UserAgentError::InvalidStateForUnsealEncryptedKey); + return Err(TransportResponseError::InvalidStateForUnsealEncryptedKey); }; let ephemeral_secret = { let mut secret_lock = unseal_context.secret.lock().unwrap(); @@ -172,7 +282,7 @@ impl UserAgentSession { Err(err) => { error!(?err, "Failed to send unseal request to keyholder"); self.transition(UserAgentEvents::ReceivedInvalidKey)?; - Err(UserAgentError::KeyHolderActorUnreachable) + Err(TransportResponseError::KeyHolderActorUnreachable) } } } @@ -248,7 +358,7 @@ fn map_evm_error(op: &str, err: SendError) -> e impl Actor for UserAgentSession { type Args = Self; - type Error = UserAgentError; + type Error = TransportResponseError; async fn on_start( args: Self::Args, @@ -263,7 +373,7 @@ impl Actor for UserAgentSession { .await .map_err(|err| { error!(?err, "Failed to register user agent connection with router"); - UserAgentError::ConnectionRegistrationFailed + TransportResponseError::ConnectionRegistrationFailed })?; Ok(args) } diff --git a/server/crates/arbiter-server/src/db/models.rs b/server/crates/arbiter-server/src/db/models.rs index 7fea324..1c01ed6 100644 --- a/server/crates/arbiter-server/src/db/models.rs +++ b/server/crates/arbiter-server/src/db/models.rs @@ -2,7 +2,10 @@ #![allow(clippy::all)] use crate::db::schema::{ - self, aead_encrypted, arbiter_settings, evm_basic_grant, evm_ether_transfer_grant, evm_ether_transfer_grant_target, evm_ether_transfer_limit, evm_token_transfer_grant, evm_token_transfer_log, evm_token_transfer_volume_limit, evm_transaction_log, evm_wallet, root_key_history, tls_history + self, aead_encrypted, arbiter_settings, evm_basic_grant, evm_ether_transfer_grant, + evm_ether_transfer_grant_target, evm_ether_transfer_limit, evm_token_transfer_grant, + evm_token_transfer_log, evm_token_transfer_volume_limit, evm_transaction_log, evm_wallet, + root_key_history, tls_history, }; use chrono::{DateTime, Utc}; use diesel::{prelude::*, sqlite::Sqlite}; @@ -65,8 +68,8 @@ pub mod types { }; let unix_timestamp = bytes.read_long(); - let datetime = DateTime::from_timestamp(unix_timestamp, 0) - .ok_or("Timestamp is out of bounds")?; + let datetime = + DateTime::from_timestamp(unix_timestamp, 0).ok_or("Timestamp is out of bounds")?; Ok(SqliteTimestamp(datetime)) } @@ -150,7 +153,7 @@ pub struct EvmWallet { pub created_at: SqliteTimestamp, } -#[derive(Queryable, Debug)] +#[derive(Queryable, Debug, Insertable, Selectable)] #[diesel(table_name = schema::program_client, check_for_backend(Sqlite))] pub struct ProgramClient { pub id: i32, @@ -253,7 +256,6 @@ pub struct EvmEtherTransferGrantTarget { pub address: Vec, } - #[derive(Models, Queryable, Debug, Insertable, Selectable)] #[diesel(table_name = evm_token_transfer_grant, check_for_backend(Sqlite))] #[view( diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index a666bdd..c996035 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -16,7 +16,7 @@ use tracing::info; use crate::{ actors::{ client::{self, ClientError, ClientConnection as ClientConnectionProps, connect_client}, - user_agent::{self, UserAgentConnection, UserAgentError, connect_user_agent}, + user_agent::{self, UserAgentConnection, TransportResponseError, connect_user_agent}, }, context::ServerContext, }; @@ -31,7 +31,7 @@ const DEFAULT_CHANNEL_SIZE: usize = 1000; struct UserAgentGrpcSender; impl SendConverter for UserAgentGrpcSender { - type Input = Result; + type Input = Result; type Output = Result; fn convert(&self, item: Self::Input) -> Self::Output { @@ -78,31 +78,30 @@ fn client_auth_error_status(value: &client::auth::Error) -> Status { Error::InvalidAuthPubkeyEncoding => { Status::invalid_argument("Failed to convert pubkey to VerifyingKey") } - Error::InvalidSignatureLength => Status::invalid_argument("Invalid signature length"), - Error::PublicKeyNotRegistered | Error::InvalidChallengeSolution => { - Status::unauthenticated(value.to_string()) - } + Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()), + Error::ApproveError(_) => Status::permission_denied(value.to_string()), Error::Transport => Status::internal("Transport error"), Error::DatabasePoolUnavailable => Status::internal("Database pool error"), Error::DatabaseOperationFailed => Status::internal("Database error"), + Error::InternalError => Status::internal("Internal error"), } } -fn user_agent_error_status(value: UserAgentError) -> Status { +fn user_agent_error_status(value: TransportResponseError) -> Status { match value { - UserAgentError::MissingRequestPayload | UserAgentError::UnexpectedRequestPayload => { + TransportResponseError::MissingRequestPayload | TransportResponseError::UnexpectedRequestPayload => { Status::invalid_argument("Expected message with payload") } - UserAgentError::InvalidStateForUnsealEncryptedKey => { + TransportResponseError::InvalidStateForUnsealEncryptedKey => { Status::failed_precondition("Invalid state for unseal encrypted key") } - UserAgentError::InvalidClientPubkeyLength => { + TransportResponseError::InvalidClientPubkeyLength => { Status::invalid_argument("client_pubkey must be 32 bytes") } - UserAgentError::StateTransitionFailed => Status::internal("State machine error"), - UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"), - UserAgentError::Auth(ref err) => auth_error_status(err), - UserAgentError::ConnectionRegistrationFailed => { + TransportResponseError::StateTransitionFailed => Status::internal("State machine error"), + TransportResponseError::KeyHolderActorUnreachable => Status::internal("Vault is not available"), + TransportResponseError::Auth(ref err) => auth_error_status(err), + TransportResponseError::ConnectionRegistrationFailed => { Status::internal("Failed registering connection") } }