refactor(server::client::auth): removed state machine and added approval flow coordination
Some checks failed
ci/woodpecker/pr/server-audit Pipeline was successful
ci/woodpecker/pr/server-vet Pipeline failed
ci/woodpecker/pr/server-lint Pipeline failed
ci/woodpecker/pr/server-test Pipeline was successful

This commit is contained in:
hdbg
2026-03-11 20:18:06 +01:00
parent 606a1f3774
commit ba86d18250
9 changed files with 329 additions and 256 deletions

View File

@@ -0,0 +1,251 @@
use arbiter_proto::{
format_challenge,
proto::client::{
AuthChallenge, AuthChallengeSolution, ClientConnectError, ClientRequest, ClientResponse,
client_connect_error::Code as ConnectErrorCode,
client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
transport::expect_message,
};
use diesel::{
ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update,
};
use diesel_async::RunQueryDsl as _;
use ed25519_dalek::VerifyingKey;
use kameo::error::SendError;
use tracing::error;
use crate::{
actors::{client::ClientConnection, router::{self, RequestClientApproval}},
db::{self, schema::program_client},
};
use super::session::ClientSession;
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Unexpected message payload")]
UnexpectedMessagePayload,
#[error("Invalid client public key length")]
InvalidClientPubkeyLength,
#[error("Invalid client public key encoding")]
InvalidAuthPubkeyEncoding,
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
DatabaseOperationFailed,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Client approval request failed")]
ApproveError(#[from] ApproveError),
#[error("Internal error")]
InternalError,
#[error("Transport error")]
Transport,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ApproveError {
#[error("Internal error")]
Internal,
#[error("Client connection denied by user agents")]
Denied,
#[error("Upstream error: {0}")]
Upstream(router::ApprovalError),
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered.
async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(|conn| {
let pubkey_bytes = pubkey_bytes.clone();
Box::pin(async move {
let Some(current_nonce) = program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select(program_client::nonce)
.first::<i32>(conn)
.await
.optional()?
else {
return Result::<_, diesel::result::Error>::Ok(None);
};
update(program_client::table)
.filter(program_client::public_key.eq(&pubkey_bytes))
.set(program_client::nonce.eq(current_nonce + 1))
.execute(conn)
.await?;
Ok(Some(current_nonce))
})
})
.await
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
}
async fn approve_new_client(
actors: &crate::actors::GlobalActors,
pubkey: VerifyingKey,
) -> Result<(), Error> {
let result = actors
.router
.ask(RequestClientApproval { client_pubkey: pubkey })
.await;
match result {
Ok(true) => Ok(()),
Ok(false) => Err(Error::ApproveError(ApproveError::Denied)),
Err(SendError::HandlerError(e)) => {
error!(error = ?e, "Approval upstream error");
Err(Error::ApproveError(ApproveError::Upstream(e)))
}
Err(e) => {
error!(error = ?e, "Approval request to router failed");
Err(Error::ApproveError(ApproveError::Internal))
}
}
}
async fn insert_client(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<(), Error> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i32;
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()),
program_client::nonce.eq(1), // pre-incremented; challenge uses 0
program_client::created_at.eq(now),
program_client::updated_at.eq(now),
))
.execute(&mut conn)
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert new client");
Error::DatabaseOperationFailed
})?;
Ok(())
}
async fn challenge_client(
props: &mut ClientConnection,
pubkey: VerifyingKey,
nonce: i32,
) -> Result<(), Error> {
let challenge = AuthChallenge {
pubkey: pubkey.as_bytes().to_vec(),
nonce,
};
props
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())),
}))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth challenge");
Error::Transport
})?;
let AuthChallengeSolution { signature } = expect_message(
&mut *props.transport,
|req: ClientRequest| match req.payload? {
ClientRequestPayload::AuthChallengeSolution(s) => Some(s),
_ => None,
},
)
.await
.map_err(|e| {
error!(error = ?e, "Failed to receive challenge solution");
Error::Transport
})?;
let formatted = format_challenge(nonce, &challenge.pubkey);
let sig = signature.as_slice().try_into().map_err(|_| {
error!("Invalid signature length");
Error::InvalidChallengeSolution
})?;
pubkey.verify_strict(&formatted, &sig).map_err(|_| {
error!("Challenge solution verification failed");
Error::InvalidChallengeSolution
})?;
Ok(())
}
fn connect_error_code(err: &Error) -> ConnectErrorCode {
match err {
Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied,
Error::ApproveError(ApproveError::Upstream(router::ApprovalError::NoUserAgentsConnected)) => {
ConnectErrorCode::NoUserAgentsOnline
}
_ => ConnectErrorCode::Unknown,
}
}
async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
let Some(ClientRequest {
payload: Some(ClientRequestPayload::AuthChallengeRequest(challenge)),
}) = props.transport.recv().await
else {
return Err(Error::Transport);
};
let pubkey_bytes = challenge
.pubkey
.as_array()
.ok_or(Error::InvalidClientPubkeyLength)?;
let pubkey =
VerifyingKey::from_bytes(pubkey_bytes).map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
let nonce = match get_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
approve_new_client(&props.actors, pubkey).await?;
insert_client(&props.db, &pubkey).await?;
0
}
};
challenge_client(props, pubkey, nonce).await?;
Ok(pubkey)
}
pub async fn authenticate_and_create(mut props: ClientConnection) -> Result<ClientSession, Error> {
match authenticate(&mut props).await {
Ok(pubkey) => Ok(ClientSession::new(props, pubkey)),
Err(err) => {
let code = connect_error_code(&err);
let _ = props
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::ClientConnectError(
ClientConnectError { code: code.into() },
)),
}))
.await;
Err(err)
}
}
}

