From 1799aef6f8f631e02fd2ca34a5865923bb8d64dc Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 26 Feb 2026 17:15:35 +0100 Subject: [PATCH 1/5] refactor(transport): implemented `Bi` stream based abstraction for actor communication with `next` loop override --- server/Cargo.lock | 1 + server/crates/arbiter-proto/Cargo.toml | 2 +- server/crates/arbiter-proto/src/transport.rs | 301 ++++++++++++++++-- .../src/actors/user_agent/mod.rs | 262 ++++++++++----- .../src/actors/user_agent/transport.rs | 95 ------ server/crates/arbiter-server/src/errors.rs | 24 -- server/crates/arbiter-server/src/lib.rs | 98 +++++- .../arbiter-server/tests/user_agent/auth.rs | 79 +++-- .../arbiter-server/tests/user_agent/unseal.rs | 139 ++++---- 9 files changed, 656 insertions(+), 345 deletions(-) delete mode 100644 server/crates/arbiter-server/src/actors/user_agent/transport.rs delete mode 100644 server/crates/arbiter-server/src/errors.rs diff --git a/server/Cargo.lock b/server/Cargo.lock index 3e8ae9e..b7e6b44 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -73,6 +73,7 @@ dependencies = [ "tonic", "tonic-prost", "tonic-prost-build", + "tracing", "url", ] diff --git a/server/crates/arbiter-proto/Cargo.toml b/server/crates/arbiter-proto/Cargo.toml index 60d27cf..0640004 100644 --- a/server/crates/arbiter-proto/Cargo.toml +++ b/server/crates/arbiter-proto/Cargo.toml @@ -17,7 +17,7 @@ miette.workspace = true thiserror.workspace = true rustls-pki-types.workspace = true base64 = "0.22.1" - +tracing.workspace = true [build-dependencies] tonic-prost-build = "0.14.3" diff --git a/server/crates/arbiter-proto/src/transport.rs b/server/crates/arbiter-proto/src/transport.rs index 691ef9a..c360162 100644 --- a/server/crates/arbiter-proto/src/transport.rs +++ b/server/crates/arbiter-proto/src/transport.rs @@ -1,46 +1,283 @@ -use futures::{Stream, StreamExt}; -use tokio::sync::mpsc::{self, error::SendError}; -use tonic::{Status, Streaming}; +//! Transport-facing abstractions for protocol/session code. +//! +//! This module separates three concerns: +//! +//! - protocol/session logic wants a small duplex interface ([`Bi`]) +//! - transport adapters need to push concrete stream items to an underlying IO layer +//! - server/client boundaries may need to translate domain outbounds into transport +//! framing (for example, a tonic stream item) +//! +//! [`Bi`] is intentionally minimal and transport-agnostic: +//! - [`Bi::recv`] yields inbound protocol messages +//! - [`Bi::send`] accepts outbound protocol/domain items +//! +//! # Generic Ordering Rule +//! +//! This module uses a single convention consistently: when a type or trait is +//! parameterized by protocol message directions, the generic parameters are +//! declared as `Inbound` first, then `Outbound`. +//! +//! For [`Bi`], that means `Bi`: +//! - `recv() -> Option` +//! - `send(Outbound)` +//! +//! For adapter types that are parameterized by direction-specific converters, +//! inbound-related converter parameters are declared before outbound-related +//! converter parameters. +//! +//! [`ProtocolConverter`] is the boundary object that converts a protocol/domain +//! outbound item into the concrete outbound item expected by a transport sender. +//! The conversion is infallible, so domain-level recoverable failures should be +//! represented inside the domain outbound type itself (for example, +//! `Result`). +//! +//! [`GrpcAdapter`] combines: +//! - a tonic inbound stream +//! - a Tokio sender for outbound transport items +//! - a [`ProtocolConverter`] for the receive path +//! - a [`ProtocolConverter`] for the send path +//! +//! [`DummyTransport`] is a no-op implementation useful for tests and local actor +//! execution where no real network stream exists. +//! +//! # Component Interaction +//! +//! The typical layering looks like this: +//! +//! ```text +//! inbound (network -> protocol) +//! ============================ +//! +//! tonic::Streaming -> GrpcAdapter::recv() -> Bi::recv() -> protocol/session actor +//! | +//! +--> recv ProtocolConverter::convert(transport) +//! +//! outbound (protocol -> network) +//! ============================== +//! +//! protocol/session actor -> Bi::send(domain outbound item, e.g. Result) +//! -> GrpcAdapter::send() +//! | +//! +--> send ProtocolConverter::convert(domain) +//! -> Tokio mpsc::Sender -> tonic response stream +//! ``` +//! +//! # Design Notes +//! +//! - `recv()` collapses adapter-specific receive failures into `None`, which +//! lets protocol code treat stream termination and transport receive failure as +//! "no more inbound items" when no finer distinction is required. +//! - `send()` returns [`Error`] only for transport delivery failures (for example, +//! when the outbound channel is closed). +//! - Conversion policy lives outside protocol/session logic and can be defined at +//! the transport boundary (such as a server endpoint module). When domain and +//! transport types are identical, [`IdentityConverter`] can be used. +use std::marker::PhantomData; -// Abstraction for stream for sans-io capabilities -pub trait Bi: Stream> + Send + Sync + 'static { - type Error; +use futures::StreamExt; +use tokio::sync::mpsc; +use tonic::Streaming; + +/// Errors returned by transport adapters implementing [`Bi`]. +pub enum Error { + /// The outbound side of the transport is no longer accepting messages. + ChannelClosed, +} + +/// Minimal bidirectional transport abstraction used by protocol code. +/// +/// `Bi` models a duplex channel with: +/// - inbound items of type `Inbound` read via [`Bi::recv`] +/// - outbound items of type `Outbound` written via [`Bi::send`] +/// +/// The trait intentionally exposes only the operations the protocol layer needs, +/// allowing it to work with gRPC streams and other transport implementations. +/// +/// # Stream termination and errors +/// +/// [`Bi::recv`] returns: +/// - `Some(item)` when a new inbound message is available +/// - `None` when the inbound stream ends or the underlying transport reports an error +/// +/// Implementations may collapse transport-specific receive errors into `None` +/// when the protocol does not need to distinguish them from normal stream +/// termination. +pub trait Bi: Send + Sync + 'static { + /// Sends one outbound item to the peer. fn send( &mut self, - item: Result, - ) -> impl std::future::Future> + Send; + item: Outbound, + ) -> impl std::future::Future> + Send; + + /// Receives the next inbound item. + /// + /// Returns `None` when the inbound stream is finished or can no longer + /// produce items. + fn recv(&mut self) -> impl std::future::Future> + Send; } -// Bi-directional stream abstraction for handling gRPC streaming requests and responses -pub struct BiStream { - pub request_stream: Streaming, - pub response_sender: mpsc::Sender>, +/// Converts protocol/domain outbound items into transport-layer outbound items. +/// +/// This trait is used by transport adapters that need to emit a concrete stream +/// item type (for example, tonic server streams) while protocol code prefers to +/// work with domain-oriented outbound values. +/// +/// `convert` is infallible by design. Any recoverable protocol failure should be +/// represented in [`Self::Domain`] and mapped into the transport item in the +/// converter implementation. +pub trait ProtocolConverter: Send + Sync + 'static { + /// Outbound item produced by protocol/domain code. + type Domain; + + /// Outbound item required by the transport sender. + type Transport; + + /// Maps a protocol/domain outbound item into the transport sender item. + fn convert(&self, item: Self::Domain) -> Self::Transport; } -impl Stream for BiStream -where - T: Send + 'static, - U: Send + 'static, -{ - type Item = Result; +/// A [`ProtocolConverter`] that forwards values unchanged. +/// +/// Useful when the protocol-facing and transport-facing item types are +/// identical, but a converter is still required by an adapter API. +pub struct IdentityConverter { + _marker: PhantomData, +} - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.request_stream.poll_next_unpin(cx) +impl IdentityConverter { + pub fn new() -> Self { + Self { + _marker: PhantomData, + } } } -impl Bi for BiStream -where - T: Send + 'static, - U: Send + 'static, -{ - type Error = SendError>; - - async fn send(&mut self, item: Result) -> Result<(), Self::Error> { - self.response_sender.send(item).await +impl Default for IdentityConverter { + fn default() -> Self { + Self::new() + } +} + +impl ProtocolConverter for IdentityConverter +where + T: Send + Sync + 'static, +{ + type Domain = T; + type Transport = T; + + fn convert(&self, item: Self::Domain) -> Self::Transport { + item + } +} + +/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream. +/// +/// The adapter owns converter instances for both directions: +/// - receive converter: transport inbound -> protocol inbound +/// - send converter: protocol outbound -> transport outbound +/// +/// This keeps protocol actors decoupled from transport framing conventions in +/// both directions. +pub struct GrpcAdapter { + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, +} + +impl GrpcAdapter +where + InboundConverter: ProtocolConverter, + OutboundConverter: ProtocolConverter, +{ + /// Creates a new gRPC-backed [`Bi`] adapter. + /// + /// The provided converters define: + /// - the protocol outbound item and corresponding transport outbound item + /// - the transport inbound item and corresponding protocol inbound item + pub fn new( + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, + ) -> Self { + Self { + sender, + receiver, + inbound_converter, + outbound_converter, + } + } +} + +impl + Bi + for GrpcAdapter +where + InboundConverter: ProtocolConverter, + OutboundConverter: ProtocolConverter, + OutboundConverter::Domain: Send + 'static, + OutboundConverter::Transport: Send + 'static, + InboundConverter::Transport: Send + 'static, + InboundConverter::Domain: Send + 'static, +{ + #[tracing::instrument(level = "trace", skip(self, item))] + async fn send(&mut self, item: OutboundConverter::Domain) -> Result<(), Error> { + let outbound: OutboundConverter::Transport = self.outbound_converter.convert(item); + self.sender + .send(outbound) + .await + .map_err(|_| Error::ChannelClosed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn recv(&mut self) -> Option { + self.receiver + .next() + .await + .transpose() + .ok() + .flatten() + .map(|item| self.inbound_converter.convert(item)) + } +} + +/// No-op [`Bi`] transport for tests and manual actor usage. +/// +/// `send` drops all items and succeeds. [`Bi::recv`] never resolves and therefore +/// does not busy-wait or spuriously close the stream. +pub struct DummyTransport { + _marker: PhantomData<(Inbound, Outbound)>, +} + +impl DummyTransport { + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +impl Default for DummyTransport { + fn default() -> Self { + Self::new() + } +} + +impl Bi for DummyTransport +where + Inbound: Send + Sync + 'static, + Outbound: Send + Sync + 'static, +{ + async fn send(&mut self, _item: Outbound) -> Result<(), Error> { + Ok(()) + } + + fn recv(&mut self) -> impl std::future::Future> + Send { + async { + std::future::pending::<()>().await; + None + } } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/mod.rs b/server/crates/arbiter-server/src/actors/user_agent/mod.rs index 700dd40..50cc9dc 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/mod.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/mod.rs @@ -1,21 +1,26 @@ use std::{ops::DerefMut, sync::Mutex}; -use arbiter_proto::proto::{ - UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentResponse, - auth::{ - self, AuthChallengeRequest, AuthOk, ServerMessage as AuthServerMessage, - server_message::Payload as ServerAuthPayload, +use arbiter_proto::{ + proto::{ + UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, + UserAgentResponse, + auth::{ + self, AuthChallengeRequest, AuthOk, ClientMessage as ClientAuthMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, }, - user_agent_response::Payload as UserAgentResponsePayload, + transport::{Bi, DummyTransport}, }; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl, dsl::update}; use diesel_async::RunQueryDsl; use ed25519_dalek::VerifyingKey; -use kameo::{Actor, error::SendError, messages}; +use kameo::{Actor, error::SendError}; use memsafe::MemSafe; -use tokio::sync::mpsc::Sender; -use tonic::Status; +use tokio::select; use tracing::{error, info}; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -31,62 +36,105 @@ use crate::{ }, }, db::{self, schema}, - errors::GrpcStatusExt, }; mod state; -mod transport; -pub(crate) use transport::handle_user_agent; +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum UserAgentError { + #[error("Expected message with payload")] + MissingRequestPayload, + #[error("Expected message with payload")] + UnexpectedRequestPayload, + #[error("Invalid state for challenge solution")] + InvalidStateForChallengeSolution, + #[error("Invalid state for unseal encrypted key")] + InvalidStateForUnsealEncryptedKey, + #[error("client_pubkey must be 32 bytes")] + InvalidClientPubkeyLength, + #[error("Expected pubkey to have specific length")] + InvalidAuthPubkeyLength, + #[error("Failed to convert pubkey to VerifyingKey")] + InvalidAuthPubkeyEncoding, + #[error("Invalid signature length")] + InvalidSignatureLength, + #[error("Invalid bootstrap token")] + InvalidBootstrapToken, + #[error("Public key not registered")] + PublicKeyNotRegistered, + #[error("Invalid challenge solution")] + InvalidChallengeSolution, + #[error("State machine error")] + StateTransitionFailed, + #[error("Bootstrap token consumption failed")] + BootstrapperActorUnreachable, + #[error("Vault is not available")] + KeyHolderActorUnreachable, + #[error("Database pool error")] + DatabasePoolUnavailable, + #[error("Database error")] + DatabaseOperationFailed, +} -#[derive(Actor)] -pub struct UserAgentActor { +pub struct UserAgentActor +where + Transport: Bi>, +{ db: db::DatabasePool, actors: GlobalActors, state: UserAgentStateMachine, - // will be used in future - _tx: Sender>, + transport: Transport, } -impl UserAgentActor { - pub(crate) fn new( - context: ServerContext, - tx: Sender>, - ) -> Self { +impl UserAgentActor +where + Transport: Bi>, +{ + pub(crate) fn new(context: ServerContext, transport: Transport) -> Self { Self { db: context.db.clone(), actors: context.actors.clone(), state: UserAgentStateMachine::new(DummyContext), - _tx: tx, + transport, } } - pub fn new_manual( - db: db::DatabasePool, - actors: GlobalActors, - tx: Sender>, - ) -> Self { - Self { - db, - actors, - state: UserAgentStateMachine::new(DummyContext), - _tx: tx, - } - } - - fn transition(&mut self, event: UserAgentEvents) -> Result<(), Status> { + fn transition(&mut self, event: UserAgentEvents) -> Result<(), UserAgentError> { self.state.process_event(event).map_err(|e| { error!(?e, "State transition failed"); - Status::internal("State machine error") + UserAgentError::StateTransitionFailed })?; Ok(()) } + pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output { + let msg = req.payload.ok_or_else(|| { + error!(actor = "useragent", "Received message with no payload"); + UserAgentError::MissingRequestPayload + })?; + + match msg { + UserAgentRequestPayload::AuthMessage(ClientAuthMessage { + payload: Some(ClientAuthPayload::AuthChallengeRequest(req)), + }) => self.handle_auth_challenge_request(req).await, + UserAgentRequestPayload::AuthMessage(ClientAuthMessage { + payload: Some(ClientAuthPayload::AuthChallengeSolution(solution)), + }) => self.handle_auth_challenge_solution(solution).await, + UserAgentRequestPayload::UnsealStart(unseal_start) => { + self.handle_unseal_request(unseal_start).await + } + UserAgentRequestPayload::UnsealEncryptedKey(unseal_encrypted_key) => { + self.handle_unseal_encrypted_key(unseal_encrypted_key).await + } + _ => Err(UserAgentError::UnexpectedRequestPayload), + } + } + async fn auth_with_bootstrap_token( &mut self, pubkey: ed25519_dalek::VerifyingKey, token: String, - ) -> Result { + ) -> Result { let token_ok: bool = self .actors .bootstrapper @@ -94,16 +142,19 @@ impl UserAgentActor { .await .map_err(|e| { error!(?pubkey, "Failed to consume bootstrap token: {e}"); - Status::internal("Bootstrap token consumption failed") + UserAgentError::BootstrapperActorUnreachable })?; if !token_ok { error!(?pubkey, "Invalid bootstrap token provided"); - return Err(Status::invalid_argument("Invalid bootstrap token")); + return Err(UserAgentError::InvalidBootstrapToken); } { - let mut conn = self.db.get().await.to_status()?; + let mut conn = self.db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + UserAgentError::DatabasePoolUnavailable + })?; diesel::insert_into(schema::useragent_client::table) .values(( @@ -112,7 +163,10 @@ impl UserAgentActor { )) .execute(&mut conn) .await - .to_status()?; + .map_err(|e| { + error!(error = ?e, "Database error"); + UserAgentError::DatabaseOperationFailed + })?; } self.transition(UserAgentEvents::ReceivedBootstrapToken)?; @@ -122,7 +176,10 @@ impl UserAgentActor { async fn auth_with_challenge(&mut self, pubkey: VerifyingKey, pubkey_bytes: Vec) -> Output { let nonce: Option = { - let mut db_conn = self.db.get().await.to_status()?; + let mut db_conn = self.db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + UserAgentError::DatabasePoolUnavailable + })?; db_conn .exclusive_transaction(|conn| { Box::pin(async move { @@ -147,12 +204,15 @@ impl UserAgentActor { }) .await .optional() - .to_status()? + .map_err(|e| { + error!(error = ?e, "Database error"); + UserAgentError::DatabaseOperationFailed + })? }; let Some(nonce) = nonce else { error!(?pubkey, "Public key not found in database"); - return Err(Status::unauthenticated("Public key not registered")); + return Err(UserAgentError::PublicKeyNotRegistered); }; let challenge = auth::AuthChallenge { @@ -177,19 +237,17 @@ impl UserAgentActor { fn verify_challenge_solution( &self, solution: &auth::AuthChallengeSolution, - ) -> Result<(bool, &ChallengeContext), Status> { + ) -> Result<(bool, &ChallengeContext), UserAgentError> { let UserAgentStates::WaitingForChallengeSolution(challenge_context) = self.state.state() else { error!("Received challenge solution in invalid state"); - return Err(Status::invalid_argument( - "Invalid state for challenge solution", - )); + return Err(UserAgentError::InvalidStateForChallengeSolution); }; let formatted_challenge = arbiter_proto::format_challenge(&challenge_context.challenge); let signature = solution.signature.as_slice().try_into().map_err(|_| { error!(?solution, "Invalid signature length"); - Status::invalid_argument("Invalid signature length") + UserAgentError::InvalidSignatureLength })?; let valid = challenge_context @@ -201,7 +259,7 @@ impl UserAgentActor { } } -type Output = Result; +type Output = Result; fn auth_response(payload: ServerAuthPayload) -> UserAgentResponse { UserAgentResponse { @@ -217,17 +275,18 @@ fn unseal_response(payload: UserAgentResponsePayload) -> UserAgentResponse { } } -#[messages] -impl UserAgentActor { - #[message] - pub async fn handle_unseal_request(&mut self, req: UnsealStart) -> Output { +impl UserAgentActor +where + Transport: Bi>, +{ + async fn handle_unseal_request(&mut self, req: UnsealStart) -> Output { let secret = EphemeralSecret::random(); let public_key = PublicKey::from(&secret); let client_pubkey_bytes: [u8; 32] = req .client_pubkey .try_into() - .map_err(|_| Status::invalid_argument("client_pubkey must be 32 bytes"))?; + .map_err(|_| UserAgentError::InvalidClientPubkeyLength)?; let client_public_key = PublicKey::from(client_pubkey_bytes); @@ -243,13 +302,10 @@ impl UserAgentActor { )) } - #[message] - pub async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { + async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { let UserAgentStates::WaitingForUnsealKey(unseal_context) = self.state.state() else { error!("Received unseal encrypted key in invalid state"); - return Err(Status::failed_precondition( - "Invalid state for unseal encrypted key", - )); + return Err(UserAgentError::InvalidStateForUnsealEncryptedKey); }; let ephemeral_secret = { let mut secret_lock = unseal_context.secret.lock().unwrap(); @@ -313,7 +369,7 @@ impl UserAgentActor { Err(err) => { error!(?err, "Failed to send unseal request to keyholder"); self.transition(UserAgentEvents::ReceivedInvalidKey)?; - Err(Status::internal("Vault is not available")) + Err(UserAgentError::KeyHolderActorUnreachable) } } } @@ -327,14 +383,14 @@ impl UserAgentActor { } } - #[message] - pub async fn handle_auth_challenge_request(&mut self, req: AuthChallengeRequest) -> Output { - let pubkey = req.pubkey.as_array().ok_or(Status::invalid_argument( - "Expected pubkey to have specific length", - ))?; + async fn handle_auth_challenge_request(&mut self, req: AuthChallengeRequest) -> Output { + let pubkey = req + .pubkey + .as_array() + .ok_or(UserAgentError::InvalidAuthPubkeyLength)?; let pubkey = VerifyingKey::from_bytes(pubkey).map_err(|_err| { error!(?pubkey, "Failed to convert to VerifyingKey"); - Status::invalid_argument("Failed to convert pubkey to VerifyingKey") + UserAgentError::InvalidAuthPubkeyEncoding })?; self.transition(UserAgentEvents::AuthRequest)?; @@ -345,8 +401,7 @@ impl UserAgentActor { } } - #[message] - pub async fn handle_auth_challenge_solution( + async fn handle_auth_challenge_solution( &mut self, solution: auth::AuthChallengeSolution, ) -> Output { @@ -362,7 +417,72 @@ impl UserAgentActor { } else { error!("Client provided invalid solution to authentication challenge"); self.transition(UserAgentEvents::ReceivedBadSolution)?; - Err(Status::unauthenticated("Invalid challenge solution")) + Err(UserAgentError::InvalidChallengeSolution) + } + } +} + + +impl Actor for UserAgentActor +where + Transport: Bi>, +{ + type Args = Self; + + type Error = (); + + async fn on_start( + args: Self::Args, + _: kameo::prelude::ActorRef, + ) -> Result { + Ok(args) + } + + async fn next( + &mut self, + _actor_ref: kameo::prelude::WeakActorRef, + mailbox_rx: &mut kameo::prelude::MailboxReceiver, + ) -> Option> { + loop { + select! { + signal = mailbox_rx.recv() => { + return signal; + } + msg = self.transport.recv() => { + match msg { + Some(request) => { + match self.process_transport_inbound(request).await { + Ok(response) => { + if self.transport.send(Ok(response)).await.is_err() { + error!(actor = "useragent", reason = "channel closed", "send.failed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + Err(err) => { + let _ = self.transport.send(Err(err)).await; + return Some(kameo::mailbox::Signal::Stop); + } + } + } + None => { + info!(actor = "useragent", "transport.closed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + } + } + } + } +} + + +impl UserAgentActor>> { + pub fn new_manual(db: db::DatabasePool, actors: GlobalActors) -> Self { + Self { + db, + actors, + state: UserAgentStateMachine::new(DummyContext), + transport: DummyTransport::new(), } } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/transport.rs b/server/crates/arbiter-server/src/actors/user_agent/transport.rs deleted file mode 100644 index c1ac84c..0000000 --- a/server/crates/arbiter-server/src/actors/user_agent/transport.rs +++ /dev/null @@ -1,95 +0,0 @@ -use super::UserAgentActor; -use arbiter_proto::proto::{ - UserAgentRequest, UserAgentResponse, - auth::{ClientMessage as ClientAuthMessage, client_message::Payload as ClientAuthPayload}, - user_agent_request::Payload as UserAgentRequestPayload, -}; -use futures::StreamExt; -use kameo::{ - actor::{ActorRef, Spawn as _}, - error::SendError, -}; -use tokio::sync::mpsc; -use tonic::Status; -use tracing::error; - -use crate::{ - actors::user_agent::{ - HandleAuthChallengeRequest, HandleAuthChallengeSolution, HandleUnsealEncryptedKey, - HandleUnsealRequest, - }, - context::ServerContext, -}; - -pub(crate) async fn handle_user_agent( - context: ServerContext, - mut req_stream: tonic::Streaming, - tx: mpsc::Sender>, -) { - let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone())); - - while let Some(Ok(req)) = req_stream.next().await - && actor.is_alive() - { - match process_message(&actor, req).await { - Ok(resp) => { - if tx.send(Ok(resp)).await.is_err() { - error!(actor = "useragent", "Failed to send response to client"); - break; - } - } - Err(status) => { - let _ = tx.send(Err(status)).await; - break; - } - } - } - - actor.kill(); -} - -async fn process_message( - actor: &ActorRef, - req: UserAgentRequest, -) -> Result { - let msg = req.payload.ok_or_else(|| { - error!(actor = "useragent", "Received message with no payload"); - Status::invalid_argument("Expected message with payload") - })?; - - match msg { - UserAgentRequestPayload::AuthMessage(ClientAuthMessage { - payload: Some(ClientAuthPayload::AuthChallengeRequest(req)), - }) => actor - .ask(HandleAuthChallengeRequest { req }) - .await - .map_err(into_status), - UserAgentRequestPayload::AuthMessage(ClientAuthMessage { - payload: Some(ClientAuthPayload::AuthChallengeSolution(solution)), - }) => actor - .ask(HandleAuthChallengeSolution { solution }) - .await - .map_err(into_status), - UserAgentRequestPayload::UnsealStart(unseal_start) => actor - .ask(HandleUnsealRequest { req: unseal_start }) - .await - .map_err(into_status), - UserAgentRequestPayload::UnsealEncryptedKey(unseal_encrypted_key) => actor - .ask(HandleUnsealEncryptedKey { - req: unseal_encrypted_key, - }) - .await - .map_err(into_status), - _ => Err(Status::invalid_argument("Expected message with payload")), - } -} - -fn into_status(e: SendError) -> Status { - match e { - SendError::HandlerError(status) => status, - _ => { - error!(actor = "useragent", "Failed to send message to actor"); - Status::internal("session failure") - } - } -} diff --git a/server/crates/arbiter-server/src/errors.rs b/server/crates/arbiter-server/src/errors.rs deleted file mode 100644 index 98dae76..0000000 --- a/server/crates/arbiter-server/src/errors.rs +++ /dev/null @@ -1,24 +0,0 @@ -use tonic::Status; -use tracing::error; - -pub trait GrpcStatusExt { - fn to_status(self) -> Result; -} - -impl GrpcStatusExt for Result { - fn to_status(self) -> Result { - self.map_err(|e| { - error!(error = ?e, "Database error"); - Status::internal("Database error") - }) - } -} - -impl GrpcStatusExt for Result { - fn to_status(self) -> Result { - self.map_err(|e| { - error!(error = ?e, "Database pool error"); - Status::internal("Database pool error") - }) - } -} diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 9d86e27..77e03f2 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -1,26 +1,91 @@ #![forbid(unsafe_code)] use arbiter_proto::{ proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse}, - transport::BiStream, + transport::{GrpcAdapter, IdentityConverter, ProtocolConverter}, }; use async_trait::async_trait; +use kameo::actor::Spawn; use tokio_stream::wrappers::ReceiverStream; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; +use tracing::info; use crate::{ - actors::{client::handle_client, user_agent::handle_user_agent}, + actors::user_agent::{UserAgentActor, UserAgentError}, context::ServerContext, }; pub mod actors; pub mod context; pub mod db; -mod errors; const DEFAULT_CHANNEL_SIZE: usize = 1000; + +/// Converts User Agent domain outbounds into the tonic stream item emitted by +/// the server. +/// +/// The conversion is defined at the server boundary so the actor module remains +/// focused on domain semantics and does not depend on tonic status encoding. +struct UserAgentGrpcConverter; + +impl ProtocolConverter for UserAgentGrpcConverter { + type Domain = Result; + type Transport = Result; + + fn convert(&self, item: Self::Domain) -> Self::Transport { + match item { + Ok(message) => Ok(message), + Err(err) => Err(user_agent_error_status(err)), + } + } +} + +/// Maps User Agent domain errors to public gRPC transport errors for the +/// `user_agent` streaming endpoint. +fn user_agent_error_status(value: UserAgentError) -> Status { + match value { + UserAgentError::MissingRequestPayload | UserAgentError::UnexpectedRequestPayload => { + Status::invalid_argument("Expected message with payload") + } + UserAgentError::InvalidStateForChallengeSolution => { + Status::invalid_argument("Invalid state for challenge solution") + } + UserAgentError::InvalidStateForUnsealEncryptedKey => { + Status::failed_precondition("Invalid state for unseal encrypted key") + } + UserAgentError::InvalidClientPubkeyLength => { + Status::invalid_argument("client_pubkey must be 32 bytes") + } + UserAgentError::InvalidAuthPubkeyLength => { + Status::invalid_argument("Expected pubkey to have specific length") + } + UserAgentError::InvalidAuthPubkeyEncoding => { + Status::invalid_argument("Failed to convert pubkey to VerifyingKey") + } + UserAgentError::InvalidSignatureLength => { + Status::invalid_argument("Invalid signature length") + } + UserAgentError::InvalidBootstrapToken => { + Status::invalid_argument("Invalid bootstrap token") + } + UserAgentError::PublicKeyNotRegistered => { + Status::unauthenticated("Public key not registered") + } + UserAgentError::InvalidChallengeSolution => { + Status::unauthenticated("Invalid challenge solution") + } + UserAgentError::StateTransitionFailed => Status::internal("State machine error"), + UserAgentError::BootstrapperActorUnreachable => { + Status::internal("Bootstrap token consumption failed") + } + UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"), + UserAgentError::DatabasePoolUnavailable => Status::internal("Database pool error"), + UserAgentError::DatabaseOperationFailed => Status::internal("Database error"), + } +} + pub struct Server { context: ServerContext, } @@ -38,28 +103,29 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server { async fn client( &self, - request: Request>, + _request: Request>, ) -> Result, Status> { - let req_stream = request.into_inner(); - let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - tokio::spawn(handle_client( - self.context.clone(), - BiStream { - request_stream: req_stream, - response_sender: tx, - }, - )); - - Ok(Response::new(ReceiverStream::new(rx))) + todo!() } + #[tracing::instrument(level = "debug", skip(self))] async fn user_agent( &self, request: Request>, ) -> Result, Status> { let req_stream = request.into_inner(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - tokio::spawn(handle_user_agent(self.context.clone(), req_stream, tx)); + + let transport = GrpcAdapter::new( + tx, + req_stream, + IdentityConverter::::new(), + UserAgentGrpcConverter, + ); + UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), transport)); + + info!(event = "connection established", "grpc.user_agent"); + Ok(Response::new(ReceiverStream::new(rx))) } } diff --git a/server/crates/arbiter-server/tests/user_agent/auth.rs b/server/crates/arbiter-server/tests/user_agent/auth.rs index c79d616..d40efb4 100644 --- a/server/crates/arbiter-server/tests/user_agent/auth.rs +++ b/server/crates/arbiter-server/tests/user_agent/auth.rs @@ -1,20 +1,29 @@ use arbiter_proto::proto::{ UserAgentResponse, - auth::{self, AuthChallengeRequest, AuthOk}, + UserAgentRequest, + auth::{self, AuthChallengeRequest, AuthOk, ClientMessage, client_message::Payload as ClientAuthPayload}, + user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }; use arbiter_server::{ actors::{ GlobalActors, bootstrap::GetToken, - user_agent::{HandleAuthChallengeRequest, HandleAuthChallengeSolution, UserAgentActor}, + user_agent::{UserAgentActor, UserAgentError}, }, db::{self, schema}, }; use diesel::{ExpressionMethods as _, QueryDsl, insert_into}; use diesel_async::RunQueryDsl; use ed25519_dalek::Signer as _; -use kameo::actor::Spawn; + +fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(payload), + })), + } +} #[tokio::test] #[test_log::test] @@ -23,22 +32,20 @@ pub async fn test_bootstrap_token_auth() { let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: Some(token), }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); assert_eq!( result, @@ -68,35 +75,23 @@ pub async fn test_bootstrap_invalid_token_auth() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: Some("invalid_token".to_string()), }, - }) + ))) .await; match result { - Err(kameo::error::SendError::HandlerError(status)) => { - assert_eq!(status.code(), tonic::Code::InvalidArgument); - insta::assert_debug_snapshot!(status, @r#" - Status { - code: InvalidArgument, - message: "Invalid bootstrap token", - source: None, - } - "#); - } - Err(other) => { - panic!("Expected SendError::HandlerError, got {other:?}"); + Err(err) => { + assert_eq!(err, UserAgentError::InvalidBootstrapToken); } Ok(_) => { panic!("Expected error due to invalid bootstrap token, but got success"); @@ -110,9 +105,7 @@ pub async fn test_challenge_auth() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); @@ -126,15 +119,15 @@ pub async fn test_challenge_auth() { .unwrap(); } - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: None, }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); let UserAgentResponse { payload: @@ -151,14 +144,14 @@ pub async fn test_challenge_auth() { let signature = new_key.sign(&formatted_challenge); let serialized_signature = signature.to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeSolution { - solution: auth::AuthChallengeSolution { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeSolution( + auth::AuthChallengeSolution { signature: serialized_signature, }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); assert_eq!( result, diff --git a/server/crates/arbiter-server/tests/user_agent/unseal.rs b/server/crates/arbiter-server/tests/user_agent/unseal.rs index 9a7c85f..2cb46f6 100644 --- a/server/crates/arbiter-server/tests/user_agent/unseal.rs +++ b/server/crates/arbiter-server/tests/user_agent/unseal.rs @@ -1,27 +1,52 @@ use arbiter_proto::proto::{ - UnsealEncryptedKey, UnsealResult, UnsealStart, auth::AuthChallengeRequest, + UnsealEncryptedKey, UnsealResult, UnsealStart, UserAgentRequest, UserAgentResponse, + auth::{AuthChallengeRequest, ClientMessage, client_message::Payload as ClientAuthPayload}, + user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }; +use arbiter_proto::transport::DummyTransport; use arbiter_server::{ actors::{ GlobalActors, bootstrap::GetToken, keyholder::{Bootstrap, Seal}, - user_agent::{ - HandleAuthChallengeRequest, HandleUnsealEncryptedKey, HandleUnsealRequest, - UserAgentActor, - }, + user_agent::{UserAgentActor, UserAgentError}, }, db, }; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; -use kameo::actor::{ActorRef, Spawn}; use memsafe::MemSafe; use x25519_dalek::{EphemeralSecret, PublicKey}; +type TestUserAgent = + UserAgentActor>>; + +fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(payload), + })), + } +} + +fn unseal_start_request(req: UnsealStart) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::UnsealStart(req)), + } +} + +fn unseal_key_request(req: UnsealEncryptedKey) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::UnsealEncryptedKey(req)), + } +} + async fn setup_authenticated_user_agent( seal_key: &[u8], -) -> (arbiter_server::db::DatabasePool, ActorRef) { +) -> ( + arbiter_server::db::DatabasePool, + TestUserAgent, +) { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); @@ -34,38 +59,34 @@ async fn setup_authenticated_user_agent( .unwrap(); actors.key_holder.ask(Seal).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors.clone(), tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors.clone()); let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap(); let auth_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); - user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: auth_key.verifying_key().to_bytes().to_vec(), bootstrap_token: Some(token), }, - }) + ))) .await .unwrap(); - (db, user_agent_ref) + (db, user_agent) } async fn client_dh_encrypt( - user_agent_ref: &ActorRef, + user_agent: &mut TestUserAgent, key_to_send: &[u8], ) -> UnsealEncryptedKey { let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - let response = user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + let response = user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await .unwrap(); @@ -95,12 +116,12 @@ async fn client_dh_encrypt( #[test_log::test] pub async fn test_unseal_success() { let seal_key = b"test-seal-key"; - let (_db, user_agent_ref) = setup_authenticated_user_agent(seal_key).await; + let (_db, mut user_agent) = setup_authenticated_user_agent(seal_key).await; - let encrypted_key = client_dh_encrypt(&user_agent_ref, seal_key).await; + let encrypted_key = client_dh_encrypt(&mut user_agent, seal_key).await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -113,12 +134,12 @@ pub async fn test_unseal_success() { #[tokio::test] #[test_log::test] pub async fn test_unseal_wrong_seal_key() { - let (_db, user_agent_ref) = setup_authenticated_user_agent(b"correct-key").await; + let (_db, mut user_agent) = setup_authenticated_user_agent(b"correct-key").await; - let encrypted_key = client_dh_encrypt(&user_agent_ref, b"wrong-key").await; + let encrypted_key = client_dh_encrypt(&mut user_agent, b"wrong-key").await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -131,28 +152,24 @@ pub async fn test_unseal_wrong_seal_key() { #[tokio::test] #[test_log::test] pub async fn test_unseal_corrupted_ciphertext() { - let (_db, user_agent_ref) = setup_authenticated_user_agent(b"test-key").await; + let (_db, mut user_agent) = setup_authenticated_user_agent(b"test-key").await; let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await .unwrap(); - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { - req: UnsealEncryptedKey { - nonce: vec![0u8; 24], - ciphertext: vec![0u8; 32], - associated_data: vec![], - }, - }) + let response = user_agent + .process_transport_inbound(unseal_key_request(UnsealEncryptedKey { + nonce: vec![0u8; 24], + ciphertext: vec![0u8; 32], + associated_data: vec![], + })) .await .unwrap(); @@ -168,24 +185,20 @@ pub async fn test_unseal_start_without_auth_fails() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - let result = user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + let result = user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await; match result { - Err(kameo::error::SendError::HandlerError(status)) => { - assert_eq!(status.code(), tonic::Code::Internal); + Err(err) => { + assert_eq!(err, UserAgentError::StateTransitionFailed); } other => panic!("Expected state machine error, got {other:?}"), } @@ -195,13 +208,13 @@ pub async fn test_unseal_start_without_auth_fails() { #[test_log::test] pub async fn test_unseal_retry_after_invalid_key() { let seal_key = b"real-seal-key"; - let (_db, user_agent_ref) = setup_authenticated_user_agent(seal_key).await; + let (_db, mut user_agent) = setup_authenticated_user_agent(seal_key).await; { - let encrypted_key = client_dh_encrypt(&user_agent_ref, b"wrong-key").await; + let encrypted_key = client_dh_encrypt(&mut user_agent, b"wrong-key").await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -212,10 +225,10 @@ pub async fn test_unseal_retry_after_invalid_key() { } { - let encrypted_key = client_dh_encrypt(&user_agent_ref, seal_key).await; + let encrypted_key = client_dh_encrypt(&mut user_agent, seal_key).await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); -- 2.49.1 From 3401205cbdede35cbc5001bbc20afce5a93519f2 Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 26 Feb 2026 19:29:45 +0100 Subject: [PATCH 2/5] refactor(transport): simplify converters --- server/crates/arbiter-proto/src/transport.rs | 288 ++++++++++--------- server/crates/arbiter-server/src/lib.rs | 19 +- 2 files changed, 160 insertions(+), 147 deletions(-) diff --git a/server/crates/arbiter-proto/src/transport.rs b/server/crates/arbiter-proto/src/transport.rs index c360162..9ab433b 100644 --- a/server/crates/arbiter-proto/src/transport.rs +++ b/server/crates/arbiter-proto/src/transport.rs @@ -3,9 +3,9 @@ //! This module separates three concerns: //! //! - protocol/session logic wants a small duplex interface ([`Bi`]) -//! - transport adapters need to push concrete stream items to an underlying IO layer -//! - server/client boundaries may need to translate domain outbounds into transport -//! framing (for example, a tonic stream item) +//! - transport adapters push concrete stream items to an underlying IO layer +//! - transport boundaries translate between protocol-facing and transport-facing +//! item types via direction-specific converters //! //! [`Bi`] is intentionally minimal and transport-agnostic: //! - [`Bi::recv`] yields inbound protocol messages @@ -25,60 +25,57 @@ //! inbound-related converter parameters are declared before outbound-related //! converter parameters. //! -//! [`ProtocolConverter`] is the boundary object that converts a protocol/domain -//! outbound item into the concrete outbound item expected by a transport sender. -//! The conversion is infallible, so domain-level recoverable failures should be -//! represented inside the domain outbound type itself (for example, -//! `Result`). +//! [`RecvConverter`] and [`SendConverter`] are infallible conversion traits used +//! by adapters to map between protocol-facing and transport-facing item types. +//! The traits themselves are not result-aware; adapters decide how transport +//! errors are handled before (or instead of) conversion. //! -//! [`GrpcAdapter`] combines: +//! [`grpc::GrpcAdapter`] combines: //! - a tonic inbound stream //! - a Tokio sender for outbound transport items -//! - a [`ProtocolConverter`] for the receive path -//! - a [`ProtocolConverter`] for the send path +//! - a [`RecvConverter`] for the receive path +//! - a [`SendConverter`] for the send path //! //! [`DummyTransport`] is a no-op implementation useful for tests and local actor //! execution where no real network stream exists. //! //! # Component Interaction //! -//! The typical layering looks like this: -//! //! ```text //! inbound (network -> protocol) //! ============================ //! -//! tonic::Streaming -> GrpcAdapter::recv() -> Bi::recv() -> protocol/session actor -//! | -//! +--> recv ProtocolConverter::convert(transport) -//! +//! tonic::Streaming +//! -> grpc::GrpcAdapter::recv() +//! | +//! +--> on `Ok(item)`: RecvConverter::convert(RecvTransport) -> Inbound +//! +--> on `Err(status)`: log error and close stream (`None`) +//! -> Bi::recv() +//! -> protocol/session actor +//! //! outbound (protocol -> network) //! ============================== //! -//! protocol/session actor -> Bi::send(domain outbound item, e.g. Result) -//! -> GrpcAdapter::send() -//! | -//! +--> send ProtocolConverter::convert(domain) -//! -> Tokio mpsc::Sender -> tonic response stream +//! protocol/session actor +//! -> Bi::send(Outbound) +//! -> grpc::GrpcAdapter::send() +//! | +//! +--> SendConverter::convert(Outbound) -> SendTransport +//! -> Tokio mpsc::Sender +//! -> tonic response stream //! ``` //! //! # Design Notes //! -//! - `recv()` collapses adapter-specific receive failures into `None`, which -//! lets protocol code treat stream termination and transport receive failure as -//! "no more inbound items" when no finer distinction is required. -//! - `send()` returns [`Error`] only for transport delivery failures (for example, -//! when the outbound channel is closed). -//! - Conversion policy lives outside protocol/session logic and can be defined at -//! the transport boundary (such as a server endpoint module). When domain and -//! transport types are identical, [`IdentityConverter`] can be used. +//! - `send()` returns [`Error`] only for transport delivery failures (for +//! example, when the outbound channel is closed). +//! - [`grpc::GrpcAdapter`] logs tonic receive errors and treats them as stream +//! closure (`None`). +//! - When protocol-facing and transport-facing types are identical, use +//! [`IdentityRecvConverter`] / [`IdentitySendConverter`]. use std::marker::PhantomData; -use futures::StreamExt; -use tokio::sync::mpsc; -use tonic::Streaming; - /// Errors returned by transport adapters implementing [`Bi`]. pub enum Error { /// The outbound side of the transport is no longer accepting messages. @@ -90,62 +87,37 @@ pub enum Error { /// `Bi` models a duplex channel with: /// - inbound items of type `Inbound` read via [`Bi::recv`] /// - outbound items of type `Outbound` written via [`Bi::send`] -/// -/// The trait intentionally exposes only the operations the protocol layer needs, -/// allowing it to work with gRPC streams and other transport implementations. -/// -/// # Stream termination and errors -/// -/// [`Bi::recv`] returns: -/// - `Some(item)` when a new inbound message is available -/// - `None` when the inbound stream ends or the underlying transport reports an error -/// -/// Implementations may collapse transport-specific receive errors into `None` -/// when the protocol does not need to distinguish them from normal stream -/// termination. pub trait Bi: Send + Sync + 'static { - /// Sends one outbound item to the peer. fn send( &mut self, item: Outbound, ) -> impl std::future::Future> + Send; - /// Receives the next inbound item. - /// - /// Returns `None` when the inbound stream is finished or can no longer - /// produce items. fn recv(&mut self) -> impl std::future::Future> + Send; } -/// Converts protocol/domain outbound items into transport-layer outbound items. -/// -/// This trait is used by transport adapters that need to emit a concrete stream -/// item type (for example, tonic server streams) while protocol code prefers to -/// work with domain-oriented outbound values. -/// -/// `convert` is infallible by design. Any recoverable protocol failure should be -/// represented in [`Self::Domain`] and mapped into the transport item in the -/// converter implementation. -pub trait ProtocolConverter: Send + Sync + 'static { - /// Outbound item produced by protocol/domain code. - type Domain; +/// Converts transport-facing inbound items into protocol-facing inbound items. +pub trait RecvConverter: Send + Sync + 'static { + type Input; + type Output; - /// Outbound item required by the transport sender. - type Transport; - - /// Maps a protocol/domain outbound item into the transport sender item. - fn convert(&self, item: Self::Domain) -> Self::Transport; + fn convert(&self, item: Self::Input) -> Self::Output; } -/// A [`ProtocolConverter`] that forwards values unchanged. -/// -/// Useful when the protocol-facing and transport-facing item types are -/// identical, but a converter is still required by an adapter API. -pub struct IdentityConverter { +/// Converts protocol/domain outbound items into transport-facing outbound items. +pub trait SendConverter: Send + Sync + 'static { + type Input; + type Output; + + fn convert(&self, item: Self::Input) -> Self::Output; +} + +/// A [`RecvConverter`] that forwards values unchanged. +pub struct IdentityRecvConverter { _marker: PhantomData, } -impl IdentityConverter { +impl IdentityRecvConverter { pub fn new() -> Self { Self { _marker: PhantomData, @@ -153,93 +125,133 @@ impl IdentityConverter { } } -impl Default for IdentityConverter { +impl Default for IdentityRecvConverter { fn default() -> Self { Self::new() } } -impl ProtocolConverter for IdentityConverter +impl RecvConverter for IdentityRecvConverter where T: Send + Sync + 'static, { - type Domain = T; - type Transport = T; + type Input = T; + type Output = T; - fn convert(&self, item: Self::Domain) -> Self::Transport { + fn convert(&self, item: Self::Input) -> Self::Output { item } } -/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream. -/// -/// The adapter owns converter instances for both directions: -/// - receive converter: transport inbound -> protocol inbound -/// - send converter: protocol outbound -> transport outbound -/// -/// This keeps protocol actors decoupled from transport framing conventions in -/// both directions. -pub struct GrpcAdapter { - sender: mpsc::Sender, - receiver: Streaming, - inbound_converter: InboundConverter, - outbound_converter: OutboundConverter, +/// A [`SendConverter`] that forwards values unchanged. +pub struct IdentitySendConverter { + _marker: PhantomData, } -impl GrpcAdapter -where - InboundConverter: ProtocolConverter, - OutboundConverter: ProtocolConverter, -{ - /// Creates a new gRPC-backed [`Bi`] adapter. - /// - /// The provided converters define: - /// - the protocol outbound item and corresponding transport outbound item - /// - the transport inbound item and corresponding protocol inbound item - pub fn new( - sender: mpsc::Sender, - receiver: Streaming, - inbound_converter: InboundConverter, - outbound_converter: OutboundConverter, - ) -> Self { +impl IdentitySendConverter { + pub fn new() -> Self { Self { - sender, - receiver, - inbound_converter, - outbound_converter, + _marker: PhantomData, } } } -impl - Bi - for GrpcAdapter +impl Default for IdentitySendConverter { + fn default() -> Self { + Self::new() + } +} + +impl SendConverter for IdentitySendConverter where - InboundConverter: ProtocolConverter, - OutboundConverter: ProtocolConverter, - OutboundConverter::Domain: Send + 'static, - OutboundConverter::Transport: Send + 'static, - InboundConverter::Transport: Send + 'static, - InboundConverter::Domain: Send + 'static, + T: Send + Sync + 'static, { - #[tracing::instrument(level = "trace", skip(self, item))] - async fn send(&mut self, item: OutboundConverter::Domain) -> Result<(), Error> { - let outbound: OutboundConverter::Transport = self.outbound_converter.convert(item); - self.sender - .send(outbound) - .await - .map_err(|_| Error::ChannelClosed) + type Input = T; + type Output = T; + + fn convert(&self, item: Self::Input) -> Self::Output { + item + } +} + +/// gRPC-specific transport adapters and helpers. +pub mod grpc { + use futures::StreamExt; + use tokio::sync::mpsc; + use tonic::Streaming; + + use super::{Bi, Error, RecvConverter, SendConverter}; + + /// [`Bi`] adapter backed by a tonic gRPC bidirectional stream. + /// + + /// Tonic receive errors are logged and treated as stream closure (`None`). + /// The receive converter is only invoked for successful inbound transport + /// items. + pub struct GrpcAdapter + where + InboundConverter: RecvConverter, + OutboundConverter: SendConverter, + { + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, } - #[tracing::instrument(level = "trace", skip(self))] - async fn recv(&mut self) -> Option { - self.receiver - .next() - .await - .transpose() - .ok() - .flatten() - .map(|item| self.inbound_converter.convert(item)) + + impl + GrpcAdapter + where + InboundConverter: RecvConverter, + OutboundConverter: SendConverter, + { + pub fn new( + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, + ) -> Self { + Self { + sender, + receiver, + inbound_converter, + outbound_converter, + } + } + } + + + impl Bi + for GrpcAdapter + where + InboundTransport: Send + 'static, + Inbound: Send + 'static, + InboundConverter: RecvConverter, + OutboundConverter: SendConverter, + OutboundConverter::Input: Send + 'static, + OutboundConverter::Output: Send + 'static, + { + #[tracing::instrument(level = "trace", skip(self, item))] + async fn send(&mut self, item: OutboundConverter::Input) -> Result<(), Error> { + let outbound = self.outbound_converter.convert(item); + self.sender + .send(outbound) + .await + .map_err(|_| Error::ChannelClosed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn recv(&mut self) -> Option { + match self.receiver.next().await { + Some(Ok(item)) => Some(self.inbound_converter.convert(item)), + Some(Err(error)) => { + tracing::error!(error = ?error, "grpc transport recv failed; closing stream"); + None + } + None => None, + } + } } } diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 77e03f2..7fde954 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -1,7 +1,7 @@ #![forbid(unsafe_code)] use arbiter_proto::{ proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse}, - transport::{GrpcAdapter, IdentityConverter, ProtocolConverter}, + transport::{IdentityRecvConverter, SendConverter, grpc}, }; use async_trait::async_trait; use kameo::actor::Spawn; @@ -28,13 +28,14 @@ const DEFAULT_CHANNEL_SIZE: usize = 1000; /// /// The conversion is defined at the server boundary so the actor module remains /// focused on domain semantics and does not depend on tonic status encoding. -struct UserAgentGrpcConverter; +struct UserAgentGrpcSender; -impl ProtocolConverter for UserAgentGrpcConverter { - type Domain = Result; - type Transport = Result; - fn convert(&self, item: Self::Domain) -> Self::Transport { +impl SendConverter for UserAgentGrpcSender { + type Input = Result; + type Output = Result; + + fn convert(&self, item: Self::Input) -> Self::Output { match item { Ok(message) => Ok(message), Err(err) => Err(user_agent_error_status(err)), @@ -116,11 +117,11 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server { let req_stream = request.into_inner(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - let transport = GrpcAdapter::new( + let transport = grpc::GrpcAdapter::new( tx, req_stream, - IdentityConverter::::new(), - UserAgentGrpcConverter, + IdentityRecvConverter::::new(), + UserAgentGrpcSender, ); UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), transport)); -- 2.49.1 From 61c65ddbcb947d30fef4c83730458989d26a4b3b Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 26 Feb 2026 15:44:48 +0100 Subject: [PATCH 3/5] feat(useragent): initial connection impl --- server/Cargo.lock | 4 + server/crates/arbiter-server/src/lib.rs | 8 +- server/crates/arbiter-useragent/Cargo.toml | 7 +- server/crates/arbiter-useragent/src/grpc.rs | 90 +++++++++ server/crates/arbiter-useragent/src/lib.rs | 211 +++++++++++++++++++- 5 files changed, 305 insertions(+), 15 deletions(-) create mode 100644 server/crates/arbiter-useragent/src/grpc.rs diff --git a/server/Cargo.lock b/server/Cargo.lock index b7e6b44..5e8baaf 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -121,9 +121,13 @@ version = "0.1.0" dependencies = [ "arbiter-proto", "ed25519-dalek", + "http", "kameo", + "rustls-webpki", "smlang", + "thiserror", "tokio", + "tokio-stream", "tonic", "tracing", "x25519-dalek", diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 7fde954..fe62c48 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -22,18 +22,16 @@ pub mod db; const DEFAULT_CHANNEL_SIZE: usize = 1000; - /// Converts User Agent domain outbounds into the tonic stream item emitted by -/// the server. +/// the server.ยง /// /// The conversion is defined at the server boundary so the actor module remains /// focused on domain semantics and does not depend on tonic status encoding. struct UserAgentGrpcSender; - impl SendConverter for UserAgentGrpcSender { - type Input = Result; - type Output = Result; + type Input = Result; + type Output = Result; fn convert(&self, item: Self::Input) -> Self::Output { match item { diff --git a/server/crates/arbiter-useragent/Cargo.toml b/server/crates/arbiter-useragent/Cargo.toml index 16eb12d..de46f67 100644 --- a/server/crates/arbiter-useragent/Cargo.toml +++ b/server/crates/arbiter-useragent/Cargo.toml @@ -9,7 +9,12 @@ arbiter-proto.path = "../arbiter-proto" kameo.workspace = true tokio = {workspace = true, features = ["net"]} tonic.workspace = true +tonic.features = ["tls-aws-lc"] tracing.workspace = true ed25519-dalek.workspace = true smlang.workspace = true -x25519-dalek.workspace = true \ No newline at end of file +x25519-dalek.workspace = true +thiserror.workspace = true +tokio-stream.workspace = true +http = "1.4.0" +rustls-webpki = { version = "0.103.9", features = ["aws-lc-rs"] } diff --git a/server/crates/arbiter-useragent/src/grpc.rs b/server/crates/arbiter-useragent/src/grpc.rs new file mode 100644 index 0000000..ef523a9 --- /dev/null +++ b/server/crates/arbiter-useragent/src/grpc.rs @@ -0,0 +1,90 @@ +use arbiter_proto::{ + proto::{ + UserAgentRequest, UserAgentResponse, arbiter_service_client::ArbiterServiceClient, + }, + transport::{RecvConverter, IdentitySendConverter, grpc}, + url::ArbiterUrl, +}; +use ed25519_dalek::SigningKey; +use kameo::actor::{ActorRef, Spawn}; + +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use tonic::transport::ClientTlsConfig; + + +#[derive(Debug, thiserror::Error)] +pub enum InitError { + #[error("Could establish connection")] + Connection(#[from] tonic::transport::Error), + + #[error("Invalid server URI")] + InvalidUri(#[from] http::uri::InvalidUri), + + #[error("Invalid CA certificate")] + InvalidCaCert(#[from] webpki::Error), + + #[error("gRPC error")] + Grpc(#[from] tonic::Status), +} + +pub struct InboundConverter; +impl RecvConverter for InboundConverter { + type Input = UserAgentResponse; + type Output = Result; + + fn convert(&self, item: Self::Input) -> Self::Output { + Ok(item) + } +} + +use crate::InboundError; + +use super::UserAgentActor; + +pub type UserAgentGrpc = ActorRef< + UserAgentActor< + grpc::GrpcAdapter< + UserAgentResponse, + Result, + InboundConverter, + IdentitySendConverter, + >, + >, +>; +pub async fn connect_grpc( + url: ArbiterUrl, + key: SigningKey, +) -> Result { + let bootstrap_token = url.bootstrap_token.clone(); + let anchor = webpki::anchor_from_trusted_cert(&url.ca_cert)?.to_owned(); + let tls = ClientTlsConfig::new().trust_anchor(anchor); + + // TODO: if `host` is localhost, we need to verify server's process authenticity + let channel = tonic::transport::Channel::from_shared(format!("{}:{}", url.host, url.port))? + .tls_config(tls)? + .connect() + .await?; + + let mut client = ArbiterServiceClient::new(channel); + let (tx, rx) = mpsc::channel(16); + let bistream = client.user_agent(ReceiverStream::new(rx)).await?; + let bistream = bistream.into_inner(); + + let adapter = grpc::GrpcAdapter::new( + tx, + bistream, + InboundConverter, + IdentitySendConverter::new(), + ); + + let actor = UserAgentActor::spawn(UserAgentActor { + key, + bootstrap_token, + state: super::UserAgentStateMachine::new(super::DummyContext), + transport: adapter, + }); + + Ok(actor) +} diff --git a/server/crates/arbiter-useragent/src/lib.rs b/server/crates/arbiter-useragent/src/lib.rs index c4da0d0..fbdec08 100644 --- a/server/crates/arbiter-useragent/src/lib.rs +++ b/server/crates/arbiter-useragent/src/lib.rs @@ -1,13 +1,206 @@ -use ed25519_dalek::SigningKey; -use kameo::Actor; -use tonic::transport::CertificateDer; +use arbiter_proto::{ + format_challenge, + proto::{ + UserAgentRequest, UserAgentResponse, + auth::{ + self, AuthChallengeRequest, AuthOk, ClientMessage as AuthClientMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, + }, + transport::Bi, +}; +use ed25519_dalek::{Signer, SigningKey}; +use kameo::{ + Actor, + actor::{ActorRef, Spawn}, + prelude::Message, +}; +use smlang::statemachine; +use tokio::select; +use tracing::{error, info}; -struct Storage { - pub identity: SigningKey, - pub server_ca_cert: CertificateDer<'static>, +#[derive(Debug, thiserror::Error)] +pub enum InboundError { + #[error("Invalid user agent response")] + InvalidResponse, + #[error("Expected response payload")] + MissingResponsePayload, + #[error("Unexpected response payload")] + UnexpectedResponsePayload, + #[error("Invalid state for auth challenge")] + InvalidStateForAuthChallenge, + #[error("Invalid state for auth ok")] + InvalidStateForAuthOk, + #[error("State machine error")] + StateTransitionFailed, + #[error("Transport send failed")] + TransportSendFailed, } -#[derive(Actor)] -pub struct UserAgent { +statemachine! { + name: UserAgent, + custom_error: false, + transitions: { + *Init + SentAuthChallengeRequest = WaitingForServerAuth, + WaitingForServerAuth + ReceivedAuthChallenge = WaitingForAuthOk, + WaitingForServerAuth + ReceivedAuthOk = Authenticated, + WaitingForAuthOk + ReceivedAuthOk = Authenticated, + } +} -} \ No newline at end of file +pub struct DummyContext; +impl UserAgentStateMachineContext for DummyContext {} + +pub struct UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + key: SigningKey, + bootstrap_token: Option, + state: UserAgentStateMachine, + transport: Transport, +} + +impl UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + fn transition(&mut self, event: UserAgentEvents) -> Result<(), InboundError> { + self.state.process_event(event).map_err(|e| { + error!(?e, "useragent state transition failed"); + InboundError::StateTransitionFailed + })?; + Ok(()) + } + + fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(AuthClientMessage { + payload: Some(payload), + })), + } + } + + async fn send_auth_challenge_request(&mut self) -> Result<(), InboundError> { + let req = AuthChallengeRequest { + pubkey: self.key.verifying_key().to_bytes().to_vec(), + bootstrap_token: self.bootstrap_token.take(), + }; + + self.transport + .send(Self::auth_request(ClientAuthPayload::AuthChallengeRequest(req))) + .await + .map_err(|_| InboundError::TransportSendFailed)?; + self.transition(UserAgentEvents::SentAuthChallengeRequest)?; + info!(actor = "useragent", "auth.request.sent"); + Ok(()) + } + + async fn handle_auth_challenge( + &mut self, + challenge: auth::AuthChallenge, + ) -> Result<(), InboundError> { + if !matches!(self.state.state(), UserAgentStates::WaitingForServerAuth) { + return Err(InboundError::InvalidStateForAuthChallenge); + } + + self.transition(UserAgentEvents::ReceivedAuthChallenge)?; + + let formatted = format_challenge(&challenge); + let signature = self.key.sign(&formatted); + let solution = auth::AuthChallengeSolution { + signature: signature.to_bytes().to_vec(), + }; + + self.transport + .send(Self::auth_request(ClientAuthPayload::AuthChallengeSolution( + solution, + ))) + .await + .map_err(|_| InboundError::TransportSendFailed)?; + + info!(actor = "useragent", "auth.solution.sent"); + Ok(()) + } + + fn handle_auth_ok(&mut self, _ok: AuthOk) -> Result<(), InboundError> { + match self.state.state() { + UserAgentStates::WaitingForServerAuth | UserAgentStates::WaitingForAuthOk => { + self.transition(UserAgentEvents::ReceivedAuthOk)?; + info!(actor = "useragent", "auth.ok"); + Ok(()) + } + _ => Err(InboundError::InvalidStateForAuthOk), + } + } + + pub async fn process_inbound_transport( + &mut self, + inbound: Result, + ) -> Result<(), InboundError> { + let response = inbound?; + let payload = response + .payload + .ok_or(InboundError::MissingResponsePayload)?; + + match payload { + UserAgentResponsePayload::AuthMessage(AuthServerMessage { + payload: Some(ServerAuthPayload::AuthChallenge(challenge)), + }) => self.handle_auth_challenge(challenge).await, + UserAgentResponsePayload::AuthMessage(AuthServerMessage { + payload: Some(ServerAuthPayload::AuthOk(ok)), + }) => self.handle_auth_ok(ok), + _ => Err(InboundError::UnexpectedResponsePayload), + } + } +} + +impl Actor for UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + type Args = Self; + + type Error = (); + + async fn on_start(mut args: Self::Args, _actor_ref: ActorRef) -> Result { + if let Err(err) = args.send_auth_challenge_request().await { + error!(?err, actor = "useragent", "auth.start.failed"); + return Err(()); + } + Ok(args) + } + + async fn next( + &mut self, + _actor_ref: kameo::prelude::WeakActorRef, + mailbox_rx: &mut kameo::prelude::MailboxReceiver, + ) -> Option> { + loop { + select! { + signal = mailbox_rx.recv() => { + return signal; + } + inbound = self.transport.recv() => { + match inbound { + Some(inbound) => { + if let Err(err) = self.process_inbound_transport(inbound).await { + error!(?err, actor = "useragent", "transport.inbound.failed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + None => { + info!(actor = "useragent", "transport.closed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + } + } + } + } +} + +mod grpc; -- 2.49.1 From 3478204b9f8fe537af7de2afa8015f886a5f4087 Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 26 Feb 2026 22:23:52 +0100 Subject: [PATCH 4/5] tests(user-agent): basic auth tests similar to `server` --- server/crates/arbiter-proto/src/transport.rs | 18 +-- server/crates/arbiter-useragent/src/grpc.rs | 33 +--- server/crates/arbiter-useragent/src/lib.rs | 89 ++++++----- server/crates/arbiter-useragent/tests/auth.rs | 151 ++++++++++++++++++ 4 files changed, 212 insertions(+), 79 deletions(-) create mode 100644 server/crates/arbiter-useragent/tests/auth.rs diff --git a/server/crates/arbiter-proto/src/transport.rs b/server/crates/arbiter-proto/src/transport.rs index 9ab433b..48bb9a3 100644 --- a/server/crates/arbiter-proto/src/transport.rs +++ b/server/crates/arbiter-proto/src/transport.rs @@ -188,20 +188,20 @@ pub mod grpc { /// Tonic receive errors are logged and treated as stream closure (`None`). /// The receive converter is only invoked for successful inbound transport /// items. - pub struct GrpcAdapter + pub struct GrpcAdapter where - InboundConverter: RecvConverter, + InboundConverter: RecvConverter, OutboundConverter: SendConverter, { sender: mpsc::Sender, - receiver: Streaming, + receiver: Streaming, inbound_converter: InboundConverter, outbound_converter: OutboundConverter, } impl - GrpcAdapter + GrpcAdapter where InboundConverter: RecvConverter, OutboundConverter: SendConverter, @@ -222,12 +222,10 @@ pub mod grpc { } - impl Bi - for GrpcAdapter + impl< InboundConverter, OutboundConverter> Bi + for GrpcAdapter where - InboundTransport: Send + 'static, - Inbound: Send + 'static, - InboundConverter: RecvConverter, + InboundConverter: RecvConverter, OutboundConverter: SendConverter, OutboundConverter::Input: Send + 'static, OutboundConverter::Output: Send + 'static, @@ -242,7 +240,7 @@ pub mod grpc { } #[tracing::instrument(level = "trace", skip(self))] - async fn recv(&mut self) -> Option { + async fn recv(&mut self) -> Option { match self.receiver.next().await { Some(Ok(item)) => Some(self.inbound_converter.convert(item)), Some(Err(error)) => { diff --git a/server/crates/arbiter-useragent/src/grpc.rs b/server/crates/arbiter-useragent/src/grpc.rs index ef523a9..9a00ad7 100644 --- a/server/crates/arbiter-useragent/src/grpc.rs +++ b/server/crates/arbiter-useragent/src/grpc.rs @@ -2,7 +2,7 @@ use arbiter_proto::{ proto::{ UserAgentRequest, UserAgentResponse, arbiter_service_client::ArbiterServiceClient, }, - transport::{RecvConverter, IdentitySendConverter, grpc}, + transport::{IdentityRecvConverter, IdentitySendConverter, RecvConverter, grpc}, url::ArbiterUrl, }; use ed25519_dalek::SigningKey; @@ -15,7 +15,7 @@ use tonic::transport::ClientTlsConfig; #[derive(Debug, thiserror::Error)] -pub enum InitError { +pub enum ConnectError { #[error("Could establish connection")] Connection(#[from] tonic::transport::Error), @@ -29,26 +29,12 @@ pub enum InitError { Grpc(#[from] tonic::Status), } -pub struct InboundConverter; -impl RecvConverter for InboundConverter { - type Input = UserAgentResponse; - type Output = Result; - - fn convert(&self, item: Self::Input) -> Self::Output { - Ok(item) - } -} - -use crate::InboundError; - -use super::UserAgentActor; + use super::UserAgentActor; pub type UserAgentGrpc = ActorRef< UserAgentActor< grpc::GrpcAdapter< - UserAgentResponse, - Result, - InboundConverter, + IdentityRecvConverter, IdentitySendConverter, >, >, @@ -56,7 +42,7 @@ pub type UserAgentGrpc = ActorRef< pub async fn connect_grpc( url: ArbiterUrl, key: SigningKey, -) -> Result { +) -> Result { let bootstrap_token = url.bootstrap_token.clone(); let anchor = webpki::anchor_from_trusted_cert(&url.ca_cert)?.to_owned(); let tls = ClientTlsConfig::new().trust_anchor(anchor); @@ -75,16 +61,11 @@ pub async fn connect_grpc( let adapter = grpc::GrpcAdapter::new( tx, bistream, - InboundConverter, + IdentityRecvConverter::new(), IdentitySendConverter::new(), ); - let actor = UserAgentActor::spawn(UserAgentActor { - key, - bootstrap_token, - state: super::UserAgentStateMachine::new(super::DummyContext), - transport: adapter, - }); + let actor = UserAgentActor::spawn(UserAgentActor::new(key, bootstrap_token, adapter)); Ok(actor) } diff --git a/server/crates/arbiter-useragent/src/lib.rs b/server/crates/arbiter-useragent/src/lib.rs index fbdec08..5972d7e 100644 --- a/server/crates/arbiter-useragent/src/lib.rs +++ b/server/crates/arbiter-useragent/src/lib.rs @@ -13,15 +13,25 @@ use arbiter_proto::{ transport::Bi, }; use ed25519_dalek::{Signer, SigningKey}; -use kameo::{ - Actor, - actor::{ActorRef, Spawn}, - prelude::Message, -}; +use kameo::{Actor, actor::ActorRef}; use smlang::statemachine; use tokio::select; use tracing::{error, info}; +statemachine! { + name: UserAgent, + custom_error: false, + transitions: { + *Init + SentAuthChallengeRequest = WaitingForServerAuth, + WaitingForServerAuth + ReceivedAuthChallenge = WaitingForAuthOk, + WaitingForServerAuth + ReceivedAuthOk = Authenticated, + WaitingForAuthOk + ReceivedAuthOk = Authenticated, + } +} + +pub struct DummyContext; +impl UserAgentStateMachineContext for DummyContext {} + #[derive(Debug, thiserror::Error)] pub enum InboundError { #[error("Invalid user agent response")] @@ -40,23 +50,9 @@ pub enum InboundError { TransportSendFailed, } -statemachine! { - name: UserAgent, - custom_error: false, - transitions: { - *Init + SentAuthChallengeRequest = WaitingForServerAuth, - WaitingForServerAuth + ReceivedAuthChallenge = WaitingForAuthOk, - WaitingForServerAuth + ReceivedAuthOk = Authenticated, - WaitingForAuthOk + ReceivedAuthOk = Authenticated, - } -} - -pub struct DummyContext; -impl UserAgentStateMachineContext for DummyContext {} - pub struct UserAgentActor where - Transport: Bi, UserAgentRequest>, + Transport: Bi, { key: SigningKey, bootstrap_token: Option, @@ -66,8 +62,17 @@ where impl UserAgentActor where - Transport: Bi, UserAgentRequest>, + Transport: Bi, { + pub fn new(key: SigningKey, bootstrap_token: Option, transport: Transport) -> Self { + Self { + key, + bootstrap_token, + state: UserAgentStateMachine::new(DummyContext), + transport, + } + } + fn transition(&mut self, event: UserAgentEvents) -> Result<(), InboundError> { self.state.process_event(event).map_err(|e| { error!(?e, "useragent state transition failed"); @@ -90,11 +95,15 @@ where bootstrap_token: self.bootstrap_token.take(), }; + self.transition(UserAgentEvents::SentAuthChallengeRequest)?; + self.transport - .send(Self::auth_request(ClientAuthPayload::AuthChallengeRequest(req))) + .send(Self::auth_request(ClientAuthPayload::AuthChallengeRequest( + req, + ))) .await .map_err(|_| InboundError::TransportSendFailed)?; - self.transition(UserAgentEvents::SentAuthChallengeRequest)?; + info!(actor = "useragent", "auth.request.sent"); Ok(()) } @@ -103,10 +112,6 @@ where &mut self, challenge: auth::AuthChallenge, ) -> Result<(), InboundError> { - if !matches!(self.state.state(), UserAgentStates::WaitingForServerAuth) { - return Err(InboundError::InvalidStateForAuthChallenge); - } - self.transition(UserAgentEvents::ReceivedAuthChallenge)?; let formatted = format_challenge(&challenge); @@ -116,9 +121,9 @@ where }; self.transport - .send(Self::auth_request(ClientAuthPayload::AuthChallengeSolution( - solution, - ))) + .send(Self::auth_request( + ClientAuthPayload::AuthChallengeSolution(solution), + )) .await .map_err(|_| InboundError::TransportSendFailed)?; @@ -127,22 +132,16 @@ where } fn handle_auth_ok(&mut self, _ok: AuthOk) -> Result<(), InboundError> { - match self.state.state() { - UserAgentStates::WaitingForServerAuth | UserAgentStates::WaitingForAuthOk => { - self.transition(UserAgentEvents::ReceivedAuthOk)?; - info!(actor = "useragent", "auth.ok"); - Ok(()) - } - _ => Err(InboundError::InvalidStateForAuthOk), - } + self.transition(UserAgentEvents::ReceivedAuthOk)?; + info!(actor = "useragent", "auth.ok"); + Ok(()) } pub async fn process_inbound_transport( &mut self, - inbound: Result, + inbound: UserAgentResponse ) -> Result<(), InboundError> { - let response = inbound?; - let payload = response + let payload = inbound .payload .ok_or(InboundError::MissingResponsePayload)?; @@ -160,13 +159,16 @@ where impl Actor for UserAgentActor where - Transport: Bi, UserAgentRequest>, + Transport: Bi, { type Args = Self; type Error = (); - async fn on_start(mut args: Self::Args, _actor_ref: ActorRef) -> Result { + async fn on_start( + mut args: Self::Args, + _actor_ref: ActorRef, + ) -> Result { if let Err(err) = args.send_auth_challenge_request().await { error!(?err, actor = "useragent", "auth.start.failed"); return Err(()); @@ -204,3 +206,4 @@ where } mod grpc; +pub use grpc::{connect_grpc, ConnectError}; \ No newline at end of file diff --git a/server/crates/arbiter-useragent/tests/auth.rs b/server/crates/arbiter-useragent/tests/auth.rs new file mode 100644 index 0000000..a883f15 --- /dev/null +++ b/server/crates/arbiter-useragent/tests/auth.rs @@ -0,0 +1,151 @@ +use arbiter_proto::{ + format_challenge, + proto::{ + UserAgentRequest, UserAgentResponse, + auth::{ + AuthChallenge, AuthOk, ClientMessage as AuthClientMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, + }, + transport::Bi, +}; +use arbiter_useragent::{InboundError, UserAgentActor}; +use ed25519_dalek::SigningKey; +use kameo::actor::Spawn; +use tokio::sync::mpsc; +use tokio::time::{Duration, timeout}; + +struct TestTransport { + inbound_rx: mpsc::Receiver, + outbound_tx: mpsc::Sender, +} + +impl Bi for TestTransport { + async fn send(&mut self, item: UserAgentRequest) -> Result<(), arbiter_proto::transport::Error> { + self.outbound_tx + .send(item) + .await + .map_err(|_| arbiter_proto::transport::Error::ChannelClosed) + } + + async fn recv(&mut self) -> Option { + self.inbound_rx.recv().await + } +} + +fn make_transport() -> ( + TestTransport, + mpsc::Sender, + mpsc::Receiver, +) { + let (inbound_tx, inbound_rx) = mpsc::channel(8); + let (outbound_tx, outbound_rx) = mpsc::channel(8); + ( + TestTransport { + inbound_rx, + outbound_tx, + }, + inbound_tx, + outbound_rx, + ) +} + +fn test_key() -> SigningKey { + SigningKey::from_bytes(&[7u8; 32]) +} + +fn auth_response(payload: ServerAuthPayload) -> UserAgentResponse { + UserAgentResponse { + payload: Some(UserAgentResponsePayload::AuthMessage(AuthServerMessage { + payload: Some(payload), + })), + } +} + +#[tokio::test] +async fn sends_auth_request_on_start_with_bootstrap_token() { + let key = test_key(); + let pubkey = key.verifying_key().to_bytes().to_vec(); + let bootstrap_token = Some("bootstrap-123".to_string()); + let (transport, inbound_tx, mut outbound_rx) = make_transport(); + + let actor = UserAgentActor::spawn(UserAgentActor::new(key, bootstrap_token.clone(), transport)); + + let outbound = timeout(Duration::from_secs(1), outbound_rx.recv()) + .await + .expect("timed out waiting for auth request") + .expect("channel closed before auth request"); + + let UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(AuthClientMessage { + payload: Some(ClientAuthPayload::AuthChallengeRequest(req)), + })), + } = outbound + else { + panic!("expected auth challenge request"); + }; + + assert_eq!(req.pubkey, pubkey); + assert_eq!(req.bootstrap_token, bootstrap_token); + + drop(inbound_tx); + drop(actor); +} + +#[tokio::test] +async fn challenge_flow_sends_solution_from_transport_inbound() { + let key = test_key(); + let verify_key = key.verifying_key(); + let (transport, inbound_tx, mut outbound_rx) = make_transport(); + + let actor = UserAgentActor::spawn(UserAgentActor::new(key, None, transport)); + + let _initial_auth_request = timeout(Duration::from_secs(1), outbound_rx.recv()) + .await + .expect("timed out waiting for initial auth request") + .expect("missing initial auth request"); + + let challenge = AuthChallenge { + pubkey: verify_key.to_bytes().to_vec(), + nonce: 42, + }; + inbound_tx + .send(auth_response(ServerAuthPayload::AuthChallenge(challenge.clone()))) + .await + .unwrap(); + + let outbound = timeout(Duration::from_secs(1), outbound_rx.recv()) + .await + .expect("timed out waiting for challenge solution") + .expect("missing challenge solution"); + + let UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(AuthClientMessage { + payload: Some(ClientAuthPayload::AuthChallengeSolution(solution)), + })), + } = outbound + else { + panic!("expected auth challenge solution"); + }; + + let formatted = format_challenge(&challenge); + let sig: ed25519_dalek::Signature = solution + .signature + .as_slice() + .try_into() + .expect("signature bytes length"); + verify_key + .verify_strict(&formatted, &sig) + .expect("solution signature should verify"); + + inbound_tx + .send(auth_response(ServerAuthPayload::AuthOk(AuthOk {}))) + .await + .unwrap(); + + drop(inbound_tx); + drop(actor); +} -- 2.49.1 From 3cc63474a878cf4c3b50c67f9b6c841b52e698e9 Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 26 Feb 2026 22:41:36 +0100 Subject: [PATCH 5/5] chore(server): update Cargo.lock dependencies --- server/Cargo.lock | 190 +++++++++++++++++++++++----------------------- 1 file changed, 95 insertions(+), 95 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index 5e8baaf..e6b7973 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -47,9 +47,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.101" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" [[package]] name = "arbiter-client" @@ -170,7 +170,7 @@ checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", "synstructure", ] @@ -182,7 +182,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -193,7 +193,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -210,9 +210,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.15.4" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b7b6141e96a8c160799cc2d5adecd5cbbe5054cb8c7c4af53da0f83bb7ad256" +checksum = "d9a7b350e3bb1767102698302bc37256cbd48422809984b98d292c40e2579aa9" dependencies = [ "aws-lc-sys", "untrusted 0.7.1", @@ -348,18 +348,18 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96eb4cdd6cf1b31d671e9efe75c5d1ec614776856cefbe109ca373554a6d514f" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" dependencies = [ "hybrid-array", ] [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" [[package]] name = "bytes" @@ -422,9 +422,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.43" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", "js-sys", @@ -518,9 +518,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "211f05e03c7d03754740fd9e585de910a095d6b99f8bcfffdef8319fa02a8331" +checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710" dependencies = [ "hybrid-array", ] @@ -549,7 +549,7 @@ dependencies = [ "cfg-if", "cpufeatures 0.2.17", "curve25519-dalek-derive", - "digest 0.11.0", + "digest 0.11.1", "fiat-crypto 0.3.0", "rustc_version", "subtle", @@ -564,7 +564,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -588,7 +588,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -599,7 +599,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -638,9 +638,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.6" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" dependencies = [ "powerfmt", ] @@ -686,7 +686,7 @@ dependencies = [ "dsl_auto_type", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -706,7 +706,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe2444076b48641147115697648dc743c2c00b61adade0f01ce67133c7babe8c" dependencies = [ - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -722,12 +722,12 @@ dependencies = [ [[package]] name = "digest" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8bf3682cdec91817be507e4aa104314898b95b84d74f3d43882210101a545b6" +checksum = "285743a676ccb6b3e116bc14cc69319b957867930ae9c4822f8e0f54509d7243" dependencies = [ - "block-buffer 0.11.0", - "crypto-common 0.2.0", + "block-buffer 0.12.0", + "crypto-common 0.2.1", ] [[package]] @@ -738,7 +738,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -758,7 +758,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -953,7 +953,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1413,9 +1413,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.85" +version = "0.3.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c942ebf8e95485ca0d52d97da7c5a2c387d0e7f0ba4c35e93bfcaee045955b3" +checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6" dependencies = [ "once_cell", "wasm-bindgen", @@ -1445,7 +1445,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1478,9 +1478,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" @@ -1562,7 +1562,7 @@ checksum = "db5b29714e950dbb20d5e6f74f9dcec4edbcc1067bb7f8ed198c097b8c1a818b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1710,9 +1710,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "owo-colors" -version = "4.2.3" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c6901729fa79e91a0913333229e9ca5dc725089d1c363b2f4b4760709dc4a52" +checksum = "d211803b9b6b570f68772237e415a029d5a50c65d382910b879fb19d3271f94d" [[package]] name = "parking_lot" @@ -1792,7 +1792,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1852,7 +1852,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1924,7 +1924,7 @@ dependencies = [ "pulldown-cmark", "pulldown-cmark-to-cmark", "regex", - "syn 2.0.115", + "syn 2.0.117", "tempfile", ] @@ -1938,7 +1938,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -1952,9 +1952,9 @@ dependencies = [ [[package]] name = "pulldown-cmark" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +checksum = "83c41efbf8f90ac44de7f3a868f0867851d261b56291732d0cbf7cceaaeb55a6" dependencies = [ "bitflags", "memchr", @@ -2060,9 +2060,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" [[package]] name = "relative-path" @@ -2079,7 +2079,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2131,7 +2131,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.115", + "syn 2.0.117", "unicode-ident", ] @@ -2161,9 +2161,9 @@ dependencies = [ [[package]] name = "rustix" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ "bitflags", "errno", @@ -2174,9 +2174,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.36" +version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ "aws-lc-rs", "log", @@ -2271,7 +2271,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2304,7 +2304,7 @@ checksum = "7c5f3b1e2dc8aad28310d8410bd4d7e180eca65fca176c52ab00d364475d0024" dependencies = [ "cfg-if", "cpufeatures 0.2.17", - "digest 0.11.0", + "digest 0.11.1", ] [[package]] @@ -2441,7 +2441,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2484,9 +2484,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.115" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e614ed320ac28113fa64972c4262d5dbc89deacdfd00c34a3e4cea073243c12" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", @@ -2507,14 +2507,14 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] name = "tempfile" -version = "3.25.0" +version = "3.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" dependencies = [ "fastrand", "getrandom 0.4.1", @@ -2551,7 +2551,7 @@ checksum = "be35209fd0781c5401458ab66e4f98accf63553e8fae7425503e92fdd319783b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2581,7 +2581,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2660,7 +2660,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2734,18 +2734,18 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.8+spec-1.1.0" +version = "1.0.9+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0742ff5ff03ea7e67c8ae6c93cac239e0d9784833362da3f9a9c1da8dfefcbdc" +checksum = "702d4415e08923e7e1ef96cd5727c0dfed80b4d2fa25db9647fe5eb6f7c5a4c4" dependencies = [ "winnow", ] [[package]] name = "tonic" -version = "0.14.4" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f32a6f80051a4111560201420c7885d0082ba9efe2ab61875c587bb6b18b9a0" +checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" dependencies = [ "async-trait", "axum", @@ -2775,21 +2775,21 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.14.4" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6d8958ed3be404120ca43ffa0fb1e1fc7be214e96c8d33bd43a131b6eebc9e" +checksum = "1882ac3bf5ef12877d7ed57aad87e75154c11931c2ba7e6cde5e22d63522c734" dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] name = "tonic-prost" -version = "0.14.4" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f86539c0089bfd09b1f8c0ab0239d80392af74c21bc9e0f15e1b4aca4c1647f" +checksum = "a55376a0bbaa4975a3f10d009ad763d8f4108f067c7c2e74f3001fb49778d309" dependencies = [ "bytes", "prost", @@ -2798,16 +2798,16 @@ dependencies = [ [[package]] name = "tonic-prost-build" -version = "0.14.4" +version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65873ace111e90344b8973e94a1fc817c924473affff24629281f90daed1cd2e" +checksum = "f3144df636917574672e93d0f56d7edec49f90305749c668df5101751bb8f95a" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "prost-types", "quote", - "syn 2.0.115", + "syn 2.0.117", "tempfile", "tonic-build", ] @@ -2862,7 +2862,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -2924,9 +2924,9 @@ checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" [[package]] name = "unicode-ident" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-linebreak" @@ -3055,9 +3055,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64024a30ec1e37399cf85a7ffefebdb72205ca1c972291c51512360d90bd8566" +checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2" dependencies = [ "cfg-if", "once_cell", @@ -3068,9 +3068,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "008b239d9c740232e71bd39e8ef6429d27097518b6b30bdf9086833bd5b6d608" +checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3078,22 +3078,22 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5256bae2d58f54820e6490f9839c49780dff84c65aeab9e772f15d5f0e913a55" +checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60" dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.108" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f01b580c9ac74c8d8f0c0e4afb04eeef2acf145458e52c03845ee9cd23e3d12" +checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5" dependencies = [ "unicode-ident", ] @@ -3175,7 +3175,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -3186,7 +3186,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -3417,7 +3417,7 @@ dependencies = [ "heck", "indexmap", "prettyplease", - "syn 2.0.115", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -3433,7 +3433,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -3539,7 +3539,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", "synstructure", ] @@ -3560,7 +3560,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", "synstructure", ] @@ -3581,7 +3581,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] @@ -3614,7 +3614,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.115", + "syn 2.0.117", ] [[package]] -- 2.49.1