refactor(server::{user_agent, client}): move auth part to separate function to not to pollute actor session with one-time concerns
This commit is contained in:
@@ -7,7 +7,6 @@ use arbiter_proto::{
|
||||
transport::{IdentityRecvConverter, SendConverter, grpc},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use kameo::actor::Spawn;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
@@ -16,8 +15,8 @@ use tracing::info;
|
||||
|
||||
use crate::{
|
||||
actors::{
|
||||
client::{ClientActor, ClientError},
|
||||
user_agent::{UserAgentActor, UserAgentError},
|
||||
client::{self, ClientError, ConnectionProps as ClientConnectionProps, connect_client},
|
||||
user_agent::{self, ConnectionProps, UserAgentError, connect_user_agent},
|
||||
},
|
||||
context::ServerContext,
|
||||
};
|
||||
@@ -28,11 +27,6 @@ pub mod db;
|
||||
|
||||
const DEFAULT_CHANNEL_SIZE: usize = 1000;
|
||||
|
||||
/// Converts User Agent domain outbounds into the tonic stream item emitted by
|
||||
/// the server.§
|
||||
///
|
||||
/// The conversion is defined at the server boundary so the actor module remains
|
||||
/// focused on domain semantics and does not depend on tonic status encoding.
|
||||
struct UserAgentGrpcSender;
|
||||
|
||||
impl SendConverter for UserAgentGrpcSender {
|
||||
@@ -47,11 +41,6 @@ impl SendConverter for UserAgentGrpcSender {
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts Client domain outbounds into the tonic stream item emitted by the
|
||||
/// server.
|
||||
///
|
||||
/// The conversion is defined at the server boundary so the actor module remains
|
||||
/// focused on domain semantics and does not depend on tonic status encoding.
|
||||
struct ClientGrpcSender;
|
||||
|
||||
impl SendConverter for ClientGrpcSender {
|
||||
@@ -66,78 +55,71 @@ impl SendConverter for ClientGrpcSender {
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps Client domain errors to public gRPC transport errors for the `client`
|
||||
/// streaming endpoint.
|
||||
fn client_error_status(value: ClientError) -> Status {
|
||||
match value {
|
||||
ClientError::MissingRequestPayload | ClientError::UnexpectedRequestPayload => {
|
||||
Status::invalid_argument("Expected message with payload")
|
||||
}
|
||||
ClientError::InvalidStateForChallengeSolution => {
|
||||
Status::invalid_argument("Invalid state for challenge solution")
|
||||
}
|
||||
ClientError::InvalidAuthPubkeyLength => {
|
||||
Status::invalid_argument("Expected pubkey to have specific length")
|
||||
}
|
||||
ClientError::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
ClientError::InvalidSignatureLength => {
|
||||
Status::invalid_argument("Invalid signature length")
|
||||
}
|
||||
ClientError::PublicKeyNotRegistered => {
|
||||
Status::unauthenticated("Public key not registered")
|
||||
}
|
||||
ClientError::InvalidChallengeSolution => {
|
||||
Status::unauthenticated("Invalid challenge solution")
|
||||
}
|
||||
ClientError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
ClientError::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
ClientError::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
ClientError::Auth(ref err) => client_auth_error_status(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn client_auth_error_status(value: &client::auth::Error) -> Status {
|
||||
use client::auth::Error;
|
||||
match value {
|
||||
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument(value.to_string())
|
||||
}
|
||||
Error::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
Error::InvalidSignatureLength => Status::invalid_argument("Invalid signature length"),
|
||||
Error::PublicKeyNotRegistered | Error::InvalidChallengeSolution => {
|
||||
Status::unauthenticated(value.to_string())
|
||||
}
|
||||
Error::Transport => Status::internal("Transport error"),
|
||||
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
Error::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps User Agent domain errors to public gRPC transport errors for the
|
||||
/// `user_agent` streaming endpoint.
|
||||
fn user_agent_error_status(value: UserAgentError) -> Status {
|
||||
match value {
|
||||
UserAgentError::MissingRequestPayload | UserAgentError::UnexpectedRequestPayload => {
|
||||
Status::invalid_argument("Expected message with payload")
|
||||
}
|
||||
UserAgentError::InvalidStateForChallengeSolution => {
|
||||
Status::invalid_argument("Invalid state for challenge solution")
|
||||
}
|
||||
UserAgentError::InvalidStateForUnsealEncryptedKey => {
|
||||
Status::failed_precondition("Invalid state for unseal encrypted key")
|
||||
}
|
||||
UserAgentError::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument("client_pubkey must be 32 bytes")
|
||||
}
|
||||
UserAgentError::InvalidAuthPubkeyLength => {
|
||||
Status::invalid_argument("Expected pubkey to have specific length")
|
||||
UserAgentError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"),
|
||||
UserAgentError::Auth(ref err) => auth_error_status(err),
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_error_status(value: &user_agent::auth::Error) -> Status {
|
||||
use user_agent::auth::Error;
|
||||
match value {
|
||||
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument(value.to_string())
|
||||
}
|
||||
UserAgentError::InvalidAuthPubkeyEncoding => {
|
||||
Error::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
UserAgentError::InvalidSignatureLength => {
|
||||
Status::invalid_argument("Invalid signature length")
|
||||
Error::PublicKeyNotRegistered | Error::InvalidChallengeSolution => {
|
||||
Status::unauthenticated(value.to_string())
|
||||
}
|
||||
UserAgentError::InvalidBootstrapToken => {
|
||||
Status::invalid_argument("Invalid bootstrap token")
|
||||
}
|
||||
UserAgentError::PublicKeyNotRegistered => {
|
||||
Status::unauthenticated("Public key not registered")
|
||||
}
|
||||
UserAgentError::InvalidChallengeSolution => {
|
||||
Status::unauthenticated("Invalid challenge solution")
|
||||
}
|
||||
UserAgentError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
UserAgentError::BootstrapperActorUnreachable => {
|
||||
Error::InvalidBootstrapToken => Status::invalid_argument("Invalid bootstrap token"),
|
||||
Error::Transport => Status::internal("Transport error"),
|
||||
Error::BootstrapperActorUnreachable => {
|
||||
Status::internal("Bootstrap token consumption failed")
|
||||
}
|
||||
UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"),
|
||||
UserAgentError::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
UserAgentError::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
Error::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,7 +152,8 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server {
|
||||
IdentityRecvConverter::<ClientRequest>::new(),
|
||||
ClientGrpcSender,
|
||||
);
|
||||
ClientActor::spawn(ClientActor::new(self.context.clone(), Box::new(transport)));
|
||||
let props = ClientConnectionProps::new(self.context.db.clone(), Box::new(transport));
|
||||
tokio::spawn(connect_client(props));
|
||||
|
||||
info!(event = "connection established", "grpc.client");
|
||||
|
||||
@@ -191,7 +174,12 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server {
|
||||
IdentityRecvConverter::<UserAgentRequest>::new(),
|
||||
UserAgentGrpcSender,
|
||||
);
|
||||
UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), Box::new(transport)));
|
||||
let props = ConnectionProps::new(
|
||||
self.context.db.clone(),
|
||||
self.context.actors.clone(),
|
||||
Box::new(transport),
|
||||
);
|
||||
tokio::spawn(connect_user_agent(props));
|
||||
|
||||
info!(event = "connection established", "grpc.user_agent");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user