refactor(server::client): migrated to new connection design

This commit is contained in:
hdbg
2026-03-18 22:40:07 +01:00
committed by Stas
parent 04bea299cb
commit a663363626
14 changed files with 474 additions and 401 deletions

View File

@@ -1,142 +1,118 @@
use arbiter_proto::{
proto::client::{
AuthChallenge as ProtoAuthChallenge, AuthChallengeRequest as ProtoAuthChallengeRequest,
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthOk as ProtoAuthOk,
ClientConnectError, ClientRequest, ClientResponse,
client_connect_error::Code as ProtoClientConnectErrorCode,
ClientRequest, ClientResponse, VaultState as ProtoVaultState,
client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
transport::{Bi, Error as TransportError, Sender},
transport::{Receiver, Sender, grpc::GrpcBi},
};
use async_trait::async_trait;
use futures::StreamExt as _;
use tokio::sync::mpsc;
use tonic::{Status, Streaming};
use kameo::{
actor::{ActorRef, Spawn as _},
error::SendError,
};
use tracing::{info, warn};
use crate::actors::client::{
self, ClientError, ConnectErrorCode, Request as DomainRequest, Response as DomainResponse,
use crate::{
actors::{
client::{
self, ClientConnection,
session::{ClientSession, Error, HandleQueryVaultState},
},
keyholder::KeyHolderState,
},
utils::defer,
};
pub struct GrpcTransport {
sender: mpsc::Sender<Result<ClientResponse, Status>>,
receiver: Streaming<ClientRequest>,
}
mod auth;
impl GrpcTransport {
pub fn new(
sender: mpsc::Sender<Result<ClientResponse, Status>>,
receiver: Streaming<ClientRequest>,
) -> Self {
Self { sender, receiver }
}
fn request_to_domain(request: ClientRequest) -> Result<DomainRequest, Status> {
match request.payload {
Some(ClientRequestPayload::AuthChallengeRequest(ProtoAuthChallengeRequest {
pubkey,
})) => Ok(DomainRequest::AuthChallengeRequest { pubkey }),
Some(ClientRequestPayload::AuthChallengeSolution(ProtoAuthChallengeSolution {
signature,
})) => Ok(DomainRequest::AuthChallengeSolution { signature }),
None => Err(Status::invalid_argument("Missing client request payload")),
}
}
fn response_to_proto(response: DomainResponse) -> ClientResponse {
let payload = match response {
DomainResponse::AuthChallenge { pubkey, nonce } => {
ClientResponsePayload::AuthChallenge(ProtoAuthChallenge { pubkey, nonce })
}
DomainResponse::AuthOk => ClientResponsePayload::AuthOk(ProtoAuthOk {}),
DomainResponse::ClientConnectError { code } => {
ClientResponsePayload::ClientConnectError(ClientConnectError {
code: match code {
ConnectErrorCode::Unknown => ProtoClientConnectErrorCode::Unknown,
ConnectErrorCode::ApprovalDenied => {
ProtoClientConnectErrorCode::ApprovalDenied
}
ConnectErrorCode::NoUserAgentsOnline => {
ProtoClientConnectErrorCode::NoUserAgentsOnline
}
}
.into(),
})
}
async fn dispatch_loop(
mut bi: GrpcBi<ClientRequest, ClientResponse>,
actor: ActorRef<ClientSession>,
) {
loop {
let Some(conn) = bi.recv().await else {
return;
};
ClientResponse {
payload: Some(payload),
}
}
fn error_to_status(value: ClientError) -> Status {
match value {
ClientError::MissingRequestPayload | ClientError::UnexpectedRequestPayload => {
Status::invalid_argument("Expected message with payload")
}
ClientError::StateTransitionFailed => Status::internal("State machine error"),
ClientError::Auth(ref err) => auth_error_status(err),
ClientError::ConnectionRegistrationFailed => {
Status::internal("Connection registration failed")
}
if dispatch_conn_message(&mut bi, &actor, conn).await.is_err() {
return;
}
}
}
#[async_trait]
impl Sender<Result<DomainResponse, ClientError>> for GrpcTransport {
async fn send(
&mut self,
item: Result<DomainResponse, ClientError>,
) -> Result<(), TransportError> {
let outbound = match item {
Ok(message) => Ok(Self::response_to_proto(message)),
Err(err) => Err(Self::error_to_status(err)),
};
async fn dispatch_conn_message(
bi: &mut GrpcBi<ClientRequest, ClientResponse>,
actor: &ActorRef<ClientSession>,
conn: Result<ClientRequest, tonic::Status>,
) -> Result<(), ()> {
let conn = match conn {
Ok(conn) => conn,
Err(err) => {
warn!(error = ?err, "Failed to receive client request");
return Err(());
}
};
self.sender
.send(outbound)
.await
.map_err(|_| TransportError::ChannelClosed)
}
}
let Some(payload) = conn.payload else {
let _ = bi
.send(Err(tonic::Status::invalid_argument(
"Missing client request payload",
)))
.await;
return Err(());
};
#[async_trait]
impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
async fn recv(&mut self) -> Option<DomainRequest> {
match self.receiver.next().await {
Some(Ok(item)) => match Self::request_to_domain(item) {
Ok(request) => Some(request),
Err(status) => {
let _ = self.sender.send(Err(status)).await;
None
let payload = match payload {
ClientRequestPayload::QueryVaultState(_) => ClientResponsePayload::VaultState(
match actor.ask(HandleQueryVaultState {}).await {
Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed,
Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed,
Err(SendError::HandlerError(Error::Internal)) => ProtoVaultState::Error,
Err(err) => {
warn!(error = ?err, "Failed to query vault state");
ProtoVaultState::Error
}
},
Some(Err(error)) => {
tracing::error!(error = ?error, "grpc client recv failed; closing stream");
None
}
None => None,
.into(),
),
payload => {
warn!(?payload, "Unsupported post-auth client request");
let _ = bi
.send(Err(tonic::Status::invalid_argument(
"Unsupported client request",
)))
.await;
return Err(());
}
};
bi.send(Ok(ClientResponse {
payload: Some(payload),
}))
.await
.map_err(|_| ())
}
pub async fn start(conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
let mut conn = conn;
match auth::start(&mut conn, &mut bi).await {
Ok(_) => {
let actor =
client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
let actor_for_cleanup = actor.clone();
let _ = defer(move || {
actor_for_cleanup.kill();
});
info!("Client authenticated successfully");
dispatch_loop(bi, actor).await;
}
Err(e) => {
let mut transport = auth::AuthTransportAdapter(&mut bi);
let _ = transport.send(Err(e.clone())).await;
warn!(error = ?e, "Authentication failed");
return;
}
}
}
fn auth_error_status(value: &client::auth::Error) -> Status {
use client::auth::Error;
match value {
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
Status::invalid_argument(value.to_string())
}
Error::InvalidAuthPubkeyEncoding => {
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
}
Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()),
Error::ApproveError(_) => Status::permission_denied(value.to_string()),
Error::Transport => Status::internal("Transport error"),
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
Error::DatabaseOperationFailed => Status::internal("Database error"),
Error::InternalError => Status::internal("Internal error"),
}
}