feat(useragent): initial connection impl

This commit is contained in:
hdbg
2026-02-26 15:44:48 +01:00
parent f5a5c62181
commit 6deec731e2
5 changed files with 305 additions and 15 deletions

4
server/Cargo.lock generated
View File

@@ -123,9 +123,13 @@ version = "0.1.0"
dependencies = [
"arbiter-proto",
"ed25519-dalek",
"http",
"kameo",
"rustls-webpki",
"smlang",
"thiserror",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"x25519-dalek",

View File

@@ -22,18 +22,16 @@ pub mod db;
const DEFAULT_CHANNEL_SIZE: usize = 1000;
/// Converts User Agent domain outbounds into the tonic stream item emitted by
/// the server.
/// the server.§
///
/// The conversion is defined at the server boundary so the actor module remains
/// focused on domain semantics and does not depend on tonic status encoding.
struct UserAgentGrpcSender;
impl SendConverter for UserAgentGrpcSender {
type Input = Result<UserAgentResponse, UserAgentError>;
type Output = Result<UserAgentResponse, Status>;
type Input = Result<UserAgentResponse, UserAgentError>;
type Output = Result<UserAgentResponse, Status>;
fn convert(&self, item: Self::Input) -> Self::Output {
match item {

View File

@@ -9,7 +9,12 @@ arbiter-proto.path = "../arbiter-proto"
kameo.workspace = true
tokio = {workspace = true, features = ["net"]}
tonic.workspace = true
tonic.features = ["tls-aws-lc"]
tracing.workspace = true
ed25519-dalek.workspace = true
smlang.workspace = true
x25519-dalek.workspace = true
x25519-dalek.workspace = true
thiserror.workspace = true
tokio-stream.workspace = true
http = "1.4.0"
rustls-webpki = { version = "0.103.9", features = ["aws-lc-rs"] }

View File

@@ -0,0 +1,90 @@
use arbiter_proto::{
proto::{
UserAgentRequest, UserAgentResponse, arbiter_service_client::ArbiterServiceClient,
},
transport::{RecvConverter, IdentitySendConverter, grpc},
url::ArbiterUrl,
};
use ed25519_dalek::SigningKey;
use kameo::actor::{ActorRef, Spawn};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::ClientTlsConfig;
#[derive(Debug, thiserror::Error)]
pub enum InitError {
#[error("Could establish connection")]
Connection(#[from] tonic::transport::Error),
#[error("Invalid server URI")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("Invalid CA certificate")]
InvalidCaCert(#[from] webpki::Error),
#[error("gRPC error")]
Grpc(#[from] tonic::Status),
}
pub struct InboundConverter;
impl RecvConverter for InboundConverter {
type Input = UserAgentResponse;
type Output = Result<UserAgentResponse, InboundError>;
fn convert(&self, item: Self::Input) -> Self::Output {
Ok(item)
}
}
use crate::InboundError;
use super::UserAgentActor;
pub type UserAgentGrpc = ActorRef<
UserAgentActor<
grpc::GrpcAdapter<
UserAgentResponse,
Result<UserAgentResponse, InboundError>,
InboundConverter,
IdentitySendConverter<UserAgentRequest>,
>,
>,
>;
pub async fn connect_grpc(
url: ArbiterUrl,
key: SigningKey,
) -> Result<UserAgentGrpc, InitError> {
let bootstrap_token = url.bootstrap_token.clone();
let anchor = webpki::anchor_from_trusted_cert(&url.ca_cert)?.to_owned();
let tls = ClientTlsConfig::new().trust_anchor(anchor);
// TODO: if `host` is localhost, we need to verify server's process authenticity
let channel = tonic::transport::Channel::from_shared(format!("{}:{}", url.host, url.port))?
.tls_config(tls)?
.connect()
.await?;
let mut client = ArbiterServiceClient::new(channel);
let (tx, rx) = mpsc::channel(16);
let bistream = client.user_agent(ReceiverStream::new(rx)).await?;
let bistream = bistream.into_inner();
let adapter = grpc::GrpcAdapter::new(
tx,
bistream,
InboundConverter,
IdentitySendConverter::new(),
);
let actor = UserAgentActor::spawn(UserAgentActor {
key,
bootstrap_token,
state: super::UserAgentStateMachine::new(super::DummyContext),
transport: adapter,
});
Ok(actor)
}

View File

@@ -1,13 +1,206 @@
use ed25519_dalek::SigningKey;
use kameo::Actor;
use tonic::transport::CertificateDer;
use arbiter_proto::{
format_challenge,
proto::{
UserAgentRequest, UserAgentResponse,
auth::{
self, AuthChallengeRequest, AuthOk, ClientMessage as AuthClientMessage,
ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload,
server_message::Payload as ServerAuthPayload,
},
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
},
transport::Bi,
};
use ed25519_dalek::{Signer, SigningKey};
use kameo::{
Actor,
actor::{ActorRef, Spawn},
prelude::Message,
};
use smlang::statemachine;
use tokio::select;
use tracing::{error, info};
struct Storage {
pub identity: SigningKey,
pub server_ca_cert: CertificateDer<'static>,
#[derive(Debug, thiserror::Error)]
pub enum InboundError {
#[error("Invalid user agent response")]
InvalidResponse,
#[error("Expected response payload")]
MissingResponsePayload,
#[error("Unexpected response payload")]
UnexpectedResponsePayload,
#[error("Invalid state for auth challenge")]
InvalidStateForAuthChallenge,
#[error("Invalid state for auth ok")]
InvalidStateForAuthOk,
#[error("State machine error")]
StateTransitionFailed,
#[error("Transport send failed")]
TransportSendFailed,
}
#[derive(Actor)]
pub struct UserAgent {
statemachine! {
name: UserAgent,
custom_error: false,
transitions: {
*Init + SentAuthChallengeRequest = WaitingForServerAuth,
WaitingForServerAuth + ReceivedAuthChallenge = WaitingForAuthOk,
WaitingForServerAuth + ReceivedAuthOk = Authenticated,
WaitingForAuthOk + ReceivedAuthOk = Authenticated,
}
}
}
pub struct DummyContext;
impl UserAgentStateMachineContext for DummyContext {}
pub struct UserAgentActor<Transport>
where
Transport: Bi<Result<UserAgentResponse, InboundError>, UserAgentRequest>,
{
key: SigningKey,
bootstrap_token: Option<String>,
state: UserAgentStateMachine<DummyContext>,
transport: Transport,
}
impl<Transport> UserAgentActor<Transport>
where
Transport: Bi<Result<UserAgentResponse, InboundError>, UserAgentRequest>,
{
fn transition(&mut self, event: UserAgentEvents) -> Result<(), InboundError> {
self.state.process_event(event).map_err(|e| {
error!(?e, "useragent state transition failed");
InboundError::StateTransitionFailed
})?;
Ok(())
}
fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest {
UserAgentRequest {
payload: Some(UserAgentRequestPayload::AuthMessage(AuthClientMessage {
payload: Some(payload),
})),
}
}
async fn send_auth_challenge_request(&mut self) -> Result<(), InboundError> {
let req = AuthChallengeRequest {
pubkey: self.key.verifying_key().to_bytes().to_vec(),
bootstrap_token: self.bootstrap_token.take(),
};
self.transport
.send(Self::auth_request(ClientAuthPayload::AuthChallengeRequest(req)))
.await
.map_err(|_| InboundError::TransportSendFailed)?;
self.transition(UserAgentEvents::SentAuthChallengeRequest)?;
info!(actor = "useragent", "auth.request.sent");
Ok(())
}
async fn handle_auth_challenge(
&mut self,
challenge: auth::AuthChallenge,
) -> Result<(), InboundError> {
if !matches!(self.state.state(), UserAgentStates::WaitingForServerAuth) {
return Err(InboundError::InvalidStateForAuthChallenge);
}
self.transition(UserAgentEvents::ReceivedAuthChallenge)?;
let formatted = format_challenge(&challenge);
let signature = self.key.sign(&formatted);
let solution = auth::AuthChallengeSolution {
signature: signature.to_bytes().to_vec(),
};
self.transport
.send(Self::auth_request(ClientAuthPayload::AuthChallengeSolution(
solution,
)))
.await
.map_err(|_| InboundError::TransportSendFailed)?;
info!(actor = "useragent", "auth.solution.sent");
Ok(())
}
fn handle_auth_ok(&mut self, _ok: AuthOk) -> Result<(), InboundError> {
match self.state.state() {
UserAgentStates::WaitingForServerAuth | UserAgentStates::WaitingForAuthOk => {
self.transition(UserAgentEvents::ReceivedAuthOk)?;
info!(actor = "useragent", "auth.ok");
Ok(())
}
_ => Err(InboundError::InvalidStateForAuthOk),
}
}
pub async fn process_inbound_transport(
&mut self,
inbound: Result<UserAgentResponse, InboundError>,
) -> Result<(), InboundError> {
let response = inbound?;
let payload = response
.payload
.ok_or(InboundError::MissingResponsePayload)?;
match payload {
UserAgentResponsePayload::AuthMessage(AuthServerMessage {
payload: Some(ServerAuthPayload::AuthChallenge(challenge)),
}) => self.handle_auth_challenge(challenge).await,
UserAgentResponsePayload::AuthMessage(AuthServerMessage {
payload: Some(ServerAuthPayload::AuthOk(ok)),
}) => self.handle_auth_ok(ok),
_ => Err(InboundError::UnexpectedResponsePayload),
}
}
}
impl<Transport> Actor for UserAgentActor<Transport>
where
Transport: Bi<Result<UserAgentResponse, InboundError>, UserAgentRequest>,
{
type Args = Self;
type Error = ();
async fn on_start(mut args: Self::Args, _actor_ref: ActorRef<Self>) -> Result<Self, Self::Error> {
if let Err(err) = args.send_auth_challenge_request().await {
error!(?err, actor = "useragent", "auth.start.failed");
return Err(());
}
Ok(args)
}
async fn next(
&mut self,
_actor_ref: kameo::prelude::WeakActorRef<Self>,
mailbox_rx: &mut kameo::prelude::MailboxReceiver<Self>,
) -> Option<kameo::mailbox::Signal<Self>> {
loop {
select! {
signal = mailbox_rx.recv() => {
return signal;
}
inbound = self.transport.recv() => {
match inbound {
Some(inbound) => {
if let Err(err) = self.process_inbound_transport(inbound).await {
error!(?err, actor = "useragent", "transport.inbound.failed");
return Some(kameo::mailbox::Signal::Stop);
}
}
None => {
info!(actor = "useragent", "transport.closed");
return Some(kameo::mailbox::Signal::Stop);
}
}
}
}
}
}
}
mod grpc;