refactor(transport): implemented Bi stream based abstraction for actor communication with next loop override
This commit is contained in:
@@ -17,7 +17,7 @@ miette.workspace = true
|
||||
thiserror.workspace = true
|
||||
rustls-pki-types.workspace = true
|
||||
base64 = "0.22.1"
|
||||
|
||||
tracing.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
tonic-prost-build = "0.14.3"
|
||||
|
||||
@@ -1,46 +1,125 @@
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::sync::mpsc::{self, error::SendError};
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use futures::StreamExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::{Status, Streaming};
|
||||
|
||||
/// Errors returned by transport adapters implementing [`Bi`].
|
||||
pub enum Error {
|
||||
/// The outbound side of the transport is no longer accepting messages.
|
||||
ChannelClosed,
|
||||
}
|
||||
|
||||
// Abstraction for stream for sans-io capabilities
|
||||
pub trait Bi<T, U>: Stream<Item = Result<T, Status>> + Send + Sync + 'static {
|
||||
type Error;
|
||||
/// Minimal bidirectional transport abstraction used by protocol code.
|
||||
///
|
||||
/// `Bi<T, U, E>` models a duplex channel with:
|
||||
/// - inbound items of type `T` read via [`Bi::recv`]
|
||||
/// - outbound success items of type `U` or domain errors of type `E` written via [`Bi::send`]
|
||||
///
|
||||
/// The trait intentionally exposes only the operations the protocol layer needs,
|
||||
/// allowing it to work with gRPC streams and other transport implementations.
|
||||
///
|
||||
/// # Stream termination and errors
|
||||
///
|
||||
/// [`Bi::recv`] returns:
|
||||
/// - `Some(item)` when a new inbound message is available
|
||||
/// - `None` when the inbound stream ends or the underlying transport reports an error
|
||||
///
|
||||
/// Implementations may collapse transport-specific receive errors into `None`
|
||||
/// when the protocol does not need to distinguish them from normal stream
|
||||
/// termination.
|
||||
pub trait Bi<T, U, E>: Send + Sync + 'static {
|
||||
/// Sends one outbound result to the peer.
|
||||
fn send(
|
||||
&mut self,
|
||||
item: Result<U, Status>,
|
||||
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
|
||||
item: Result<U, E>,
|
||||
) -> impl std::future::Future<Output = Result<(), Error>> + Send;
|
||||
|
||||
/// Receives the next inbound item.
|
||||
///
|
||||
/// Returns `None` when the inbound stream is finished or can no longer
|
||||
/// produce items.
|
||||
fn recv(&mut self) -> impl std::future::Future<Output = Option<T>> + Send;
|
||||
}
|
||||
|
||||
// Bi-directional stream abstraction for handling gRPC streaming requests and responses
|
||||
pub struct BiStream<T, U> {
|
||||
pub request_stream: Streaming<T>,
|
||||
pub response_sender: mpsc::Sender<Result<U, Status>>,
|
||||
/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream.
|
||||
///
|
||||
/// Outbound items are sent through a Tokio MPSC sender, while inbound items are
|
||||
/// read from tonic [`Streaming`].
|
||||
pub struct GrpcAdapter<Inbound, Outbound, E> {
|
||||
sender: mpsc::Sender<Result<Outbound, Status>>,
|
||||
receiver: Streaming<Inbound>,
|
||||
_error: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<T, U> Stream for BiStream<T, U>
|
||||
where
|
||||
T: Send + 'static,
|
||||
U: Send + 'static,
|
||||
{
|
||||
type Item = Result<T, Status>;
|
||||
impl<Inbound, Outbound, E> GrpcAdapter<Inbound, Outbound, E> {
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.request_stream.poll_next_unpin(cx)
|
||||
/// Creates a new gRPC-backed [`Bi`] adapter.
|
||||
pub fn new(sender: mpsc::Sender<Result<Outbound, Status>>, receiver: Streaming<Inbound>) -> Self {
|
||||
Self {
|
||||
sender,
|
||||
receiver,
|
||||
_error: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Bi<T, U> for BiStream<T, U>
|
||||
impl<Inbound, Outbound, E> Bi<Inbound, Outbound, E> for GrpcAdapter<Inbound, Outbound, E>
|
||||
where
|
||||
T: Send + 'static,
|
||||
U: Send + 'static,
|
||||
Inbound: Send + 'static,
|
||||
Outbound: Send + 'static,
|
||||
E: Into<Status> + Send + Sync + 'static,
|
||||
{
|
||||
type Error = SendError<Result<U, Status>>;
|
||||
#[tracing::instrument(level = "trace", skip(self, item))]
|
||||
async fn send(&mut self, item: Result<Outbound, E>) -> Result<(), Error> {
|
||||
self.sender
|
||||
.send(item.map_err(Into::into))
|
||||
.await
|
||||
.map_err(|_| Error::ChannelClosed)
|
||||
}
|
||||
|
||||
async fn send(&mut self, item: Result<U, Status>) -> Result<(), Self::Error> {
|
||||
self.response_sender.send(item).await
|
||||
#[tracing::instrument(level = "trace", skip(self))]
|
||||
async fn recv(&mut self) -> Option<Inbound> {
|
||||
self.receiver.next().await.transpose().ok().flatten()
|
||||
}
|
||||
}
|
||||
|
||||
/// No-op [`Bi`] transport for tests and manual actor usage.
|
||||
///
|
||||
/// `send` drops all items and succeeds. [`Bi::recv`] never resolves and therefore
|
||||
/// does not busy-wait or spuriously close the stream.
|
||||
pub struct DummyTransport<T, U, E> {
|
||||
_marker: PhantomData<(T, U, E)>,
|
||||
}
|
||||
|
||||
impl<T, U, E> DummyTransport<T, U, E> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Default for DummyTransport<T, U, E> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U, E> Bi<T, U, E> for DummyTransport<T, U, E>
|
||||
where
|
||||
T: Send + Sync + 'static,
|
||||
U: Send + Sync + 'static,
|
||||
E: Send + Sync + 'static,
|
||||
{
|
||||
async fn send(&mut self, _item: Result<U, E>) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn recv(&mut self) -> impl std::future::Future<Output = Option<T>> + Send {
|
||||
async {
|
||||
std::future::pending::<()>().await;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user