View File

@@ -1,101 +0,0 @@
use arbiter_proto::proto::client::{
AuthChallengeRequest, AuthChallengeSolution, ClientRequest,
client_request::Payload as ClientRequestPayload,
};
use ed25519_dalek::VerifyingKey;
use tracing::error;
use crate::actors::client::{
ClientConnection,
auth::state::{AuthContext, AuthStateMachine},
session::ClientSession,
};
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Unexpected message payload")]
UnexpectedMessagePayload,
#[error("Invalid client public key length")]
InvalidClientPubkeyLength,
#[error("Invalid client public key encoding")]
InvalidAuthPubkeyEncoding,
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
DatabaseOperationFailed,
#[error("Public key not registered")]
PublicKeyNotRegistered,
#[error("Invalid signature length")]
InvalidSignatureLength,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Transport error")]
Transport,
}
mod state;
use state::*;
fn parse_auth_event(payload: ClientRequestPayload) -> Result<AuthEvents, Error> {
match payload {
ClientRequestPayload::AuthChallengeRequest(AuthChallengeRequest { pubkey }) => {
let pubkey_bytes = pubkey.as_array().ok_or(Error::InvalidClientPubkeyLength)?;
let pubkey = VerifyingKey::from_bytes(pubkey_bytes)
.map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
Ok(AuthEvents::AuthRequest(ChallengeRequest {
pubkey: pubkey.into(),
}))
}
ClientRequestPayload::AuthChallengeSolution(AuthChallengeSolution { signature }) => {
Ok(AuthEvents::ReceivedSolution(ChallengeSolution {
solution: signature,
}))
}
}
}
pub async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
let mut state = AuthStateMachine::new(AuthContext::new(props));
loop {
let transport = state.context_mut().conn.transport.as_mut();
let Some(ClientRequest {
payload: Some(payload),
}) = transport.recv().await
else {
return Err(Error::Transport);
};
let event = parse_auth_event(payload)?;
match state.process_event(event).await {
Ok(AuthStates::AuthOk(key)) => return Ok(key.clone()),
Err(AuthError::ActionFailed(err)) => {
error!(?err, "State machine action failed");
return Err(err);
}
Err(AuthError::GuardFailed(err)) => {
error!(?err, "State machine guard failed");
return Err(err);
}
Err(AuthError::InvalidEvent) => {
error!("Invalid event for current state");
return Err(Error::InvalidChallengeSolution);
}
Err(AuthError::TransitionsFailed) => {
error!("Invalid state transition");
return Err(Error::InvalidChallengeSolution);
}
_ => (),
}
}
}
pub async fn authenticate_and_create(
mut props: ClientConnection,
) -> Result<ClientSession, Error> {
let key = authenticate(&mut props).await?;
let session = ClientSession::new(props, key);
Ok(session)
}

View File

