feat(server): re-introduce client approval flow

This commit is contained in:
hdbg
2026-03-21 14:50:52 +01:00
parent 51674bb39c
commit 8043cdf8d8
11 changed files with 307 additions and 118 deletions

View File

@@ -7,8 +7,8 @@ import "google/protobuf/empty.proto";
message ClientInfo {
string name = 1;
string description = 2;
string version = 3;
optional string description = 2;
optional string version = 3;
}
message AuthChallengeRequest {

View File

@@ -86,9 +86,12 @@ message ClientConnectionRequest {
message ClientConnectionResponse {
bool approved = 1;
bytes pubkey = 2;
}
message ClientConnectionCancel {}
message ClientConnectionCancel {
bytes pubkey = 1;
}
message UserAgentRequest {
int32 id = 14;

View File

@@ -14,7 +14,7 @@ use tracing::error;
use crate::{
actors::{
client::ClientConnection,
client::{ClientConnection, ClientProfile},
flow_coordinator::{self, RequestClientApproval},
},
db::{
@@ -113,13 +113,11 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
async fn approve_new_client(
actors: &crate::actors::GlobalActors,
pubkey: VerifyingKey,
profile: ClientProfile,
) -> Result<(), Error> {
let result = actors
.flow_coordinator
.ask(RequestClientApproval {
client_pubkey: pubkey,
})
.ask(RequestClientApproval { client: profile })
.await;
match result {
@@ -317,7 +315,14 @@ where
let nonce = match get_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
approve_new_client(&props.actors, pubkey).await?;
approve_new_client(
&props.actors,
ClientProfile {
pubkey,
metadata: metadata.clone(),
},
)
.await?;
insert_client(&props.db, &pubkey, &metadata).await?;
0
}

View File

@@ -3,10 +3,16 @@ use kameo::actor::Spawn;
use tracing::{error, info};
use crate::{
actors::{GlobalActors, client::session::ClientSession},
actors::{GlobalActors, client::{auth::ClientMetadata, session::ClientSession}},
db,
};
#[derive(Debug, Clone)]
pub struct ClientProfile {
pub pubkey: ed25519_dalek::VerifyingKey,
pub metadata: ClientMetadata,
}
pub struct ClientConnection {
pub(crate) db: db::DatabasePool,
pub(crate) actors: GlobalActors,

View File

@@ -0,0 +1,101 @@
use std::ops::ControlFlow;
use kameo::{
Actor, messages,
prelude::{ActorId, ActorRef, ActorStopReason, Context, WeakActorRef},
reply::ReplySender,
};
use crate::actors::{
client::ClientProfile,
flow_coordinator::ApprovalError,
user_agent::{UserAgentSession, session::BeginNewClientApproval},
};
pub struct Args {
pub client: ClientProfile,
pub user_agents: Vec<ActorRef<UserAgentSession>>,
pub reply: ReplySender<Result<bool, ApprovalError>>
}
pub struct ClientApprovalController {
/// Number of UAs that have not yet responded (approval or denial) or died.
pending: usize,
/// Number of approvals received so far.
approved: usize,
reply: Option<ReplySender<Result<bool, ApprovalError>>>,
}
impl ClientApprovalController {
fn send_reply(&mut self, result: Result<bool, ApprovalError>) {
if let Some(reply) = self.reply.take() {
reply.send(result);
}
}
}
impl Actor for ClientApprovalController {
type Args = Args;
type Error = ();
async fn on_start(
Args { client, mut user_agents, reply }: Self::Args,
actor_ref: ActorRef<Self>,
) -> Result<Self, Self::Error> {
let this = Self {
pending: user_agents.len(),
approved: 0,
reply: Some(reply),
};
for user_agent in user_agents.drain(..) {
actor_ref.link(&user_agent).await;
let _ = user_agent
.tell(BeginNewClientApproval {
client: client.clone(),
controller: actor_ref.clone(),
})
.await;
}
Ok(this)
}
async fn on_link_died(
&mut self,
_: WeakActorRef<Self>,
_: ActorId,
_: ActorStopReason,
) -> Result<ControlFlow<ActorStopReason>, Self::Error> {
// A linked UA died before responding — counts as a non-approval.
self.pending = self.pending.saturating_sub(1);
if self.pending == 0 {
// At least one UA didn't approve: deny.
self.send_reply(Ok(false));
return Ok(ControlFlow::Break(ActorStopReason::Normal));
}
Ok(ControlFlow::Continue(()))
}
}
#[messages]
impl ClientApprovalController {
#[message(ctx)]
pub async fn client_approval_answer(&mut self, approved: bool, ctx: &mut Context<Self, ()>) {
if !approved {
// Denial wins immediately regardless of other pending responses.
self.send_reply(Ok(false));
ctx.stop();
return;
}
self.approved += 1;
self.pending = self.pending.saturating_sub(1);
if self.pending == 0 {
// Every connected UA approved.
self.send_reply(Ok(true));
ctx.stop();
}
}
}

View File

@@ -1,21 +1,22 @@
use std::{collections::HashMap, ops::ControlFlow};
use ed25519_dalek::VerifyingKey;
use kameo::{
Actor,
actor::{ActorId, ActorRef},
actor::{ActorId, ActorRef, Spawn},
messages,
prelude::{ActorStopReason, Context, WeakActorRef},
reply::DelegatedReply,
};
use tokio::{sync::watch, task::JoinSet};
use tracing::{info, warn};
use tracing::info;
use crate::actors::{
client::session::ClientSession,
user_agent::session::{RequestNewClientApproval, UserAgentSession},
client::{ClientProfile, session::ClientSession},
flow_coordinator::client_connect_approval::ClientApprovalController,
user_agent::session::UserAgentSession,
};
pub mod client_connect_approval;
#[derive(Default)]
pub struct FlowCoordinator {
pub user_agents: HashMap<ActorId, ActorRef<UserAgentSession>>,
@@ -44,7 +45,11 @@ impl Actor for FlowCoordinator {
event = "useragent.disconnected"
);
} else if self.clients.remove(&id).is_some() {
info!(?id, actor = "FlowCoordinator", event = "client.disconnected");
info!(
?id,
actor = "FlowCoordinator",
event = "client.disconnected"
);
} else {
info!(
?id,
@@ -62,67 +67,6 @@ pub enum ApprovalError {
NoUserAgentsConnected,
}
async fn request_client_approval(
user_agents: &[WeakActorRef<UserAgentSession>],
client_pubkey: VerifyingKey,
) -> Result<bool, ApprovalError> {
if user_agents.is_empty() {
return Err(ApprovalError::NoUserAgentsConnected);
}
let mut pool = JoinSet::new();
let (cancel_tx, cancel_rx) = watch::channel(());
for weak_ref in user_agents {
match weak_ref.upgrade() {
Some(agent) => {
let cancel_rx = cancel_rx.clone();
pool.spawn(async move {
agent
.ask(RequestNewClientApproval {
client_pubkey,
cancel_flag: cancel_rx.clone(),
})
.await
});
}
None => {
warn!(
id = weak_ref.id().to_string(),
actor = "FlowCoordinator",
event = "useragent.disconnected_before_approval"
);
}
}
}
while let Some(result) = pool.join_next().await {
match result {
Ok(Ok(approved)) => {
// cancel other pending requests
let _ = cancel_tx.send(());
return Ok(approved);
}
Ok(Err(err)) => {
warn!(
?err,
actor = "FlowCoordinator",
event = "useragent.approval_error"
);
}
Err(err) => {
warn!(
?err,
actor = "FlowCoordinator",
event = "useragent.approval_task_failed"
);
}
}
}
Err(ApprovalError::NoUserAgentsConnected)
}
#[messages]
impl FlowCoordinator {
#[message(ctx)]
@@ -150,23 +94,23 @@ impl FlowCoordinator {
#[message(ctx)]
pub async fn request_client_approval(
&mut self,
client_pubkey: VerifyingKey,
client: ClientProfile,
ctx: &mut Context<Self, DelegatedReply<Result<bool, ApprovalError>>>,
) -> DelegatedReply<Result<bool, ApprovalError>> {
let (reply, Some(reply_sender)) = ctx.reply_sender() else {
unreachable!("Expected `request_client_approval` to have callback channel");
};
let weak_refs = self
.user_agents
.values()
.map(|agent| agent.downgrade())
.collect::<Vec<_>>();
let refs: Vec<_> = self.user_agents.values().cloned().collect();
if refs.is_empty() {
reply_sender.send(Err(ApprovalError::NoUserAgentsConnected));
return reply;
}
// handle in subtask to not to lock the actor
tokio::task::spawn(async move {
let result = request_client_approval(&weak_refs, client_pubkey).await;
reply_sender.send(result);
ClientApprovalController::spawn(client_connect_approval::Args {
client,
user_agents: refs,
reply: reply_sender,
});
reply

View File

@@ -1,5 +1,5 @@
use crate::{
actors::GlobalActors,
actors::{GlobalActors, client::ClientProfile},
db::{self, models::KeyType},
};
@@ -72,8 +72,8 @@ impl TryFrom<(KeyType, Vec<u8>)> for AuthPublicKey {
// Messages, sent by user agent to connection client without having a request
#[derive(Debug)]
pub enum OutOfBand {
ClientConnectionRequest { pubkey: ed25519_dalek::VerifyingKey },
ClientConnectionCancel,
ClientConnectionRequest { profile: ClientProfile },
ClientConnectionCancel { pubkey: ed25519_dalek::VerifyingKey },
}
pub struct UserAgentConnection {

View File

@@ -1,15 +1,15 @@
use std::borrow::Cow;
use std::{borrow::Cow, collections::HashMap};
use arbiter_proto::transport::Sender;
use async_trait::async_trait;
use ed25519_dalek::VerifyingKey;
use kameo::{Actor, messages};
use kameo::{Actor, actor::ActorRef, messages, prelude::Context};
use thiserror::Error;
use tokio::sync::watch;
use tracing::error;
use crate::actors::{
flow_coordinator::RegisterUserAgent,
client::ClientProfile,
flow_coordinator::{RegisterUserAgent, client_connect_approval::ClientApprovalController},
user_agent::{OutOfBand, UserAgentConnection},
};
@@ -33,20 +33,23 @@ impl Error {
}
}
pub struct PendingClientApproval {
controller: ActorRef<ClientApprovalController>,
}
pub struct UserAgentSession {
props: UserAgentConnection,
state: UserAgentStateMachine<DummyContext>,
#[allow(
dead_code,
reason = "The session keeps ownership of the outbound transport even before the state-machine flow starts using it directly"
)]
sender: Box<dyn Sender<OutOfBand>>,
pending_client_approvals: HashMap<VerifyingKey, PendingClientApproval>,
}
mod connection;
pub(crate) use connection::{
BootstrapError, HandleBootstrapEncryptedKey, HandleEvmWalletCreate, HandleEvmWalletList,
HandleGrantCreate, HandleGrantDelete, HandleGrantList, HandleQueryVaultState,
HandleGrantCreate, HandleGrantDelete, HandleGrantList, HandleNewClientApprove,
HandleQueryVaultState,
};
pub use connection::{HandleUnsealEncryptedKey, HandleUnsealRequest, UnsealError};
@@ -56,6 +59,7 @@ impl UserAgentSession {
props,
state: UserAgentStateMachine::new(DummyContext),
sender,
pending_client_approvals: Default::default(),
}
}
@@ -87,15 +91,28 @@ impl UserAgentSession {
#[messages]
impl UserAgentSession {
#[message]
pub async fn request_new_client_approval(
pub async fn begin_new_client_approval(
&mut self,
client_pubkey: VerifyingKey,
cancel_flag: watch::Receiver<()>,
) -> Result<bool, ()> {
// temporary use to make clippy happy while we refactor this flow
dbg!(client_pubkey);
dbg!(cancel_flag);
todo!("Think about refactoring it to state-machine based flow, as we already have one")
client: ClientProfile,
controller: ActorRef<ClientApprovalController>,
) {
if let Err(e) = self
.sender
.send(OutOfBand::ClientConnectionRequest {
profile: client.clone(),
})
.await
{
error!(
?e,
actor = "user_agent",
event = "failed to announce new client connection"
);
return;
}
self.pending_client_approvals
.insert(client.pubkey, PendingClientApproval { controller });
}
}
@@ -116,9 +133,42 @@ impl Actor for UserAgentSession {
})
.await
.map_err(|err| {
error!(?err, "Failed to register user agent connection with flow coordinator");
error!(
?err,
"Failed to register user agent connection with flow coordinator"
);
Error::internal("Failed to register user agent connection with flow coordinator")
})?;
Ok(args)
}
async fn on_link_died(
&mut self,
_: kameo::prelude::WeakActorRef<Self>,
id: kameo::prelude::ActorId,
_: kameo::prelude::ActorStopReason,
) -> Result<std::ops::ControlFlow<kameo::prelude::ActorStopReason>, Self::Error> {
let cancelled_pubkey = self
.pending_client_approvals
.iter()
.find_map(|(k, v)| (v.controller.id() == id).then_some(*k));
if let Some(pubkey) = cancelled_pubkey {
self.pending_client_approvals.remove(&pubkey);
if let Err(e) = self
.sender
.send(OutOfBand::ClientConnectionCancel { pubkey })
.await
{
error!(
?e,
actor = "user_agent",
event = "failed to announce client connection cancellation"
);
}
}
Ok(std::ops::ControlFlow::Continue(()))
}
}

View File

@@ -4,9 +4,11 @@ use alloy::primitives::Address;
use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit};
use kameo::error::SendError;
use kameo::messages;
use kameo::prelude::Context;
use tracing::{error, info};
use x25519_dalek::{EphemeralSecret, PublicKey};
use crate::actors::flow_coordinator::client_connect_approval::ClientApprovalAnswer;
use crate::actors::keyholder::KeyHolderState;
use crate::actors::user_agent::session::Error;
use crate::evm::policies::{Grant, SpecificGrant};
@@ -347,3 +349,38 @@ impl UserAgentSession {
}
}
}
#[messages]
impl UserAgentSession {
#[message(ctx)]
pub(crate) async fn handle_new_client_approve(
&mut self,
approved: bool,
pubkey: ed25519_dalek::VerifyingKey,
ctx: &mut Context<Self, Result<(), Error>>,
) -> Result<(), Error> {
let pending_approval = match self.pending_client_approvals.remove(&pubkey) {
Some(approval) => approval,
None => {
error!("Received client connection response for unknown client");
return Err(Error::internal("Unknown client in connection response"));
}
};
pending_approval
.controller
.tell(ClientApprovalAnswer { approved })
.await
.map_err(|err| {
error!(
?err,
"Failed to send client approval response to controller"
);
Error::internal("Failed to send client approval response to controller")
})?;
ctx.actor_ref().unlink(&pending_approval.controller).await;
Ok(())
}
}

View File

@@ -173,8 +173,8 @@ impl Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> for AuthTransportAda
fn client_metadata_from_proto(metadata: ProtoClientInfo) -> auth::ClientMetadata {
auth::ClientMetadata {
name: metadata.name,
description: (!metadata.description.is_empty()).then_some(metadata.description),
version: (!metadata.version.is_empty()).then_some(metadata.version),
description: metadata.description,
version: metadata.version,
}
}

View File

@@ -2,6 +2,7 @@ use tokio::sync::mpsc;
use arbiter_proto::{
proto::{
client::ClientInfo as ProtoClientMetadata,
evm::{
EtherTransferSettings as ProtoEtherTransferSettings, EvmError as ProtoEvmError,
EvmGrantCreateRequest, EvmGrantCreateResponse, EvmGrantDeleteRequest,
@@ -45,7 +46,8 @@ use crate::{
session::{
BootstrapError, Error, HandleBootstrapEncryptedKey, HandleEvmWalletCreate,
HandleEvmWalletList, HandleGrantCreate, HandleGrantDelete, HandleGrantList,
HandleQueryVaultState, HandleUnsealEncryptedKey, HandleUnsealRequest, UnsealError,
HandleNewClientApprove, HandleQueryVaultState, HandleUnsealEncryptedKey,
HandleUnsealRequest, UnsealError,
},
},
},
@@ -259,7 +261,41 @@ async fn dispatch_conn_message(
actor.ask(HandleGrantDelete { grant_id }).await,
))
}
payload => {
UserAgentRequestPayload::ClientConnectionResponse(resp) => {
let pubkey_bytes: [u8; 32] = match resp.pubkey.try_into() {
Ok(bytes) => bytes,
Err(_) => {
let _ = bi
.send(Err(Status::invalid_argument("Invalid Ed25519 public key length")))
.await;
return Err(());
}
};
let pubkey = match ed25519_dalek::VerifyingKey::from_bytes(&pubkey_bytes) {
Ok(key) => key,
Err(_) => {
let _ = bi
.send(Err(Status::invalid_argument("Invalid Ed25519 public key")))
.await;
return Err(());
}
};
if let Err(err) = actor
.ask(HandleNewClientApprove {
approved: resp.approved,
pubkey,
})
.await
{
warn!(?err, "Failed to process client connection response");
let _ = bi.send(Err(Status::internal("Failed to process response"))).await;
return Err(());
}
return Ok(());
}
UserAgentRequestPayload::AuthChallengeRequest(..) | UserAgentRequestPayload::AuthChallengeSolution(..) => {
warn!(?payload, "Unsupported post-auth user agent request");
let _ = bi
.send(Err(Status::invalid_argument(
@@ -268,6 +304,7 @@ async fn dispatch_conn_message(
.await;
return Err(());
}
};
bi.send(Ok(UserAgentResponse {
@@ -283,14 +320,20 @@ async fn send_out_of_band(
oob: OutOfBand,
) -> Result<(), ()> {
let payload = match oob {
OutOfBand::ClientConnectionRequest { pubkey } => {
OutOfBand::ClientConnectionRequest { profile } => {
UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest {
pubkey: pubkey.to_bytes().to_vec(),
info: None,
pubkey: profile.pubkey.to_bytes().to_vec(),
info: Some(ProtoClientMetadata {
name: profile.metadata.name,
description: profile.metadata.description,
version: profile.metadata.version,
}),
})
}
OutOfBand::ClientConnectionCancel => {
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {})
OutOfBand::ClientConnectionCancel { pubkey } => {
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {
pubkey: pubkey.to_bytes().to_vec(),
})
}
};