use arbiter_proto::{ proto::{ client::{ClientRequest, ClientResponse}, user_agent::{UserAgentRequest, UserAgentResponse}, }, transport::grpc::GrpcBi, }; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, async_trait}; use tracing::info; use crate::{ DEFAULT_CHANNEL_SIZE, actors::{ client::{ClientConnection, connect_client}, user_agent::UserAgentConnection, }, grpc::{self, user_agent::start}, }; pub mod client; pub mod user_agent; #[async_trait] impl arbiter_proto::proto::arbiter_service_server::ArbiterService for super::Server { type UserAgentStream = ReceiverStream>; type ClientStream = ReceiverStream>; #[tracing::instrument(level = "debug", skip(self))] async fn client( &self, request: Request>, ) -> Result, Status> { let req_stream = request.into_inner(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); let transport = client::GrpcTransport::new(tx, req_stream); let props = ClientConnection::new( self.context.db.clone(), Box::new(transport), self.context.actors.clone(), ); tokio::spawn(connect_client(props)); info!(event = "connection established", "grpc.client"); Ok(Response::new(ReceiverStream::new(rx))) } #[tracing::instrument(level = "debug", skip(self))] async fn user_agent( &self, request: Request>, ) -> Result, Status> { let req_stream = request.into_inner(); let (bi, rx) = GrpcBi::from_bi_stream(req_stream); tokio::spawn(start( UserAgentConnection { db: self.context.db.clone(), actors: self.context.actors.clone(), }, bi, )); info!(event = "connection established", "grpc.user_agent"); Ok(Response::new(rx)) } }