@@ -1,136 +0,0 @@
use arbiter_proto::proto::client::{
AuthChallenge, ClientResponse,
client_response::Payload as ClientResponsePayload,
};
use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl, update};
use diesel_async::RunQueryDsl;
use ed25519_dalek::VerifyingKey;
use tracing::error;
use super::Error;
use crate::{actors::client::ClientConnection, db::schema};
pub struct ChallengeRequest {
pub pubkey: VerifyingKey,
}
pub struct ChallengeContext {
pub challenge: AuthChallenge,
pub key: VerifyingKey,
}
pub struct ChallengeSolution {
pub solution: Vec<u8>,
}
smlang::statemachine!(
name: Auth,
custom_error: true,
transitions: {
*Init + AuthRequest(ChallengeRequest) / async prepare_challenge = SentChallenge(ChallengeContext),
SentChallenge(ChallengeContext) + ReceivedSolution(ChallengeSolution) [async verify_solution] / provide_key = AuthOk(VerifyingKey),
}
);
async fn create_nonce(db: &crate::db::DatabasePool, pubkey_bytes: &[u8]) -> Result<i32, Error> {
let mut db_conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
db_conn
.exclusive_transaction(|conn| {
Box::pin(async move {
let current_nonce = schema::program_client::table
.filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec()))
.select(schema::program_client::nonce)
.first::<i32>(conn)
.await?;
update(schema::program_client::table)
.filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec()))
.set(schema::program_client::nonce.eq(current_nonce + 1))
.execute(conn)
.await?;
Result::<_, diesel::result::Error>::Ok(current_nonce)
})
})
.await
.optional()
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})?
.ok_or_else(|| {
error!(?pubkey_bytes, "Public key not found in database");
Error::PublicKeyNotRegistered
})
}
pub struct AuthContext<'a> {
pub(super) conn: &'a mut ClientConnection,
}
impl<'a> AuthContext<'a> {
pub fn new(conn: &'a mut ClientConnection) -> Self {
Self { conn }
}
}
impl AuthStateMachineContext for AuthContext<'_> {
type Error = Error;
async fn verify_solution(
&self,
ChallengeContext { challenge, key }: &ChallengeContext,
ChallengeSolution { solution }: &ChallengeSolution,
) -> Result<bool, Self::Error> {
let formatted_challenge =
arbiter_proto::format_challenge(challenge.nonce, &challenge.pubkey);
let signature = solution.as_slice().try_into().map_err(|_| {
error!(?solution, "Invalid signature length");
Error::InvalidChallengeSolution
})?;
let valid = key.verify_strict(&formatted_challenge, &signature).is_ok();
Ok(valid)
}
async fn prepare_challenge(
&mut self,
ChallengeRequest { pubkey }: ChallengeRequest,
) -> Result<ChallengeContext, Self::Error> {
let nonce = create_nonce(&self.conn.db, pubkey.as_bytes()).await?;
let challenge = AuthChallenge {
pubkey: pubkey.as_bytes().to_vec(),
nonce,
};
self.conn
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())),
}))
.await
.map_err(|e| {
error!(?e, "Failed to send auth challenge");
Error::Transport
})?;
Ok(ChallengeContext {
challenge,
key: pubkey,
})
}
fn provide_key(
&mut self,
state_data: &ChallengeContext,
_: ChallengeSolution,
) -> Result<VerifyingKey, Self::Error> {
Ok(state_data.key)
}
}

View File

@@ -56,7 +56,7 @@ impl Actor for MessageRouter {
}
}
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq, Hash)]
pub enum ApprovalError {
#[error("No user agents connected")]
NoUserAgentsConnected,

View File

@@ -1,17 +1,14 @@
use std::{ops::DerefMut, sync::Mutex};
use arbiter_proto::proto::{
client,
user_agent::{
use arbiter_proto::proto::user_agent::{
ClientConnectionCancel, ClientConnectionRequest, UnsealEncryptedKey, UnsealResult,
UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse,
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
},
};
};
use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit};
use ed25519_dalek::VerifyingKey;
use kameo::{Actor, error::SendError, message, messages, prelude::Context};
use kameo::{Actor, error::SendError, messages, prelude::Context};
use memsafe::MemSafe;
use tokio::{select, sync::watch};
use tracing::{error, info};
@@ -62,7 +59,7 @@ impl UserAgentSession {
async fn send_msg<Reply: kameo::Reply>(
&mut self,
msg: UserAgentResponsePayload,
ctx: &mut Context<Self, Reply>,
_ctx: &mut Context<Self, Reply>,
) -> Result<(), Error> {
self.props
.transport
@@ -111,7 +108,6 @@ impl UserAgentSession {
#[messages]
impl UserAgentSession {
// TODO: Think about refactoring it to state-machine based flow, as we already have one
#[message(ctx)]
pub async fn request_new_client_approval(