diff --git a/server/Cargo.lock b/server/Cargo.lock index fc8e5b0..f6ab9b4 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -123,9 +123,13 @@ version = "0.1.0" dependencies = [ "arbiter-proto", "ed25519-dalek", + "http", "kameo", + "rustls-webpki", "smlang", + "thiserror", "tokio", + "tokio-stream", "tonic", "tracing", "x25519-dalek", diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 7fde954..fe62c48 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -22,18 +22,16 @@ pub mod db; const DEFAULT_CHANNEL_SIZE: usize = 1000; - /// Converts User Agent domain outbounds into the tonic stream item emitted by -/// the server. +/// the server.ยง /// /// The conversion is defined at the server boundary so the actor module remains /// focused on domain semantics and does not depend on tonic status encoding. struct UserAgentGrpcSender; - impl SendConverter for UserAgentGrpcSender { - type Input = Result; - type Output = Result; + type Input = Result; + type Output = Result; fn convert(&self, item: Self::Input) -> Self::Output { match item { diff --git a/server/crates/arbiter-useragent/Cargo.toml b/server/crates/arbiter-useragent/Cargo.toml index 16eb12d..de46f67 100644 --- a/server/crates/arbiter-useragent/Cargo.toml +++ b/server/crates/arbiter-useragent/Cargo.toml @@ -9,7 +9,12 @@ arbiter-proto.path = "../arbiter-proto" kameo.workspace = true tokio = {workspace = true, features = ["net"]} tonic.workspace = true +tonic.features = ["tls-aws-lc"] tracing.workspace = true ed25519-dalek.workspace = true smlang.workspace = true -x25519-dalek.workspace = true \ No newline at end of file +x25519-dalek.workspace = true +thiserror.workspace = true +tokio-stream.workspace = true +http = "1.4.0" +rustls-webpki = { version = "0.103.9", features = ["aws-lc-rs"] } diff --git a/server/crates/arbiter-useragent/src/grpc.rs b/server/crates/arbiter-useragent/src/grpc.rs new file mode 100644 index 0000000..ef523a9 --- /dev/null +++ b/server/crates/arbiter-useragent/src/grpc.rs @@ -0,0 +1,90 @@ +use arbiter_proto::{ + proto::{ + UserAgentRequest, UserAgentResponse, arbiter_service_client::ArbiterServiceClient, + }, + transport::{RecvConverter, IdentitySendConverter, grpc}, + url::ArbiterUrl, +}; +use ed25519_dalek::SigningKey; +use kameo::actor::{ActorRef, Spawn}; + +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +use tonic::transport::ClientTlsConfig; + + +#[derive(Debug, thiserror::Error)] +pub enum InitError { + #[error("Could establish connection")] + Connection(#[from] tonic::transport::Error), + + #[error("Invalid server URI")] + InvalidUri(#[from] http::uri::InvalidUri), + + #[error("Invalid CA certificate")] + InvalidCaCert(#[from] webpki::Error), + + #[error("gRPC error")] + Grpc(#[from] tonic::Status), +} + +pub struct InboundConverter; +impl RecvConverter for InboundConverter { + type Input = UserAgentResponse; + type Output = Result; + + fn convert(&self, item: Self::Input) -> Self::Output { + Ok(item) + } +} + +use crate::InboundError; + +use super::UserAgentActor; + +pub type UserAgentGrpc = ActorRef< + UserAgentActor< + grpc::GrpcAdapter< + UserAgentResponse, + Result, + InboundConverter, + IdentitySendConverter, + >, + >, +>; +pub async fn connect_grpc( + url: ArbiterUrl, + key: SigningKey, +) -> Result { + let bootstrap_token = url.bootstrap_token.clone(); + let anchor = webpki::anchor_from_trusted_cert(&url.ca_cert)?.to_owned(); + let tls = ClientTlsConfig::new().trust_anchor(anchor); + + // TODO: if `host` is localhost, we need to verify server's process authenticity + let channel = tonic::transport::Channel::from_shared(format!("{}:{}", url.host, url.port))? + .tls_config(tls)? + .connect() + .await?; + + let mut client = ArbiterServiceClient::new(channel); + let (tx, rx) = mpsc::channel(16); + let bistream = client.user_agent(ReceiverStream::new(rx)).await?; + let bistream = bistream.into_inner(); + + let adapter = grpc::GrpcAdapter::new( + tx, + bistream, + InboundConverter, + IdentitySendConverter::new(), + ); + + let actor = UserAgentActor::spawn(UserAgentActor { + key, + bootstrap_token, + state: super::UserAgentStateMachine::new(super::DummyContext), + transport: adapter, + }); + + Ok(actor) +} diff --git a/server/crates/arbiter-useragent/src/lib.rs b/server/crates/arbiter-useragent/src/lib.rs index c4da0d0..fbdec08 100644 --- a/server/crates/arbiter-useragent/src/lib.rs +++ b/server/crates/arbiter-useragent/src/lib.rs @@ -1,13 +1,206 @@ -use ed25519_dalek::SigningKey; -use kameo::Actor; -use tonic::transport::CertificateDer; +use arbiter_proto::{ + format_challenge, + proto::{ + UserAgentRequest, UserAgentResponse, + auth::{ + self, AuthChallengeRequest, AuthOk, ClientMessage as AuthClientMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, + }, + transport::Bi, +}; +use ed25519_dalek::{Signer, SigningKey}; +use kameo::{ + Actor, + actor::{ActorRef, Spawn}, + prelude::Message, +}; +use smlang::statemachine; +use tokio::select; +use tracing::{error, info}; -struct Storage { - pub identity: SigningKey, - pub server_ca_cert: CertificateDer<'static>, +#[derive(Debug, thiserror::Error)] +pub enum InboundError { + #[error("Invalid user agent response")] + InvalidResponse, + #[error("Expected response payload")] + MissingResponsePayload, + #[error("Unexpected response payload")] + UnexpectedResponsePayload, + #[error("Invalid state for auth challenge")] + InvalidStateForAuthChallenge, + #[error("Invalid state for auth ok")] + InvalidStateForAuthOk, + #[error("State machine error")] + StateTransitionFailed, + #[error("Transport send failed")] + TransportSendFailed, } -#[derive(Actor)] -pub struct UserAgent { +statemachine! { + name: UserAgent, + custom_error: false, + transitions: { + *Init + SentAuthChallengeRequest = WaitingForServerAuth, + WaitingForServerAuth + ReceivedAuthChallenge = WaitingForAuthOk, + WaitingForServerAuth + ReceivedAuthOk = Authenticated, + WaitingForAuthOk + ReceivedAuthOk = Authenticated, + } +} -} \ No newline at end of file +pub struct DummyContext; +impl UserAgentStateMachineContext for DummyContext {} + +pub struct UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + key: SigningKey, + bootstrap_token: Option, + state: UserAgentStateMachine, + transport: Transport, +} + +impl UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + fn transition(&mut self, event: UserAgentEvents) -> Result<(), InboundError> { + self.state.process_event(event).map_err(|e| { + error!(?e, "useragent state transition failed"); + InboundError::StateTransitionFailed + })?; + Ok(()) + } + + fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(AuthClientMessage { + payload: Some(payload), + })), + } + } + + async fn send_auth_challenge_request(&mut self) -> Result<(), InboundError> { + let req = AuthChallengeRequest { + pubkey: self.key.verifying_key().to_bytes().to_vec(), + bootstrap_token: self.bootstrap_token.take(), + }; + + self.transport + .send(Self::auth_request(ClientAuthPayload::AuthChallengeRequest(req))) + .await + .map_err(|_| InboundError::TransportSendFailed)?; + self.transition(UserAgentEvents::SentAuthChallengeRequest)?; + info!(actor = "useragent", "auth.request.sent"); + Ok(()) + } + + async fn handle_auth_challenge( + &mut self, + challenge: auth::AuthChallenge, + ) -> Result<(), InboundError> { + if !matches!(self.state.state(), UserAgentStates::WaitingForServerAuth) { + return Err(InboundError::InvalidStateForAuthChallenge); + } + + self.transition(UserAgentEvents::ReceivedAuthChallenge)?; + + let formatted = format_challenge(&challenge); + let signature = self.key.sign(&formatted); + let solution = auth::AuthChallengeSolution { + signature: signature.to_bytes().to_vec(), + }; + + self.transport + .send(Self::auth_request(ClientAuthPayload::AuthChallengeSolution( + solution, + ))) + .await + .map_err(|_| InboundError::TransportSendFailed)?; + + info!(actor = "useragent", "auth.solution.sent"); + Ok(()) + } + + fn handle_auth_ok(&mut self, _ok: AuthOk) -> Result<(), InboundError> { + match self.state.state() { + UserAgentStates::WaitingForServerAuth | UserAgentStates::WaitingForAuthOk => { + self.transition(UserAgentEvents::ReceivedAuthOk)?; + info!(actor = "useragent", "auth.ok"); + Ok(()) + } + _ => Err(InboundError::InvalidStateForAuthOk), + } + } + + pub async fn process_inbound_transport( + &mut self, + inbound: Result, + ) -> Result<(), InboundError> { + let response = inbound?; + let payload = response + .payload + .ok_or(InboundError::MissingResponsePayload)?; + + match payload { + UserAgentResponsePayload::AuthMessage(AuthServerMessage { + payload: Some(ServerAuthPayload::AuthChallenge(challenge)), + }) => self.handle_auth_challenge(challenge).await, + UserAgentResponsePayload::AuthMessage(AuthServerMessage { + payload: Some(ServerAuthPayload::AuthOk(ok)), + }) => self.handle_auth_ok(ok), + _ => Err(InboundError::UnexpectedResponsePayload), + } + } +} + +impl Actor for UserAgentActor +where + Transport: Bi, UserAgentRequest>, +{ + type Args = Self; + + type Error = (); + + async fn on_start(mut args: Self::Args, _actor_ref: ActorRef) -> Result { + if let Err(err) = args.send_auth_challenge_request().await { + error!(?err, actor = "useragent", "auth.start.failed"); + return Err(()); + } + Ok(args) + } + + async fn next( + &mut self, + _actor_ref: kameo::prelude::WeakActorRef, + mailbox_rx: &mut kameo::prelude::MailboxReceiver, + ) -> Option> { + loop { + select! { + signal = mailbox_rx.recv() => { + return signal; + } + inbound = self.transport.recv() => { + match inbound { + Some(inbound) => { + if let Err(err) = self.process_inbound_transport(inbound).await { + error!(?err, actor = "useragent", "transport.inbound.failed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + None => { + info!(actor = "useragent", "transport.closed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + } + } + } + } +} + +mod grpc;