refactor(server::client): migrated to new connection design

This commit is contained in:
hdbg
2026-03-18 22:40:07 +01:00
committed by Stas
parent d61dab3285
commit 2ff4d0961c
14 changed files with 474 additions and 401 deletions

View File

@@ -1,30 +1,25 @@
use arbiter_proto::{format_challenge, transport::expect_message};
use arbiter_proto::{
format_challenge,
transport::{Bi, expect_message},
};
use diesel::{
ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update,
};
use diesel_async::RunQueryDsl as _;
use ed25519_dalek::VerifyingKey;
use ed25519_dalek::{Signature, VerifyingKey};
use kameo::error::SendError;
use tracing::error;
use crate::{
actors::{
client::{ClientConnection, ConnectErrorCode, Request, Response},
client::ClientConnection,
router::{self, RequestClientApproval},
},
db::{self, schema::program_client},
};
use super::session::ClientSession;
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Unexpected message payload")]
UnexpectedMessagePayload,
#[error("Invalid client public key length")]
InvalidClientPubkeyLength,
#[error("Invalid client public key encoding")]
InvalidAuthPubkeyEncoding,
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
@@ -33,8 +28,6 @@ pub enum Error {
InvalidChallengeSolution,
#[error("Client approval request failed")]
ApproveError(#[from] ApproveError),
#[error("Internal error")]
InternalError,
#[error("Transport error")]
Transport,
}
@@ -49,6 +42,18 @@ pub enum ApproveError {
Upstream(router::ApprovalError),
}
#[derive(Debug, Clone)]
pub enum Inbound {
AuthChallengeRequest { pubkey: VerifyingKey },
AuthChallengeSolution { signature: Signature },
}
#[derive(Debug, Clone)]
pub enum Outbound {
AuthChallenge { pubkey: VerifyingKey, nonce: i32 },
AuthSuccess,
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered.
async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
@@ -141,27 +146,24 @@ async fn insert_client(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<(
Ok(())
}
async fn challenge_client(
props: &mut ClientConnection,
async fn challenge_client<T>(
transport: &mut T,
pubkey: VerifyingKey,
nonce: i32,
) -> Result<(), Error> {
let challenge_pubkey = pubkey.as_bytes().to_vec();
props
.transport
.send(Ok(Response::AuthChallenge {
pubkey: challenge_pubkey.clone(),
nonce,
}))
) -> Result<(), Error>
where
T: Bi<Inbound, Result<Outbound, Error>> + ?Sized,
{
transport
.send(Ok(Outbound::AuthChallenge { pubkey, nonce }))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth challenge");
Error::Transport
})?;
let signature = expect_message(&mut *props.transport, |req: Request| match req {
Request::AuthChallengeSolution { signature } => Some(signature),
let signature = expect_message(transport, |req: Inbound| match req {
Inbound::AuthChallengeSolution { signature } => Some(signature),
_ => None,
})
.await
@@ -170,13 +172,9 @@ async fn challenge_client(
Error::Transport
})?;
let formatted = format_challenge(nonce, &challenge_pubkey);
let sig = signature.as_slice().try_into().map_err(|_| {
error!("Invalid signature length");
Error::InvalidChallengeSolution
})?;
let formatted = format_challenge(nonce, pubkey.as_bytes());
pubkey.verify_strict(&formatted, &sig).map_err(|_| {
pubkey.verify_strict(&formatted, &signature).map_err(|_| {
error!("Challenge solution verification failed");
Error::InvalidChallengeSolution
})?;
@@ -184,30 +182,17 @@ async fn challenge_client(
Ok(())
}
fn connect_error_code(err: &Error) -> ConnectErrorCode {
match err {
Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied,
Error::ApproveError(ApproveError::Upstream(
router::ApprovalError::NoUserAgentsConnected,
)) => ConnectErrorCode::NoUserAgentsOnline,
_ => ConnectErrorCode::Unknown,
}
}
async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
let Some(Request::AuthChallengeRequest {
pubkey: challenge_pubkey,
}) = props.transport.recv().await
else {
pub async fn authenticate<T>(
props: &mut ClientConnection,
transport: &mut T,
) -> Result<VerifyingKey, Error>
where
T: Bi<Inbound, Result<Outbound, Error>> + Send + ?Sized,
{
let Some(Inbound::AuthChallengeRequest { pubkey }) = transport.recv().await else {
return Err(Error::Transport);
};
let pubkey_bytes = challenge_pubkey
.as_array()
.ok_or(Error::InvalidClientPubkeyLength)?;
let pubkey =
VerifyingKey::from_bytes(pubkey_bytes).map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
let nonce = match get_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
@@ -217,21 +202,14 @@ async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Erro
}
};
challenge_client(props, pubkey, nonce).await?;
challenge_client(transport, pubkey, nonce).await?;
transport
.send(Ok(Outbound::AuthSuccess))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth success");
Error::Transport
})?;
Ok(pubkey)
}
pub async fn authenticate_and_create(mut props: ClientConnection) -> Result<ClientSession, Error> {
match authenticate(&mut props).await {
Ok(_pubkey) => Ok(ClientSession::new(props)),
Err(err) => {
let code = connect_error_code(&err);
let _ = props
.transport
.send(Ok(Response::ClientConnectError { code }))
.await;
Err(err)
}
}
}

View File

@@ -7,68 +7,31 @@ use crate::{
db,
};
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ClientError {
#[error("Expected message with payload")]
MissingRequestPayload,
#[error("Unexpected request payload")]
UnexpectedRequestPayload,
#[error("State machine error")]
StateTransitionFailed,
#[error("Connection registration failed")]
ConnectionRegistrationFailed,
#[error(transparent)]
Auth(#[from] auth::Error),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectErrorCode {
Unknown,
ApprovalDenied,
NoUserAgentsOnline,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Request {
AuthChallengeRequest { pubkey: Vec<u8> },
AuthChallengeSolution { signature: Vec<u8> },
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Response {
AuthChallenge { pubkey: Vec<u8>, nonce: i32 },
AuthOk,
ClientConnectError { code: ConnectErrorCode },
}
pub type Transport = Box<dyn Bi<Request, Result<Response, ClientError>> + Send>;
pub struct ClientConnection {
pub(crate) db: db::DatabasePool,
pub(crate) transport: Transport,
pub(crate) actors: GlobalActors,
}
impl ClientConnection {
pub fn new(db: db::DatabasePool, transport: Transport, actors: GlobalActors) -> Self {
Self {
db,
transport,
actors,
}
pub fn new(db: db::DatabasePool, actors: GlobalActors) -> Self {
Self { db, actors }
}
}
pub mod auth;
pub mod session;
pub async fn connect_client(props: ClientConnection) {
match auth::authenticate_and_create(props).await {
Ok(session) => {
ClientSession::spawn(session);
pub async fn connect_client<T>(mut props: ClientConnection, transport: &mut T)
where
T: Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> + Send + ?Sized,
{
match auth::authenticate(&mut props, transport).await {
Ok(_pubkey) => {
ClientSession::spawn(ClientSession::new(props));
info!("Client authenticated, session started");
}
Err(err) => {
let _ = transport.send(Err(err.clone())).await;
error!(?err, "Authentication failed, closing connection");
}
}

View File

@@ -1,12 +1,9 @@
use kameo::Actor;
use tokio::select;
use tracing::{error, info};
use kameo::{Actor, messages};
use tracing::error;
use crate::{
actors::{
GlobalActors,
client::{ClientConnection, ClientError, Request, Response},
router::RegisterClient,
GlobalActors, client::ClientConnection, keyholder::KeyHolderState, router::RegisterClient,
},
db,
};
@@ -19,19 +16,30 @@ impl ClientSession {
pub(crate) fn new(props: ClientConnection) -> Self {
Self { props }
}
pub async fn process_transport_inbound(&mut self, req: Request) -> Output {
let _ = req;
Err(ClientError::UnexpectedRequestPayload)
}
}
type Output = Result<Response, ClientError>;
#[messages]
impl ClientSession {
#[message]
pub(crate) async fn handle_query_vault_state(&mut self) -> Result<KeyHolderState, Error> {
use crate::actors::keyholder::GetState;
let vault_state = match self.props.actors.key_holder.ask(GetState {}).await {
Ok(state) => state,
Err(err) => {
error!(?err, actor = "client", "keyholder.query.failed");
return Err(Error::Internal);
}
};
Ok(vault_state)
}
}
impl Actor for ClientSession {
type Args = Self;
type Error = ClientError;
type Error = Error;
async fn on_start(
args: Self::Args,
@@ -42,52 +50,22 @@ impl Actor for ClientSession {
.router
.ask(RegisterClient { actor: this })
.await
.map_err(|_| ClientError::ConnectionRegistrationFailed)?;
.map_err(|_| Error::ConnectionRegistrationFailed)?;
Ok(args)
}
async fn next(
&mut self,
_actor_ref: kameo::prelude::WeakActorRef<Self>,
mailbox_rx: &mut kameo::prelude::MailboxReceiver<Self>,
) -> Option<kameo::mailbox::Signal<Self>> {
loop {
select! {
signal = mailbox_rx.recv() => {
return signal;
}
msg = self.props.transport.recv() => {
match msg {
Some(request) => {
match self.process_transport_inbound(request).await {
Ok(resp) => {
if self.props.transport.send(Ok(resp)).await.is_err() {
error!(actor = "client", reason = "channel closed", "send.failed");
return Some(kameo::mailbox::Signal::Stop);
}
}
Err(err) => {
let _ = self.props.transport.send(Err(err)).await;
return Some(kameo::mailbox::Signal::Stop);
}
}
}
None => {
info!(actor = "client", "transport.closed");
return Some(kameo::mailbox::Signal::Stop);
}
}
}
}
}
}
}
impl ClientSession {
pub fn new_test(db: db::DatabasePool, actors: GlobalActors) -> Self {
use arbiter_proto::transport::DummyTransport;
let transport: super::Transport = Box::new(DummyTransport::new());
let props = ClientConnection::new(db, transport, actors);
let props = ClientConnection::new(db, actors);
Self { props }
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Connection registration failed")]
ConnectionRegistrationFailed,
#[error("Internal error")]
Internal,
}

View File

@@ -106,7 +106,7 @@ pub struct AuthContext<'a, T> {
}
impl<'a, T> AuthContext<'a, T> {
pub fn new(conn: &'a mut UserAgentConnection, transport: T) -> Self {
pub fn new(conn: &'a mut UserAgentConnection, transport: T) -> Self {
Self { conn, transport }
}
}
@@ -124,8 +124,7 @@ where
let stored_bytes = pubkey.to_stored_bytes();
let nonce = create_nonce(&self.conn.db, &stored_bytes).await?;
self
.transport
self.transport
.send(Ok(Outbound::AuthChallenge { nonce }))
.await
.map_err(|e| {
@@ -165,8 +164,7 @@ where
register_key(&self.conn.db, &pubkey).await?;
self
.transport
self.transport
.send(Ok(Outbound::AuthSuccess))
.await
.map_err(|_| Error::Transport)?;
@@ -214,8 +212,7 @@ where
};
if valid {
self
.transport
self.transport
.send(Ok(Outbound::AuthSuccess))
.await
.map_err(|_| Error::Transport)?;