refactor(server): removed grpc adapter and replaced with concrete implementations
This commit is contained in:
137
server/crates/arbiter-server/src/grpc/client.rs
Normal file
137
server/crates/arbiter-server/src/grpc/client.rs
Normal file
@@ -0,0 +1,137 @@
|
||||
use arbiter_proto::{
|
||||
proto::client::{
|
||||
AuthChallenge as ProtoAuthChallenge,
|
||||
AuthChallengeRequest as ProtoAuthChallengeRequest,
|
||||
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthOk as ProtoAuthOk,
|
||||
ClientConnectError, ClientRequest, ClientResponse,
|
||||
client_connect_error::Code as ProtoClientConnectErrorCode,
|
||||
client_request::Payload as ClientRequestPayload,
|
||||
client_response::Payload as ClientResponsePayload,
|
||||
},
|
||||
transport::{Bi, Error as TransportError},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::StreamExt as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::{Status, Streaming};
|
||||
|
||||
use crate::actors::client::{
|
||||
self, ClientError, ConnectErrorCode, Request as DomainRequest, Response as DomainResponse,
|
||||
};
|
||||
|
||||
pub struct GrpcTransport {
|
||||
sender: mpsc::Sender<Result<ClientResponse, Status>>,
|
||||
receiver: Streaming<ClientRequest>,
|
||||
}
|
||||
|
||||
impl GrpcTransport {
|
||||
pub fn new(
|
||||
sender: mpsc::Sender<Result<ClientResponse, Status>>,
|
||||
receiver: Streaming<ClientRequest>,
|
||||
) -> Self {
|
||||
Self { sender, receiver }
|
||||
}
|
||||
|
||||
fn request_to_domain(request: ClientRequest) -> Result<DomainRequest, Status> {
|
||||
match request.payload {
|
||||
Some(ClientRequestPayload::AuthChallengeRequest(ProtoAuthChallengeRequest {
|
||||
pubkey,
|
||||
})) => Ok(DomainRequest::AuthChallengeRequest { pubkey }),
|
||||
Some(ClientRequestPayload::AuthChallengeSolution(
|
||||
ProtoAuthChallengeSolution { signature },
|
||||
)) => Ok(DomainRequest::AuthChallengeSolution { signature }),
|
||||
None => Err(Status::invalid_argument("Missing client request payload")),
|
||||
}
|
||||
}
|
||||
|
||||
fn response_to_proto(response: DomainResponse) -> ClientResponse {
|
||||
let payload = match response {
|
||||
DomainResponse::AuthChallenge { pubkey, nonce } => {
|
||||
ClientResponsePayload::AuthChallenge(ProtoAuthChallenge { pubkey, nonce })
|
||||
}
|
||||
DomainResponse::AuthOk => ClientResponsePayload::AuthOk(ProtoAuthOk {}),
|
||||
DomainResponse::ClientConnectError { code } => {
|
||||
ClientResponsePayload::ClientConnectError(ClientConnectError {
|
||||
code: match code {
|
||||
ConnectErrorCode::Unknown => ProtoClientConnectErrorCode::Unknown,
|
||||
ConnectErrorCode::ApprovalDenied => {
|
||||
ProtoClientConnectErrorCode::ApprovalDenied
|
||||
}
|
||||
ConnectErrorCode::NoUserAgentsOnline => {
|
||||
ProtoClientConnectErrorCode::NoUserAgentsOnline
|
||||
}
|
||||
}
|
||||
.into(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
ClientResponse {
|
||||
payload: Some(payload),
|
||||
}
|
||||
}
|
||||
|
||||
fn error_to_status(value: ClientError) -> Status {
|
||||
match value {
|
||||
ClientError::MissingRequestPayload | ClientError::UnexpectedRequestPayload => {
|
||||
Status::invalid_argument("Expected message with payload")
|
||||
}
|
||||
ClientError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
ClientError::Auth(ref err) => auth_error_status(err),
|
||||
ClientError::ConnectionRegistrationFailed => {
|
||||
Status::internal("Connection registration failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
|
||||
async fn send(&mut self, item: Result<DomainResponse, ClientError>) -> Result<(), TransportError> {
|
||||
let outbound = match item {
|
||||
Ok(message) => Ok(Self::response_to_proto(message)),
|
||||
Err(err) => Err(Self::error_to_status(err)),
|
||||
};
|
||||
|
||||
self.sender
|
||||
.send(outbound)
|
||||
.await
|
||||
.map_err(|_| TransportError::ChannelClosed)
|
||||
}
|
||||
|
||||
async fn recv(&mut self) -> Option<DomainRequest> {
|
||||
match self.receiver.next().await {
|
||||
Some(Ok(item)) => match Self::request_to_domain(item) {
|
||||
Ok(request) => Some(request),
|
||||
Err(status) => {
|
||||
let _ = self.sender.send(Err(status)).await;
|
||||
None
|
||||
}
|
||||
},
|
||||
Some(Err(error)) => {
|
||||
tracing::error!(error = ?error, "grpc client recv failed; closing stream");
|
||||
None
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_error_status(value: &client::auth::Error) -> Status {
|
||||
use client::auth::Error;
|
||||
|
||||
match value {
|
||||
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument(value.to_string())
|
||||
}
|
||||
Error::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()),
|
||||
Error::ApproveError(_) => Status::permission_denied(value.to_string()),
|
||||
Error::Transport => Status::internal("Transport error"),
|
||||
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
Error::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
Error::InternalError => Status::internal("Internal error"),
|
||||
}
|
||||
}
|
||||
65
server/crates/arbiter-server/src/grpc/mod.rs
Normal file
65
server/crates/arbiter-server/src/grpc/mod.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
|
||||
use arbiter_proto::proto::{
|
||||
client::{ClientRequest, ClientResponse},
|
||||
user_agent::{UserAgentRequest, UserAgentResponse},
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{Request, Response, Status, async_trait};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
DEFAULT_CHANNEL_SIZE,
|
||||
actors::{client::{ClientConnection, connect_client}, user_agent::{UserAgentConnection, connect_user_agent}},
|
||||
};
|
||||
|
||||
pub mod client;
|
||||
pub mod user_agent;
|
||||
|
||||
#[async_trait]
|
||||
impl arbiter_proto::proto::arbiter_service_server::ArbiterService for super::Server {
|
||||
type UserAgentStream = ReceiverStream<Result<UserAgentResponse, Status>>;
|
||||
type ClientStream = ReceiverStream<Result<ClientResponse, Status>>;
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn client(
|
||||
&self,
|
||||
request: Request<tonic::Streaming<ClientRequest>>,
|
||||
) -> Result<Response<Self::ClientStream>, Status> {
|
||||
let req_stream = request.into_inner();
|
||||
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
||||
|
||||
let transport = client::GrpcTransport::new(tx, req_stream);
|
||||
let props = ClientConnection::new(
|
||||
self.context.db.clone(),
|
||||
Box::new(transport),
|
||||
self.context.actors.clone(),
|
||||
);
|
||||
tokio::spawn(connect_client(props));
|
||||
|
||||
info!(event = "connection established", "grpc.client");
|
||||
|
||||
Ok(Response::new(ReceiverStream::new(rx)))
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "debug", skip(self))]
|
||||
async fn user_agent(
|
||||
&self,
|
||||
request: Request<tonic::Streaming<UserAgentRequest>>,
|
||||
) -> Result<Response<Self::UserAgentStream>, Status> {
|
||||
let req_stream = request.into_inner();
|
||||
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
||||
|
||||
let transport = user_agent::GrpcTransport::new(tx, req_stream);
|
||||
let props = UserAgentConnection::new(
|
||||
self.context.db.clone(),
|
||||
self.context.actors.clone(),
|
||||
Box::new(transport),
|
||||
);
|
||||
tokio::spawn(connect_user_agent(props));
|
||||
|
||||
info!(event = "connection established", "grpc.user_agent");
|
||||
|
||||
Ok(Response::new(ReceiverStream::new(rx)))
|
||||
}
|
||||
}
|
||||
288
server/crates/arbiter-server/src/grpc/user_agent.rs
Normal file
288
server/crates/arbiter-server/src/grpc/user_agent.rs
Normal file
@@ -0,0 +1,288 @@
|
||||
use arbiter_proto::{
|
||||
proto::{
|
||||
evm::{
|
||||
EvmError as ProtoEvmError, WalletCreateResponse, WalletEntry, WalletList,
|
||||
WalletListResponse, wallet_create_response::Result as WalletCreateResult,
|
||||
wallet_list_response::Result as WalletListResult,
|
||||
},
|
||||
user_agent::{
|
||||
AuthChallenge as ProtoAuthChallenge,
|
||||
AuthChallengeRequest as ProtoAuthChallengeRequest,
|
||||
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthOk as ProtoAuthOk,
|
||||
BootstrapEncryptedKey as ProtoBootstrapEncryptedKey,
|
||||
BootstrapResult as ProtoBootstrapResult, ClientConnectionCancel,
|
||||
ClientConnectionRequest, ClientConnectionResponse, KeyType as ProtoKeyType,
|
||||
UnsealEncryptedKey as ProtoUnsealEncryptedKey, UnsealResult as ProtoUnsealResult,
|
||||
UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse,
|
||||
VaultState as ProtoVaultState,
|
||||
user_agent_request::Payload as UserAgentRequestPayload,
|
||||
user_agent_response::Payload as UserAgentResponsePayload,
|
||||
},
|
||||
},
|
||||
transport::{Bi, Error as TransportError},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::StreamExt as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::{Status, Streaming};
|
||||
|
||||
use crate::actors::user_agent::{
|
||||
self, AuthPublicKey, BootstrapError, Request as DomainRequest, Response as DomainResponse,
|
||||
TransportResponseError, UnsealError, VaultState,
|
||||
};
|
||||
|
||||
pub struct GrpcTransport {
|
||||
sender: mpsc::Sender<Result<UserAgentResponse, Status>>,
|
||||
receiver: Streaming<UserAgentRequest>,
|
||||
}
|
||||
|
||||
impl GrpcTransport {
|
||||
pub fn new(
|
||||
sender: mpsc::Sender<Result<UserAgentResponse, Status>>,
|
||||
receiver: Streaming<UserAgentRequest>,
|
||||
) -> Self {
|
||||
Self { sender, receiver }
|
||||
}
|
||||
|
||||
fn request_to_domain(request: UserAgentRequest) -> Result<DomainRequest, Status> {
|
||||
match request.payload {
|
||||
Some(UserAgentRequestPayload::AuthChallengeRequest(
|
||||
ProtoAuthChallengeRequest {
|
||||
pubkey,
|
||||
bootstrap_token,
|
||||
key_type,
|
||||
},
|
||||
)) => Ok(DomainRequest::AuthChallengeRequest {
|
||||
pubkey: parse_auth_pubkey(key_type, pubkey)?,
|
||||
bootstrap_token,
|
||||
}),
|
||||
Some(UserAgentRequestPayload::AuthChallengeSolution(
|
||||
ProtoAuthChallengeSolution { signature },
|
||||
)) => Ok(DomainRequest::AuthChallengeSolution { signature }),
|
||||
Some(UserAgentRequestPayload::UnsealStart(UnsealStart { client_pubkey })) => {
|
||||
let client_pubkey: [u8; 32] = client_pubkey
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.map_err(|_| Status::invalid_argument("client_pubkey must be 32 bytes"))?;
|
||||
Ok(DomainRequest::UnsealStart {
|
||||
client_pubkey: x25519_dalek::PublicKey::from(client_pubkey),
|
||||
})
|
||||
}
|
||||
Some(UserAgentRequestPayload::UnsealEncryptedKey(ProtoUnsealEncryptedKey {
|
||||
nonce,
|
||||
ciphertext,
|
||||
associated_data,
|
||||
})) => Ok(DomainRequest::UnsealEncryptedKey {
|
||||
nonce,
|
||||
ciphertext,
|
||||
associated_data,
|
||||
}),
|
||||
Some(UserAgentRequestPayload::BootstrapEncryptedKey(
|
||||
ProtoBootstrapEncryptedKey {
|
||||
nonce,
|
||||
ciphertext,
|
||||
associated_data,
|
||||
},
|
||||
)) => Ok(DomainRequest::BootstrapEncryptedKey {
|
||||
nonce,
|
||||
ciphertext,
|
||||
associated_data,
|
||||
}),
|
||||
Some(UserAgentRequestPayload::QueryVaultState(_)) => {
|
||||
Ok(DomainRequest::QueryVaultState)
|
||||
}
|
||||
Some(UserAgentRequestPayload::EvmWalletCreate(_)) => Ok(DomainRequest::EvmWalletCreate),
|
||||
Some(UserAgentRequestPayload::EvmWalletList(_)) => Ok(DomainRequest::EvmWalletList),
|
||||
Some(UserAgentRequestPayload::ClientConnectionResponse(
|
||||
ClientConnectionResponse { approved },
|
||||
)) => Ok(DomainRequest::ClientConnectionResponse { approved }),
|
||||
Some(_) => Err(Status::invalid_argument(
|
||||
"Unexpected user-agent request payload",
|
||||
)),
|
||||
None => Err(Status::invalid_argument("Missing user-agent request payload")),
|
||||
}
|
||||
}
|
||||
|
||||
fn response_to_proto(response: DomainResponse) -> UserAgentResponse {
|
||||
let payload = match response {
|
||||
DomainResponse::AuthChallenge { nonce } => {
|
||||
UserAgentResponsePayload::AuthChallenge(ProtoAuthChallenge {
|
||||
pubkey: Vec::new(),
|
||||
nonce,
|
||||
})
|
||||
}
|
||||
DomainResponse::AuthOk => UserAgentResponsePayload::AuthOk(ProtoAuthOk {}),
|
||||
DomainResponse::UnsealStartResponse { server_pubkey } => {
|
||||
UserAgentResponsePayload::UnsealStartResponse(UnsealStartResponse {
|
||||
server_pubkey: server_pubkey.as_bytes().to_vec(),
|
||||
})
|
||||
}
|
||||
DomainResponse::UnsealResult(result) => UserAgentResponsePayload::UnsealResult(
|
||||
match result {
|
||||
Ok(()) => ProtoUnsealResult::Success,
|
||||
Err(UnsealError::InvalidKey) => ProtoUnsealResult::InvalidKey,
|
||||
Err(UnsealError::Unbootstrapped) => ProtoUnsealResult::Unbootstrapped,
|
||||
}
|
||||
.into(),
|
||||
),
|
||||
DomainResponse::BootstrapResult(result) => UserAgentResponsePayload::BootstrapResult(
|
||||
match result {
|
||||
Ok(()) => ProtoBootstrapResult::Success,
|
||||
Err(BootstrapError::AlreadyBootstrapped) => {
|
||||
ProtoBootstrapResult::AlreadyBootstrapped
|
||||
}
|
||||
Err(BootstrapError::InvalidKey) => ProtoBootstrapResult::InvalidKey,
|
||||
}
|
||||
.into(),
|
||||
),
|
||||
DomainResponse::VaultState(state) => UserAgentResponsePayload::VaultState(
|
||||
match state {
|
||||
VaultState::Unbootstrapped => ProtoVaultState::Unbootstrapped,
|
||||
VaultState::Sealed => ProtoVaultState::Sealed,
|
||||
VaultState::Unsealed => ProtoVaultState::Unsealed,
|
||||
}
|
||||
.into(),
|
||||
),
|
||||
DomainResponse::ClientConnectionRequest { pubkey } => {
|
||||
UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest {
|
||||
pubkey: pubkey.to_bytes().to_vec(),
|
||||
})
|
||||
}
|
||||
DomainResponse::ClientConnectionCancel => {
|
||||
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {})
|
||||
}
|
||||
DomainResponse::EvmWalletCreate(result) => {
|
||||
UserAgentResponsePayload::EvmWalletCreate(WalletCreateResponse {
|
||||
result: Some(match result {
|
||||
Ok(()) => WalletCreateResult::Wallet(WalletEntry {
|
||||
address: Vec::new(),
|
||||
}),
|
||||
Err(_) => WalletCreateResult::Error(ProtoEvmError::Internal.into()),
|
||||
}),
|
||||
})
|
||||
}
|
||||
DomainResponse::EvmWalletList(wallets) => {
|
||||
UserAgentResponsePayload::EvmWalletList(WalletListResponse {
|
||||
result: Some(WalletListResult::Wallets(WalletList {
|
||||
wallets: wallets
|
||||
.into_iter()
|
||||
.map(|addr| WalletEntry {
|
||||
address: addr.as_slice().to_vec(),
|
||||
})
|
||||
.collect(),
|
||||
})),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
UserAgentResponse {
|
||||
payload: Some(payload),
|
||||
}
|
||||
}
|
||||
|
||||
fn error_to_status(value: TransportResponseError) -> Status {
|
||||
match value {
|
||||
TransportResponseError::UnexpectedRequestPayload => {
|
||||
Status::invalid_argument("Expected message with payload")
|
||||
}
|
||||
TransportResponseError::InvalidStateForUnsealEncryptedKey => {
|
||||
Status::failed_precondition("Invalid state for unseal encrypted key")
|
||||
}
|
||||
TransportResponseError::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument("client_pubkey must be 32 bytes")
|
||||
}
|
||||
TransportResponseError::StateTransitionFailed => Status::internal("State machine error"),
|
||||
TransportResponseError::KeyHolderActorUnreachable => {
|
||||
Status::internal("Vault is not available")
|
||||
}
|
||||
TransportResponseError::Auth(ref err) => auth_error_status(err),
|
||||
TransportResponseError::ConnectionRegistrationFailed => {
|
||||
Status::internal("Failed registering connection")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Bi<DomainRequest, Result<DomainResponse, TransportResponseError>> for GrpcTransport {
|
||||
async fn send(
|
||||
&mut self,
|
||||
item: Result<DomainResponse, TransportResponseError>,
|
||||
) -> Result<(), TransportError> {
|
||||
let outbound = match item {
|
||||
Ok(message) => Ok(Self::response_to_proto(message)),
|
||||
Err(err) => Err(Self::error_to_status(err)),
|
||||
};
|
||||
|
||||
self.sender
|
||||
.send(outbound)
|
||||
.await
|
||||
.map_err(|_| TransportError::ChannelClosed)
|
||||
}
|
||||
|
||||
async fn recv(&mut self) -> Option<DomainRequest> {
|
||||
match self.receiver.next().await {
|
||||
Some(Ok(item)) => match Self::request_to_domain(item) {
|
||||
Ok(request) => Some(request),
|
||||
Err(status) => {
|
||||
let _ = self.sender.send(Err(status)).await;
|
||||
None
|
||||
}
|
||||
},
|
||||
Some(Err(error)) => {
|
||||
tracing::error!(error = ?error, "grpc user-agent recv failed; closing stream");
|
||||
None
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_auth_pubkey(key_type: i32, pubkey: Vec<u8>) -> Result<AuthPublicKey, Status> {
|
||||
match ProtoKeyType::try_from(key_type).unwrap_or(ProtoKeyType::Unspecified) {
|
||||
ProtoKeyType::Unspecified | ProtoKeyType::Ed25519 => {
|
||||
let bytes: [u8; 32] = pubkey
|
||||
.as_slice()
|
||||
.try_into()
|
||||
.map_err(|_| Status::invalid_argument("invalid Ed25519 public key length"))?;
|
||||
let key = ed25519_dalek::VerifyingKey::from_bytes(&bytes)
|
||||
.map_err(|_| Status::invalid_argument("invalid Ed25519 public key encoding"))?;
|
||||
Ok(AuthPublicKey::Ed25519(key))
|
||||
}
|
||||
ProtoKeyType::EcdsaSecp256k1 => {
|
||||
let key = k256::ecdsa::VerifyingKey::from_sec1_bytes(&pubkey)
|
||||
.map_err(|_| Status::invalid_argument("invalid secp256k1 public key encoding"))?;
|
||||
Ok(AuthPublicKey::EcdsaSecp256k1(key))
|
||||
}
|
||||
ProtoKeyType::Rsa => {
|
||||
use rsa::pkcs8::DecodePublicKey as _;
|
||||
|
||||
let key = rsa::RsaPublicKey::from_public_key_der(&pubkey)
|
||||
.map_err(|_| Status::invalid_argument("invalid RSA public key encoding"))?;
|
||||
Ok(AuthPublicKey::Rsa(key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_error_status(value: &user_agent::auth::Error) -> Status {
|
||||
use user_agent::auth::Error;
|
||||
|
||||
match value {
|
||||
Error::UnexpectedMessagePayload | Error::InvalidClientPubkeyLength => {
|
||||
Status::invalid_argument(value.to_string())
|
||||
}
|
||||
Error::InvalidAuthPubkeyEncoding => {
|
||||
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
|
||||
}
|
||||
Error::PublicKeyNotRegistered | Error::InvalidChallengeSolution => {
|
||||
Status::unauthenticated(value.to_string())
|
||||
}
|
||||
Error::InvalidBootstrapToken => Status::invalid_argument("Invalid bootstrap token"),
|
||||
Error::Transport => Status::internal("Transport error"),
|
||||
Error::BootstrapperActorUnreachable => {
|
||||
Status::internal("Bootstrap token consumption failed")
|
||||
}
|
||||
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
|
||||
Error::DatabaseOperationFailed => Status::internal("Database error"),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user