refactor(server::client): migrated to new connection design
This commit is contained in:
@@ -1,142 +1,118 @@
|
||||
use arbiter_proto::{
|
||||
proto::client::{
|
||||
AuthChallenge as ProtoAuthChallenge, AuthChallengeRequest as ProtoAuthChallengeRequest,
|
||||
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthOk as ProtoAuthOk,
|
||||
ClientConnectError, ClientRequest, ClientResponse,
|
||||
client_connect_error::Code as ProtoClientConnectErrorCode,
|
||||
ClientRequest, ClientResponse, VaultState as ProtoVaultState,
|
||||
client_request::Payload as ClientRequestPayload,
|
||||
client_response::Payload as ClientResponsePayload,
|
||||
},
|
||||
transport::{Bi, Error as TransportError, Sender},
|
||||
transport::{Receiver, Sender, grpc::GrpcBi},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::StreamExt as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::{Status, Streaming};
|
||||
use kameo::{
|
||||
actor::{ActorRef, Spawn as _},
|
||||
error::SendError,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::actors::client::{
|
||||
self, ClientError, ConnectErrorCode, Request as DomainRequest, Response as DomainResponse,
|
||||
use crate::{
|
||||
actors::{
|
||||
client::{
|
||||
self, ClientConnection,
|
||||
session::{ClientSession, Error, HandleQueryVaultState},
|
||||
},
|
||||
keyholder::KeyHolderState,
|
||||
},
|
||||
utils::defer,
|
||||
};
|
||||
|
||||
pub struct GrpcTransport {
|
||||
sender: mpsc::Sender<Result<ClientResponse, Status>>,
|
||||
receiver: Streaming<ClientRequest>,
|
||||
}
|
||||
mod auth;
|
||||
|
||||
impl GrpcTransport {
|
||||
pub fn new(
|
||||
sender: mpsc::Sender<Result<ClientResponse, Status>>,
|
||||
receiver: Streaming<ClientRequest>,
|
||||
) -> Self {
|
||||
Self { sender, receiver }
|
||||
}
|
||||
|
||||
fn request_to_domain(request: ClientRequest) -> Result<DomainRequest, Status> {
|
||||
match request.payload {
|
||||
Some(ClientRequestPayload::AuthChallengeRequest(ProtoAuthChallengeRequest {
|
||||
pubkey,
|
||||
})) => Ok(DomainRequest::AuthChallengeRequest { pubkey }),
|
||||
Some(ClientRequestPayload::AuthChallengeSolution(ProtoAuthChallengeSolution {
|
||||
signature,
|
||||
})) => Ok(DomainRequest::AuthChallengeSolution { signature }),
|
||||
None => Err(Status::invalid_argument("Missing client request payload")),
|
||||
}
|
||||
}
|
||||
|
||||
fn response_to_proto(response: DomainResponse) -> ClientResponse {
|
||||
let payload = match response {
|
||||
DomainResponse::AuthChallenge { pubkey, nonce } => {
|
||||
ClientResponsePayload::AuthChallenge(ProtoAuthChallenge { pubkey, nonce })
|
||||
}
|
||||
DomainResponse::AuthOk => ClientResponsePayload::AuthOk(ProtoAuthOk {}),
|
||||
DomainResponse::ClientConnectError { code } => {
|
||||
ClientResponsePayload::ClientConnectError(ClientConnectError {
|
||||
code: match code {
|
||||
ConnectErrorCode::Unknown => ProtoClientConnectErrorCode::Unknown,
|
||||
ConnectErrorCode::ApprovalDenied => {
|
||||
ProtoClientConnectErrorCode::ApprovalDenied
|
||||
}
|
||||
ConnectErrorCode::NoUserAgentsOnline => {
|
||||
ProtoClientConnectErrorCode::NoUserAgentsOnline
|
||||
}
|
||||
}
|
||||
.into(),
|
||||
})
|
||||
}
|
||||
async fn dispatch_loop(
|
||||
mut bi: GrpcBi<ClientRequest, ClientResponse>,
|
||||
actor: ActorRef<ClientSession>,
|
||||
) {
|
||||
loop {
|
||||
let Some(conn) = bi.recv().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
ClientResponse {
|
||||
payload: Some(payload),
|
||||
}
|
||||
}
|
||||
|
||||
fn error_to_status(value: ClientError) -> Status {
|
||||
match value {
|
||||
ClientError::MissingRequestPayload | ClientError::UnexpectedRequestPayload => {
|
||||
Status::invalid_argument("Expected message with payload")
|
||||
}
|
||||
ClientError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
ClientError::Auth(ref err) => auth_error_status(err),
|
||||
ClientError::ConnectionRegistrationFailed => {
|
||||
Status::internal("Connection registration failed")
|
||||
}
|
||||
if dispatch_conn_message(&mut bi, &actor, conn).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Sender<Result<DomainResponse, ClientError>> for GrpcTransport {
|
||||
async fn send(
|
||||
&mut self,
|
||||
item: Result<DomainResponse, ClientError>,
|
||||
) -> Result<(), TransportError> {
|
||||
let outbound = match item {
|
||||
Ok(message) => Ok(Self::response_to_proto(message)),
|
||||
Err(err) => Err(Self::error_to_status(err)),
|
||||
};
|
||||
async fn dispatch_conn_message(
|
||||
bi: &mut GrpcBi<ClientRequest, ClientResponse>,
|
||||
actor: &ActorRef<ClientSession>,
|
||||
conn: Result<ClientRequest, tonic::Status>,
|
||||
) -> Result<(), ()> {
|
||||
let conn = match conn {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
warn!(error = ?err, "Failed to receive client request");
|
||||
return Err(());
|
||||
}
|
||||
};
|
||||
|
||||
self.sender
|
||||
.send(outbound)
|
||||
.await
|
||||
.map_err(|_| TransportError::ChannelClosed)
|
||||
}
|
||||
}
|
||||
let Some(payload) = conn.payload else {
|
||||
let _ = bi
|
||||
.send(Err(tonic::Status::invalid_argument(
|
||||
"Missing client request payload",
|
||||
)))
|
||||
.await;
|
||||
return Err(());
|
||||
};
|
||||
|
||||
#[async_trait]
|
||||
impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
|
||||
async fn recv(&mut self) -> Option<DomainRequest> {
|
||||
match self.receiver.next().await {
|
||||
Some(Ok(item)) => match Self::request_to_domain(item) {
|
||||
Ok(request) => Some(request),
|
||||
Err(status) => {
|
||||
let _ = self.sender.send(Err(status)).await;
|
||||
None
|
||||
let payload = match payload {
|
||||
ClientRequestPayload::QueryVaultState(_) => ClientResponsePayload::VaultState(
|
||||
match actor.ask(HandleQueryVaultState {}).await {
|
||||
Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
|
||||
Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed,
|
||||
Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed,
|
||||
Err(SendError::HandlerError(Error::Internal)) => ProtoVaultState::Error,
|
||||
Err(err) => {
|
||||
warn!(error = ?err, "Failed to query vault state");
|
||||
ProtoVaultState::Error
|
||||
}
|
||||
},
|
||||
Some(Err(error)) => {
|
||||
tracing::error!(error = ?error, "grpc client recv failed; closing stream");
|
||||
None
|
||||
}
|
||||
None => None,
|
||||
.into(),
|
||||
),
|
||||
payload => {
|
||||
warn!(?payload, "Unsupported post-auth client request");
|
||||
let _ = bi
|
||||
.send(Err(tonic::Status::invalid_argument(
|
||||
"Unsupported client request",
|
||||
)))
|
||||
.await;
|
||||
return Err(());
|
||||
}
|
||||
};
|
||||
|
||||
bi.send(Ok(ClientResponse {
|
||||
payload: Some(payload),
|
||||
}))
|
||||
.await
|
||||
.map_err(|_| ())
|
||||
}
|
||||
|
||||
pub async fn start(conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
|
||||
let mut conn = conn;
|
||||
match auth::start(&mut conn, &mut bi).await {
|
||||
Ok(_) => {
|
||||
let actor =
|
||||
client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
|
||||
let actor_for_cleanup = actor.clone();
|
||||
let _ = defer(move || {
|
||||
actor_for_cleanup.kill();
|
||||
});
|
||||
|
||||
info!("Client authenticated successfully");
|
||||
dispatch_loop(bi, actor).await;
|
||||
}
|
||||
Err(e) => {
|
||||
let mut transport = auth::AuthTransportAdapter(&mut bi);
|
||||
let _ = transport.send(Err(e.clone())).await;
|
||||
warn!(error = ?e, "Authentication failed");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_error_status(value: &client::auth::Error) -> Status {
|
||||
use client::auth::Error;
|
||||
|
||||
match value {
|
||||
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument(value.to_string())
|
||||
}
|
||||
Error::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()),
|
||||
Error::ApproveError(_) => Status::permission_denied(value.to_string()),
|
||||
Error::Transport => Status::internal("Transport error"),
|
||||
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
Error::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
Error::InternalError => Status::internal("Internal error"),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user