use futures::{Stream, StreamExt}; use tokio::sync::mpsc::{self, error::SendError}; use tonic::{Status, Streaming}; // Abstraction for stream for sans-io capabilities pub trait Bi: Stream> + Send + Sync + 'static { type Error; fn send( &mut self, item: Result, ) -> impl std::future::Future> + Send; } // Bi-directional stream abstraction for handling gRPC streaming requests and responses pub struct BiStream { pub request_stream: Streaming, pub response_sender: mpsc::Sender>, } impl Stream for BiStream where T: Send + 'static, U: Send + 'static, { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { self.request_stream.poll_next_unpin(cx) } } impl Bi for BiStream where T: Send + 'static, U: Send + 'static, { type Error = SendError>; async fn send(&mut self, item: Result) -> Result<(), Self::Error> { self.response_sender.send(item).await } }