refactor(server::useragent): migrated to new connection design

This commit is contained in:
hdbg
2026-03-17 18:39:12 +01:00
committed by Stas
parent 47caec38a6
commit f77ac2ea12
20 changed files with 1151 additions and 958 deletions

View File

@@ -1,14 +1,13 @@
use arbiter_proto::{
proto::client::{
AuthChallenge as ProtoAuthChallenge,
AuthChallengeRequest as ProtoAuthChallengeRequest,
AuthChallenge as ProtoAuthChallenge, AuthChallengeRequest as ProtoAuthChallengeRequest,
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthOk as ProtoAuthOk,
ClientConnectError, ClientRequest, ClientResponse,
client_connect_error::Code as ProtoClientConnectErrorCode,
client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
transport::{Bi, Error as TransportError},
transport::{Bi, Error as TransportError, Sender},
};
use async_trait::async_trait;
use futures::StreamExt as _;
@@ -37,9 +36,9 @@ impl GrpcTransport {
Some(ClientRequestPayload::AuthChallengeRequest(ProtoAuthChallengeRequest {
pubkey,
})) => Ok(DomainRequest::AuthChallengeRequest { pubkey }),
Some(ClientRequestPayload::AuthChallengeSolution(
ProtoAuthChallengeSolution { signature },
)) => Ok(DomainRequest::AuthChallengeSolution { signature }),
Some(ClientRequestPayload::AuthChallengeSolution(ProtoAuthChallengeSolution {
signature,
})) => Ok(DomainRequest::AuthChallengeSolution { signature }),
None => Err(Status::invalid_argument("Missing client request payload")),
}
}
@@ -86,8 +85,11 @@ impl GrpcTransport {
}
#[async_trait]
impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
async fn send(&mut self, item: Result<DomainResponse, ClientError>) -> Result<(), TransportError> {
impl Sender<Result<DomainResponse, ClientError>> for GrpcTransport {
async fn send(
&mut self,
item: Result<DomainResponse, ClientError>,
) -> Result<(), TransportError> {
let outbound = match item {
Ok(message) => Ok(Self::response_to_proto(message)),
Err(err) => Err(Self::error_to_status(err)),
@@ -98,7 +100,10 @@ impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
.await
.map_err(|_| TransportError::ChannelClosed)
}
}
#[async_trait]
impl Bi<DomainRequest, Result<DomainResponse, ClientError>> for GrpcTransport {
async fn recv(&mut self) -> Option<DomainRequest> {
match self.receiver.next().await {
Some(Ok(item)) => match Self::request_to_domain(item) {

View File

@@ -1,7 +1,9 @@
use arbiter_proto::proto::{
client::{ClientRequest, ClientResponse},
user_agent::{UserAgentRequest, UserAgentResponse},
use arbiter_proto::{
proto::{
client::{ClientRequest, ClientResponse},
user_agent::{UserAgentRequest, UserAgentResponse},
},
transport::grpc::GrpcBi,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
@@ -10,7 +12,11 @@ use tracing::info;
use crate::{
DEFAULT_CHANNEL_SIZE,
actors::{client::{ClientConnection, connect_client}, user_agent::{UserAgentConnection, connect_user_agent}},
actors::{
client::{ClientConnection, connect_client},
user_agent::UserAgentConnection,
},
grpc::{self, user_agent::start},
};
pub mod client;
@@ -48,18 +54,19 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for super::Ser
request: Request<tonic::Streaming<UserAgentRequest>>,
) -> Result<Response<Self::UserAgentStream>, Status> {
let req_stream = request.into_inner();
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let transport = user_agent::GrpcTransport::new(tx, req_stream);
let props = UserAgentConnection::new(
self.context.db.clone(),
self.context.actors.clone(),
Box::new(transport),
);
tokio::spawn(connect_user_agent(props));
let (bi, rx) = GrpcBi::from_bi_stream(req_stream);
tokio::spawn(start(
UserAgentConnection {
db: self.context.db.clone(),
actors: self.context.actors.clone(),
},
bi,
));
info!(event = "connection established", "grpc.user_agent");
Ok(Response::new(ReceiverStream::new(rx)))
Ok(Response::new(rx))
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,151 @@
use arbiter_proto::{
proto::{
self,
evm::{
EtherTransferSettings as ProtoEtherTransferSettings, EvmError as ProtoEvmError,
EvmGrantCreateRequest, EvmGrantCreateResponse, EvmGrantDeleteRequest,
EvmGrantDeleteResponse, EvmGrantList, EvmGrantListResponse, GrantEntry,
SharedSettings as ProtoSharedSettings, SpecificGrant as ProtoSpecificGrant,
TokenTransferSettings as ProtoTokenTransferSettings,
VolumeRateLimit as ProtoVolumeRateLimit, WalletCreateResponse, WalletEntry, WalletList,
WalletListResponse, evm_grant_create_response::Result as EvmGrantCreateResult,
evm_grant_delete_response::Result as EvmGrantDeleteResult,
evm_grant_list_response::Result as EvmGrantListResult,
specific_grant::Grant as ProtoSpecificGrantType,
wallet_create_response::Result as WalletCreateResult,
wallet_list_response::Result as WalletListResult,
},
user_agent::{
AuthChallenge as ProtoAuthChallenge, AuthChallengeRequest as ProtoAuthChallengeRequest,
AuthChallengeSolution as ProtoAuthChallengeSolution, AuthResult as ProtoAuthResult,
BootstrapEncryptedKey as ProtoBootstrapEncryptedKey,
BootstrapResult as ProtoBootstrapResult, ClientConnectionCancel,
ClientConnectionRequest, ClientConnectionResponse, KeyType as ProtoKeyType,
UnsealEncryptedKey as ProtoUnsealEncryptedKey, UnsealResult as ProtoUnsealResult,
UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse,
VaultState as ProtoVaultState, user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
},
},
transport::{Bi, Error as TransportError, Receiver, Sender, grpc::GrpcBi},
};
use async_trait::async_trait;
use tonic::{Status, Streaming};
use tracing::{info, warn};
use crate::{
actors::user_agent::{
self, AuthPublicKey, OutOfBand as DomainResponse, UserAgentConnection, auth,
},
db::models::KeyType,
evm::policies::{
Grant, SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit,
ether_transfer, token_transfers,
},
};
use alloy::primitives::{Address, U256};
use chrono::{DateTime, TimeZone, Utc};
pub struct AuthTransportAdapter<'a>(&'a mut GrpcBi<UserAgentRequest, UserAgentResponse>);
#[async_trait]
impl Sender<Result<auth::Outbound, auth::Error>> for AuthTransportAdapter<'_> {
async fn send(
&mut self,
item: Result<auth::Outbound, auth::Error>,
) -> Result<(), TransportError> {
use auth::{Error, Outbound};
let response = match item {
Ok(Outbound::AuthChallenge { nonce }) => Ok(UserAgentResponsePayload::AuthChallenge(
ProtoAuthChallenge { nonce },
)),
Ok(Outbound::AuthSuccess) => Ok(UserAgentResponsePayload::AuthResult(
ProtoAuthResult::Success.into(),
)),
Err(Error::UnregisteredPublicKey) => Ok(UserAgentResponsePayload::AuthResult(
ProtoAuthResult::InvalidKey.into(),
)),
Err(Error::InvalidChallengeSolution) => Ok(UserAgentResponsePayload::AuthResult(
ProtoAuthResult::InvalidSignature.into(),
)),
Err(Error::InvalidBootstrapToken) => Ok(UserAgentResponsePayload::BootstrapResult(
ProtoAuthResult::TokenInvalid.into(),
)),
Err(Error::Internal { details }) => Err(Status::internal(details)),
Err(Error::Transport) => Err(Status::unavailable("transport error")),
};
self.0
.send(response.map(|r| UserAgentResponse { payload: Some(r) }))
.await
}
}
#[async_trait]
impl Receiver<auth::Inbound> for AuthTransportAdapter<'_> {
async fn recv(&mut self) -> Option<auth::Inbound> {
let Ok(UserAgentRequest {
payload: Some(payload),
}) = self.0.recv().await?
else {
warn!(
event = "received request with empty payload",
"grpc.useragent.auth_adapter"
);
return None;
};
match payload {
UserAgentRequestPayload::AuthChallengeRequest(ProtoAuthChallengeRequest {
pubkey,
bootstrap_token,
key_type,
}) => {
let Ok(key_type) = ProtoKeyType::try_from(key_type) else {
warn!(
event = "received request with invalid key type",
"grpc.useragent.auth_adapter"
);
return None;
};
let key_type = match key_type {
ProtoKeyType::Ed25519 => KeyType::Ed25519,
ProtoKeyType::EcdsaSecp256k1 => KeyType::EcdsaSecp256k1,
ProtoKeyType::Rsa => KeyType::Rsa,
ProtoKeyType::Unspecified => {
warn!(
event = "received request with unspecified key type",
"grpc.useragent.auth_adapter"
);
return None;
}
};
let Ok(pubkey) = AuthPublicKey::try_from((key_type, pubkey)) else {
warn!(
event = "received request with invalid public key",
"grpc.useragent.auth_adapter"
);
return None;
};
Some(auth::Inbound::AuthChallengeRequest {
pubkey,
bootstrap_token,
})
}
UserAgentRequestPayload::AuthChallengeSolution(ProtoAuthChallengeSolution {
signature,
}) => Some(auth::Inbound::AuthChallengeSolution { signature }),
_ => None, // Ignore other request types for this adapter
}
}
}
impl Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> for AuthTransportAdapter<'_> {}
pub async fn start(
conn: &mut UserAgentConnection,
bi: &mut GrpcBi<UserAgentRequest, UserAgentResponse>,
) -> Result<AuthPublicKey, auth::Error> {
let mut transport = AuthTransportAdapter(bi);
auth::authenticate(conn, transport).await
}