From 8043cdf8d86f71e81dd436c169af4eefb7f36a27 Mon Sep 17 00:00:00 2001 From: hdbg Date: Sat, 21 Mar 2026 14:50:52 +0100 Subject: [PATCH] feat(server): re-introduce client approval flow --- protobufs/client.proto | 4 +- protobufs/user_agent.proto | 5 +- .../arbiter-server/src/actors/client/auth.rs | 17 +-- .../arbiter-server/src/actors/client/mod.rs | 8 +- .../client_connect_approval.rs | 101 ++++++++++++++++++ .../src/actors/flow_coordinator/mod.rs | 100 ++++------------- .../src/actors/user_agent/mod.rs | 6 +- .../src/actors/user_agent/session.rs | 86 +++++++++++---- .../actors/user_agent/session/connection.rs | 37 +++++++ .../arbiter-server/src/grpc/client/auth.rs | 4 +- .../arbiter-server/src/grpc/user_agent.rs | 57 ++++++++-- 11 files changed, 307 insertions(+), 118 deletions(-) create mode 100644 server/crates/arbiter-server/src/actors/flow_coordinator/client_connect_approval.rs diff --git a/protobufs/client.proto b/protobufs/client.proto index 1ceac34..83d25cf 100644 --- a/protobufs/client.proto +++ b/protobufs/client.proto @@ -7,8 +7,8 @@ import "google/protobuf/empty.proto"; message ClientInfo { string name = 1; - string description = 2; - string version = 3; + optional string description = 2; + optional string version = 3; } message AuthChallengeRequest { diff --git a/protobufs/user_agent.proto b/protobufs/user_agent.proto index 19f9705..397760f 100644 --- a/protobufs/user_agent.proto +++ b/protobufs/user_agent.proto @@ -86,9 +86,12 @@ message ClientConnectionRequest { message ClientConnectionResponse { bool approved = 1; + bytes pubkey = 2; } -message ClientConnectionCancel {} +message ClientConnectionCancel { + bytes pubkey = 1; +} message UserAgentRequest { int32 id = 14; diff --git a/server/crates/arbiter-server/src/actors/client/auth.rs b/server/crates/arbiter-server/src/actors/client/auth.rs index 1864678..67a94a4 100644 --- a/server/crates/arbiter-server/src/actors/client/auth.rs +++ b/server/crates/arbiter-server/src/actors/client/auth.rs @@ -14,7 +14,7 @@ use tracing::error; use crate::{ actors::{ - client::ClientConnection, + client::{ClientConnection, ClientProfile}, flow_coordinator::{self, RequestClientApproval}, }, db::{ @@ -113,13 +113,11 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result Result<(), Error> { let result = actors .flow_coordinator - .ask(RequestClientApproval { - client_pubkey: pubkey, - }) + .ask(RequestClientApproval { client: profile }) .await; match result { @@ -317,7 +315,14 @@ where let nonce = match get_nonce(&props.db, &pubkey).await? { Some(nonce) => nonce, None => { - approve_new_client(&props.actors, pubkey).await?; + approve_new_client( + &props.actors, + ClientProfile { + pubkey, + metadata: metadata.clone(), + }, + ) + .await?; insert_client(&props.db, &pubkey, &metadata).await?; 0 } diff --git a/server/crates/arbiter-server/src/actors/client/mod.rs b/server/crates/arbiter-server/src/actors/client/mod.rs index 3fae866..2ad1413 100644 --- a/server/crates/arbiter-server/src/actors/client/mod.rs +++ b/server/crates/arbiter-server/src/actors/client/mod.rs @@ -3,10 +3,16 @@ use kameo::actor::Spawn; use tracing::{error, info}; use crate::{ - actors::{GlobalActors, client::session::ClientSession}, + actors::{GlobalActors, client::{auth::ClientMetadata, session::ClientSession}}, db, }; +#[derive(Debug, Clone)] +pub struct ClientProfile { + pub pubkey: ed25519_dalek::VerifyingKey, + pub metadata: ClientMetadata, +} + pub struct ClientConnection { pub(crate) db: db::DatabasePool, pub(crate) actors: GlobalActors, diff --git a/server/crates/arbiter-server/src/actors/flow_coordinator/client_connect_approval.rs b/server/crates/arbiter-server/src/actors/flow_coordinator/client_connect_approval.rs new file mode 100644 index 0000000..a3868e0 --- /dev/null +++ b/server/crates/arbiter-server/src/actors/flow_coordinator/client_connect_approval.rs @@ -0,0 +1,101 @@ +use std::ops::ControlFlow; + +use kameo::{ + Actor, messages, + prelude::{ActorId, ActorRef, ActorStopReason, Context, WeakActorRef}, + reply::ReplySender, +}; + +use crate::actors::{ + client::ClientProfile, + flow_coordinator::ApprovalError, + user_agent::{UserAgentSession, session::BeginNewClientApproval}, +}; + +pub struct Args { + pub client: ClientProfile, + pub user_agents: Vec>, + pub reply: ReplySender> +} + +pub struct ClientApprovalController { + /// Number of UAs that have not yet responded (approval or denial) or died. + pending: usize, + /// Number of approvals received so far. + approved: usize, + reply: Option>>, +} + +impl ClientApprovalController { + fn send_reply(&mut self, result: Result) { + if let Some(reply) = self.reply.take() { + reply.send(result); + } + } +} + +impl Actor for ClientApprovalController { + type Args = Args; + type Error = (); + + async fn on_start( + Args { client, mut user_agents, reply }: Self::Args, + actor_ref: ActorRef, + ) -> Result { + let this = Self { + pending: user_agents.len(), + approved: 0, + reply: Some(reply), + }; + + for user_agent in user_agents.drain(..) { + actor_ref.link(&user_agent).await; + let _ = user_agent + .tell(BeginNewClientApproval { + client: client.clone(), + controller: actor_ref.clone(), + }) + .await; + } + + Ok(this) + } + + async fn on_link_died( + &mut self, + _: WeakActorRef, + _: ActorId, + _: ActorStopReason, + ) -> Result, Self::Error> { + // A linked UA died before responding — counts as a non-approval. + self.pending = self.pending.saturating_sub(1); + if self.pending == 0 { + // At least one UA didn't approve: deny. + self.send_reply(Ok(false)); + return Ok(ControlFlow::Break(ActorStopReason::Normal)); + } + Ok(ControlFlow::Continue(())) + } +} + +#[messages] +impl ClientApprovalController { + #[message(ctx)] + pub async fn client_approval_answer(&mut self, approved: bool, ctx: &mut Context) { + if !approved { + // Denial wins immediately regardless of other pending responses. + self.send_reply(Ok(false)); + ctx.stop(); + return; + } + + self.approved += 1; + self.pending = self.pending.saturating_sub(1); + + if self.pending == 0 { + // Every connected UA approved. + self.send_reply(Ok(true)); + ctx.stop(); + } + } +} diff --git a/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs index ccaeb56..2e0aa9a 100644 --- a/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs +++ b/server/crates/arbiter-server/src/actors/flow_coordinator/mod.rs @@ -1,21 +1,22 @@ use std::{collections::HashMap, ops::ControlFlow}; -use ed25519_dalek::VerifyingKey; use kameo::{ Actor, - actor::{ActorId, ActorRef}, + actor::{ActorId, ActorRef, Spawn}, messages, prelude::{ActorStopReason, Context, WeakActorRef}, reply::DelegatedReply, }; -use tokio::{sync::watch, task::JoinSet}; -use tracing::{info, warn}; +use tracing::info; use crate::actors::{ - client::session::ClientSession, - user_agent::session::{RequestNewClientApproval, UserAgentSession}, + client::{ClientProfile, session::ClientSession}, + flow_coordinator::client_connect_approval::ClientApprovalController, + user_agent::session::UserAgentSession, }; +pub mod client_connect_approval; + #[derive(Default)] pub struct FlowCoordinator { pub user_agents: HashMap>, @@ -44,7 +45,11 @@ impl Actor for FlowCoordinator { event = "useragent.disconnected" ); } else if self.clients.remove(&id).is_some() { - info!(?id, actor = "FlowCoordinator", event = "client.disconnected"); + info!( + ?id, + actor = "FlowCoordinator", + event = "client.disconnected" + ); } else { info!( ?id, @@ -62,67 +67,6 @@ pub enum ApprovalError { NoUserAgentsConnected, } -async fn request_client_approval( - user_agents: &[WeakActorRef], - client_pubkey: VerifyingKey, -) -> Result { - if user_agents.is_empty() { - return Err(ApprovalError::NoUserAgentsConnected); - } - - let mut pool = JoinSet::new(); - let (cancel_tx, cancel_rx) = watch::channel(()); - - for weak_ref in user_agents { - match weak_ref.upgrade() { - Some(agent) => { - let cancel_rx = cancel_rx.clone(); - pool.spawn(async move { - agent - .ask(RequestNewClientApproval { - client_pubkey, - cancel_flag: cancel_rx.clone(), - }) - .await - }); - } - None => { - warn!( - id = weak_ref.id().to_string(), - actor = "FlowCoordinator", - event = "useragent.disconnected_before_approval" - ); - } - } - } - - while let Some(result) = pool.join_next().await { - match result { - Ok(Ok(approved)) => { - // cancel other pending requests - let _ = cancel_tx.send(()); - return Ok(approved); - } - Ok(Err(err)) => { - warn!( - ?err, - actor = "FlowCoordinator", - event = "useragent.approval_error" - ); - } - Err(err) => { - warn!( - ?err, - actor = "FlowCoordinator", - event = "useragent.approval_task_failed" - ); - } - } - } - - Err(ApprovalError::NoUserAgentsConnected) -} - #[messages] impl FlowCoordinator { #[message(ctx)] @@ -150,23 +94,23 @@ impl FlowCoordinator { #[message(ctx)] pub async fn request_client_approval( &mut self, - client_pubkey: VerifyingKey, + client: ClientProfile, ctx: &mut Context>>, ) -> DelegatedReply> { let (reply, Some(reply_sender)) = ctx.reply_sender() else { unreachable!("Expected `request_client_approval` to have callback channel"); }; - let weak_refs = self - .user_agents - .values() - .map(|agent| agent.downgrade()) - .collect::>(); + let refs: Vec<_> = self.user_agents.values().cloned().collect(); + if refs.is_empty() { + reply_sender.send(Err(ApprovalError::NoUserAgentsConnected)); + return reply; + } - // handle in subtask to not to lock the actor - tokio::task::spawn(async move { - let result = request_client_approval(&weak_refs, client_pubkey).await; - reply_sender.send(result); + ClientApprovalController::spawn(client_connect_approval::Args { + client, + user_agents: refs, + reply: reply_sender, }); reply diff --git a/server/crates/arbiter-server/src/actors/user_agent/mod.rs b/server/crates/arbiter-server/src/actors/user_agent/mod.rs index 986fbb5..3a45cc5 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/mod.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/mod.rs @@ -1,5 +1,5 @@ use crate::{ - actors::GlobalActors, + actors::{GlobalActors, client::ClientProfile}, db::{self, models::KeyType}, }; @@ -72,8 +72,8 @@ impl TryFrom<(KeyType, Vec)> for AuthPublicKey { // Messages, sent by user agent to connection client without having a request #[derive(Debug)] pub enum OutOfBand { - ClientConnectionRequest { pubkey: ed25519_dalek::VerifyingKey }, - ClientConnectionCancel, + ClientConnectionRequest { profile: ClientProfile }, + ClientConnectionCancel { pubkey: ed25519_dalek::VerifyingKey }, } pub struct UserAgentConnection { diff --git a/server/crates/arbiter-server/src/actors/user_agent/session.rs b/server/crates/arbiter-server/src/actors/user_agent/session.rs index ad9b266..d44ab3b 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session.rs @@ -1,15 +1,15 @@ -use std::borrow::Cow; +use std::{borrow::Cow, collections::HashMap}; use arbiter_proto::transport::Sender; use async_trait::async_trait; use ed25519_dalek::VerifyingKey; -use kameo::{Actor, messages}; +use kameo::{Actor, actor::ActorRef, messages, prelude::Context}; use thiserror::Error; -use tokio::sync::watch; use tracing::error; use crate::actors::{ - flow_coordinator::RegisterUserAgent, + client::ClientProfile, + flow_coordinator::{RegisterUserAgent, client_connect_approval::ClientApprovalController}, user_agent::{OutOfBand, UserAgentConnection}, }; @@ -33,20 +33,23 @@ impl Error { } } +pub struct PendingClientApproval { + controller: ActorRef, +} + pub struct UserAgentSession { props: UserAgentConnection, state: UserAgentStateMachine, - #[allow( - dead_code, - reason = "The session keeps ownership of the outbound transport even before the state-machine flow starts using it directly" - )] sender: Box>, + + pending_client_approvals: HashMap, } mod connection; pub(crate) use connection::{ BootstrapError, HandleBootstrapEncryptedKey, HandleEvmWalletCreate, HandleEvmWalletList, - HandleGrantCreate, HandleGrantDelete, HandleGrantList, HandleQueryVaultState, + HandleGrantCreate, HandleGrantDelete, HandleGrantList, HandleNewClientApprove, + HandleQueryVaultState, }; pub use connection::{HandleUnsealEncryptedKey, HandleUnsealRequest, UnsealError}; @@ -56,6 +59,7 @@ impl UserAgentSession { props, state: UserAgentStateMachine::new(DummyContext), sender, + pending_client_approvals: Default::default(), } } @@ -87,15 +91,28 @@ impl UserAgentSession { #[messages] impl UserAgentSession { #[message] - pub async fn request_new_client_approval( + pub async fn begin_new_client_approval( &mut self, - client_pubkey: VerifyingKey, - cancel_flag: watch::Receiver<()>, - ) -> Result { - // temporary use to make clippy happy while we refactor this flow - dbg!(client_pubkey); - dbg!(cancel_flag); - todo!("Think about refactoring it to state-machine based flow, as we already have one") + client: ClientProfile, + controller: ActorRef, + ) { + if let Err(e) = self + .sender + .send(OutOfBand::ClientConnectionRequest { + profile: client.clone(), + }) + .await + { + error!( + ?e, + actor = "user_agent", + event = "failed to announce new client connection" + ); + return; + } + + self.pending_client_approvals + .insert(client.pubkey, PendingClientApproval { controller }); } } @@ -116,9 +133,42 @@ impl Actor for UserAgentSession { }) .await .map_err(|err| { - error!(?err, "Failed to register user agent connection with flow coordinator"); + error!( + ?err, + "Failed to register user agent connection with flow coordinator" + ); Error::internal("Failed to register user agent connection with flow coordinator") })?; Ok(args) } + + async fn on_link_died( + &mut self, + _: kameo::prelude::WeakActorRef, + id: kameo::prelude::ActorId, + _: kameo::prelude::ActorStopReason, + ) -> Result, Self::Error> { + let cancelled_pubkey = self + .pending_client_approvals + .iter() + .find_map(|(k, v)| (v.controller.id() == id).then_some(*k)); + + if let Some(pubkey) = cancelled_pubkey { + self.pending_client_approvals.remove(&pubkey); + + if let Err(e) = self + .sender + .send(OutOfBand::ClientConnectionCancel { pubkey }) + .await + { + error!( + ?e, + actor = "user_agent", + event = "failed to announce client connection cancellation" + ); + } + } + + Ok(std::ops::ControlFlow::Continue(())) + } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs b/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs index 1156059..b21872a 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs @@ -4,9 +4,11 @@ use alloy::primitives::Address; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; use kameo::error::SendError; use kameo::messages; +use kameo::prelude::Context; use tracing::{error, info}; use x25519_dalek::{EphemeralSecret, PublicKey}; +use crate::actors::flow_coordinator::client_connect_approval::ClientApprovalAnswer; use crate::actors::keyholder::KeyHolderState; use crate::actors::user_agent::session::Error; use crate::evm::policies::{Grant, SpecificGrant}; @@ -347,3 +349,38 @@ impl UserAgentSession { } } } + +#[messages] +impl UserAgentSession { + #[message(ctx)] + pub(crate) async fn handle_new_client_approve( + &mut self, + approved: bool, + pubkey: ed25519_dalek::VerifyingKey, + ctx: &mut Context>, + ) -> Result<(), Error> { + let pending_approval = match self.pending_client_approvals.remove(&pubkey) { + Some(approval) => approval, + None => { + error!("Received client connection response for unknown client"); + return Err(Error::internal("Unknown client in connection response")); + } + }; + + pending_approval + .controller + .tell(ClientApprovalAnswer { approved }) + .await + .map_err(|err| { + error!( + ?err, + "Failed to send client approval response to controller" + ); + Error::internal("Failed to send client approval response to controller") + })?; + + ctx.actor_ref().unlink(&pending_approval.controller).await; + + Ok(()) + } +} diff --git a/server/crates/arbiter-server/src/grpc/client/auth.rs b/server/crates/arbiter-server/src/grpc/client/auth.rs index 6dd6de9..31779cc 100644 --- a/server/crates/arbiter-server/src/grpc/client/auth.rs +++ b/server/crates/arbiter-server/src/grpc/client/auth.rs @@ -173,8 +173,8 @@ impl Bi> for AuthTransportAda fn client_metadata_from_proto(metadata: ProtoClientInfo) -> auth::ClientMetadata { auth::ClientMetadata { name: metadata.name, - description: (!metadata.description.is_empty()).then_some(metadata.description), - version: (!metadata.version.is_empty()).then_some(metadata.version), + description: metadata.description, + version: metadata.version, } } diff --git a/server/crates/arbiter-server/src/grpc/user_agent.rs b/server/crates/arbiter-server/src/grpc/user_agent.rs index 74c612b..03dd200 100644 --- a/server/crates/arbiter-server/src/grpc/user_agent.rs +++ b/server/crates/arbiter-server/src/grpc/user_agent.rs @@ -2,6 +2,7 @@ use tokio::sync::mpsc; use arbiter_proto::{ proto::{ + client::ClientInfo as ProtoClientMetadata, evm::{ EtherTransferSettings as ProtoEtherTransferSettings, EvmError as ProtoEvmError, EvmGrantCreateRequest, EvmGrantCreateResponse, EvmGrantDeleteRequest, @@ -45,7 +46,8 @@ use crate::{ session::{ BootstrapError, Error, HandleBootstrapEncryptedKey, HandleEvmWalletCreate, HandleEvmWalletList, HandleGrantCreate, HandleGrantDelete, HandleGrantList, - HandleQueryVaultState, HandleUnsealEncryptedKey, HandleUnsealRequest, UnsealError, + HandleNewClientApprove, HandleQueryVaultState, HandleUnsealEncryptedKey, + HandleUnsealRequest, UnsealError, }, }, }, @@ -259,7 +261,41 @@ async fn dispatch_conn_message( actor.ask(HandleGrantDelete { grant_id }).await, )) } - payload => { + UserAgentRequestPayload::ClientConnectionResponse(resp) => { + let pubkey_bytes: [u8; 32] = match resp.pubkey.try_into() { + Ok(bytes) => bytes, + Err(_) => { + let _ = bi + .send(Err(Status::invalid_argument("Invalid Ed25519 public key length"))) + .await; + return Err(()); + } + }; + let pubkey = match ed25519_dalek::VerifyingKey::from_bytes(&pubkey_bytes) { + Ok(key) => key, + Err(_) => { + let _ = bi + .send(Err(Status::invalid_argument("Invalid Ed25519 public key"))) + .await; + return Err(()); + } + }; + + if let Err(err) = actor + .ask(HandleNewClientApprove { + approved: resp.approved, + pubkey, + }) + .await + { + warn!(?err, "Failed to process client connection response"); + let _ = bi.send(Err(Status::internal("Failed to process response"))).await; + return Err(()); + } + + return Ok(()); + } + UserAgentRequestPayload::AuthChallengeRequest(..) | UserAgentRequestPayload::AuthChallengeSolution(..) => { warn!(?payload, "Unsupported post-auth user agent request"); let _ = bi .send(Err(Status::invalid_argument( @@ -268,6 +304,7 @@ async fn dispatch_conn_message( .await; return Err(()); } + }; bi.send(Ok(UserAgentResponse { @@ -283,14 +320,20 @@ async fn send_out_of_band( oob: OutOfBand, ) -> Result<(), ()> { let payload = match oob { - OutOfBand::ClientConnectionRequest { pubkey } => { + OutOfBand::ClientConnectionRequest { profile } => { UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest { - pubkey: pubkey.to_bytes().to_vec(), - info: None, + pubkey: profile.pubkey.to_bytes().to_vec(), + info: Some(ProtoClientMetadata { + name: profile.metadata.name, + description: profile.metadata.description, + version: profile.metadata.version, + }), }) } - OutOfBand::ClientConnectionCancel => { - UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {}) + OutOfBand::ClientConnectionCancel { pubkey } => { + UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel { + pubkey: pubkey.to_bytes().to_vec(), + }) } };