use async_trait::async_trait; use futures::StreamExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use super::{Bi, Receiver, Sender}; pub struct GrpcSender { tx: mpsc::Sender>, } #[async_trait] impl Sender> for GrpcSender where Outbound: Send + Sync + 'static, { async fn send(&mut self, item: Result) -> Result<(), super::Error> { self.tx .send(item) .await .map_err(|_| super::Error::ChannelClosed) } } pub struct GrpcReceiver { rx: tonic::Streaming, } #[async_trait] impl Receiver> for GrpcReceiver where Inbound: Send + Sync + 'static, { async fn recv(&mut self) -> Option> { self.rx.next().await } } pub struct GrpcBi { sender: GrpcSender, receiver: GrpcReceiver, } impl GrpcBi where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { pub fn from_bi_stream( receiver: tonic::Streaming, ) -> (Self, ReceiverStream>) { let (tx, rx) = mpsc::channel(10); let sender = GrpcSender { tx }; let receiver = GrpcReceiver { rx: receiver }; let bi = GrpcBi { sender, receiver }; (bi, ReceiverStream::new(rx)) } } #[async_trait] impl Sender> for GrpcBi where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { async fn send(&mut self, item: Result) -> Result<(), super::Error> { self.sender.send(item).await } } #[async_trait] impl Receiver> for GrpcBi where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { async fn recv(&mut self) -> Option> { self.receiver.recv().await } } impl Bi, Result> for GrpcBi where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { } impl super::SplittableBi, Result> for GrpcBi where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { type Sender = GrpcSender; type Receiver = GrpcReceiver; fn split(self) -> (Self::Sender, Self::Receiver) { (self.sender, self.receiver) } fn from_parts(sender: Self::Sender, receiver: Self::Receiver) -> Self { GrpcBi { sender, receiver } } }