refactor(transport): simplify converters
This commit is contained in:
@@ -3,9 +3,9 @@
|
|||||||
//! This module separates three concerns:
|
//! This module separates three concerns:
|
||||||
//!
|
//!
|
||||||
//! - protocol/session logic wants a small duplex interface ([`Bi`])
|
//! - protocol/session logic wants a small duplex interface ([`Bi`])
|
||||||
//! - transport adapters need to push concrete stream items to an underlying IO layer
|
//! - transport adapters push concrete stream items to an underlying IO layer
|
||||||
//! - server/client boundaries may need to translate domain outbounds into transport
|
//! - transport boundaries translate between protocol-facing and transport-facing
|
||||||
//! framing (for example, a tonic stream item)
|
//! item types via direction-specific converters
|
||||||
//!
|
//!
|
||||||
//! [`Bi`] is intentionally minimal and transport-agnostic:
|
//! [`Bi`] is intentionally minimal and transport-agnostic:
|
||||||
//! - [`Bi::recv`] yields inbound protocol messages
|
//! - [`Bi::recv`] yields inbound protocol messages
|
||||||
@@ -25,60 +25,57 @@
|
|||||||
//! inbound-related converter parameters are declared before outbound-related
|
//! inbound-related converter parameters are declared before outbound-related
|
||||||
//! converter parameters.
|
//! converter parameters.
|
||||||
//!
|
//!
|
||||||
//! [`ProtocolConverter`] is the boundary object that converts a protocol/domain
|
//! [`RecvConverter`] and [`SendConverter`] are infallible conversion traits used
|
||||||
//! outbound item into the concrete outbound item expected by a transport sender.
|
//! by adapters to map between protocol-facing and transport-facing item types.
|
||||||
//! The conversion is infallible, so domain-level recoverable failures should be
|
//! The traits themselves are not result-aware; adapters decide how transport
|
||||||
//! represented inside the domain outbound type itself (for example,
|
//! errors are handled before (or instead of) conversion.
|
||||||
//! `Result<Message, DomainError>`).
|
|
||||||
//!
|
//!
|
||||||
//! [`GrpcAdapter`] combines:
|
//! [`grpc::GrpcAdapter`] combines:
|
||||||
//! - a tonic inbound stream
|
//! - a tonic inbound stream
|
||||||
//! - a Tokio sender for outbound transport items
|
//! - a Tokio sender for outbound transport items
|
||||||
//! - a [`ProtocolConverter`] for the receive path
|
//! - a [`RecvConverter`] for the receive path
|
||||||
//! - a [`ProtocolConverter`] for the send path
|
//! - a [`SendConverter`] for the send path
|
||||||
//!
|
//!
|
||||||
//! [`DummyTransport`] is a no-op implementation useful for tests and local actor
|
//! [`DummyTransport`] is a no-op implementation useful for tests and local actor
|
||||||
//! execution where no real network stream exists.
|
//! execution where no real network stream exists.
|
||||||
//!
|
//!
|
||||||
//! # Component Interaction
|
//! # Component Interaction
|
||||||
//!
|
//!
|
||||||
//! The typical layering looks like this:
|
|
||||||
//!
|
|
||||||
//! ```text
|
//! ```text
|
||||||
//! inbound (network -> protocol)
|
//! inbound (network -> protocol)
|
||||||
//! ============================
|
//! ============================
|
||||||
//!
|
//!
|
||||||
//! tonic::Streaming<RecvTransport> -> GrpcAdapter::recv() -> Bi::recv() -> protocol/session actor
|
//! tonic::Streaming<RecvTransport>
|
||||||
//! |
|
//! -> grpc::GrpcAdapter::recv()
|
||||||
//! +--> recv ProtocolConverter::convert(transport)
|
//! |
|
||||||
//!
|
//! +--> on `Ok(item)`: RecvConverter::convert(RecvTransport) -> Inbound
|
||||||
|
//! +--> on `Err(status)`: log error and close stream (`None`)
|
||||||
|
//! -> Bi::recv()
|
||||||
|
//! -> protocol/session actor
|
||||||
|
//!
|
||||||
//! outbound (protocol -> network)
|
//! outbound (protocol -> network)
|
||||||
//! ==============================
|
//! ==============================
|
||||||
//!
|
//!
|
||||||
//! protocol/session actor -> Bi::send(domain outbound item, e.g. Result<Message, DomainError>)
|
//! protocol/session actor
|
||||||
//! -> GrpcAdapter::send()
|
//! -> Bi::send(Outbound)
|
||||||
//! |
|
//! -> grpc::GrpcAdapter::send()
|
||||||
//! +--> send ProtocolConverter::convert(domain)
|
//! |
|
||||||
//! -> Tokio mpsc::Sender<SendTransport> -> tonic response stream
|
//! +--> SendConverter::convert(Outbound) -> SendTransport
|
||||||
|
//! -> Tokio mpsc::Sender<SendTransport>
|
||||||
|
//! -> tonic response stream
|
||||||
//! ```
|
//! ```
|
||||||
//!
|
//!
|
||||||
//! # Design Notes
|
//! # Design Notes
|
||||||
//!
|
//!
|
||||||
//! - `recv()` collapses adapter-specific receive failures into `None`, which
|
//! - `send()` returns [`Error`] only for transport delivery failures (for
|
||||||
//! lets protocol code treat stream termination and transport receive failure as
|
//! example, when the outbound channel is closed).
|
||||||
//! "no more inbound items" when no finer distinction is required.
|
//! - [`grpc::GrpcAdapter`] logs tonic receive errors and treats them as stream
|
||||||
//! - `send()` returns [`Error`] only for transport delivery failures (for example,
|
//! closure (`None`).
|
||||||
//! when the outbound channel is closed).
|
//! - When protocol-facing and transport-facing types are identical, use
|
||||||
//! - Conversion policy lives outside protocol/session logic and can be defined at
|
//! [`IdentityRecvConverter`] / [`IdentitySendConverter`].
|
||||||
//! the transport boundary (such as a server endpoint module). When domain and
|
|
||||||
//! transport types are identical, [`IdentityConverter`] can be used.
|
|
||||||
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use futures::StreamExt;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tonic::Streaming;
|
|
||||||
|
|
||||||
/// Errors returned by transport adapters implementing [`Bi`].
|
/// Errors returned by transport adapters implementing [`Bi`].
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
/// The outbound side of the transport is no longer accepting messages.
|
/// The outbound side of the transport is no longer accepting messages.
|
||||||
@@ -90,62 +87,37 @@ pub enum Error {
|
|||||||
/// `Bi<Inbound, Outbound>` models a duplex channel with:
|
/// `Bi<Inbound, Outbound>` models a duplex channel with:
|
||||||
/// - inbound items of type `Inbound` read via [`Bi::recv`]
|
/// - inbound items of type `Inbound` read via [`Bi::recv`]
|
||||||
/// - outbound items of type `Outbound` written via [`Bi::send`]
|
/// - outbound items of type `Outbound` written via [`Bi::send`]
|
||||||
///
|
|
||||||
/// The trait intentionally exposes only the operations the protocol layer needs,
|
|
||||||
/// allowing it to work with gRPC streams and other transport implementations.
|
|
||||||
///
|
|
||||||
/// # Stream termination and errors
|
|
||||||
///
|
|
||||||
/// [`Bi::recv`] returns:
|
|
||||||
/// - `Some(item)` when a new inbound message is available
|
|
||||||
/// - `None` when the inbound stream ends or the underlying transport reports an error
|
|
||||||
///
|
|
||||||
/// Implementations may collapse transport-specific receive errors into `None`
|
|
||||||
/// when the protocol does not need to distinguish them from normal stream
|
|
||||||
/// termination.
|
|
||||||
pub trait Bi<Inbound, Outbound>: Send + Sync + 'static {
|
pub trait Bi<Inbound, Outbound>: Send + Sync + 'static {
|
||||||
/// Sends one outbound item to the peer.
|
|
||||||
fn send(
|
fn send(
|
||||||
&mut self,
|
&mut self,
|
||||||
item: Outbound,
|
item: Outbound,
|
||||||
) -> impl std::future::Future<Output = Result<(), Error>> + Send;
|
) -> impl std::future::Future<Output = Result<(), Error>> + Send;
|
||||||
|
|
||||||
/// Receives the next inbound item.
|
|
||||||
///
|
|
||||||
/// Returns `None` when the inbound stream is finished or can no longer
|
|
||||||
/// produce items.
|
|
||||||
fn recv(&mut self) -> impl std::future::Future<Output = Option<Inbound>> + Send;
|
fn recv(&mut self) -> impl std::future::Future<Output = Option<Inbound>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Converts protocol/domain outbound items into transport-layer outbound items.
|
/// Converts transport-facing inbound items into protocol-facing inbound items.
|
||||||
///
|
pub trait RecvConverter: Send + Sync + 'static {
|
||||||
/// This trait is used by transport adapters that need to emit a concrete stream
|
type Input;
|
||||||
/// item type (for example, tonic server streams) while protocol code prefers to
|
type Output;
|
||||||
/// work with domain-oriented outbound values.
|
|
||||||
///
|
|
||||||
/// `convert` is infallible by design. Any recoverable protocol failure should be
|
|
||||||
/// represented in [`Self::Domain`] and mapped into the transport item in the
|
|
||||||
/// converter implementation.
|
|
||||||
pub trait ProtocolConverter: Send + Sync + 'static {
|
|
||||||
/// Outbound item produced by protocol/domain code.
|
|
||||||
type Domain;
|
|
||||||
|
|
||||||
/// Outbound item required by the transport sender.
|
fn convert(&self, item: Self::Input) -> Self::Output;
|
||||||
type Transport;
|
|
||||||
|
|
||||||
/// Maps a protocol/domain outbound item into the transport sender item.
|
|
||||||
fn convert(&self, item: Self::Domain) -> Self::Transport;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A [`ProtocolConverter`] that forwards values unchanged.
|
/// Converts protocol/domain outbound items into transport-facing outbound items.
|
||||||
///
|
pub trait SendConverter: Send + Sync + 'static {
|
||||||
/// Useful when the protocol-facing and transport-facing item types are
|
type Input;
|
||||||
/// identical, but a converter is still required by an adapter API.
|
type Output;
|
||||||
pub struct IdentityConverter<T> {
|
|
||||||
|
fn convert(&self, item: Self::Input) -> Self::Output;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A [`RecvConverter`] that forwards values unchanged.
|
||||||
|
pub struct IdentityRecvConverter<T> {
|
||||||
_marker: PhantomData<T>,
|
_marker: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IdentityConverter<T> {
|
impl<T> IdentityRecvConverter<T> {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
_marker: PhantomData,
|
_marker: PhantomData,
|
||||||
@@ -153,93 +125,133 @@ impl<T> IdentityConverter<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Default for IdentityConverter<T> {
|
impl<T> Default for IdentityRecvConverter<T> {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::new()
|
Self::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ProtocolConverter for IdentityConverter<T>
|
impl<T> RecvConverter for IdentityRecvConverter<T>
|
||||||
where
|
where
|
||||||
T: Send + Sync + 'static,
|
T: Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
type Domain = T;
|
type Input = T;
|
||||||
type Transport = T;
|
type Output = T;
|
||||||
|
|
||||||
fn convert(&self, item: Self::Domain) -> Self::Transport {
|
fn convert(&self, item: Self::Input) -> Self::Output {
|
||||||
item
|
item
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream.
|
/// A [`SendConverter`] that forwards values unchanged.
|
||||||
///
|
pub struct IdentitySendConverter<T> {
|
||||||
/// The adapter owns converter instances for both directions:
|
_marker: PhantomData<T>,
|
||||||
/// - receive converter: transport inbound -> protocol inbound
|
|
||||||
/// - send converter: protocol outbound -> transport outbound
|
|
||||||
///
|
|
||||||
/// This keeps protocol actors decoupled from transport framing conventions in
|
|
||||||
/// both directions.
|
|
||||||
pub struct GrpcAdapter<InboundConverter: ProtocolConverter, OutboundConverter: ProtocolConverter> {
|
|
||||||
sender: mpsc::Sender<OutboundConverter::Transport>,
|
|
||||||
receiver: Streaming<InboundConverter::Domain>,
|
|
||||||
inbound_converter: InboundConverter,
|
|
||||||
outbound_converter: OutboundConverter,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<InboundConverter, OutboundConverter> GrpcAdapter<InboundConverter, OutboundConverter>
|
impl<T> IdentitySendConverter<T> {
|
||||||
where
|
pub fn new() -> Self {
|
||||||
InboundConverter: ProtocolConverter,
|
|
||||||
OutboundConverter: ProtocolConverter,
|
|
||||||
{
|
|
||||||
/// Creates a new gRPC-backed [`Bi`] adapter.
|
|
||||||
///
|
|
||||||
/// The provided converters define:
|
|
||||||
/// - the protocol outbound item and corresponding transport outbound item
|
|
||||||
/// - the transport inbound item and corresponding protocol inbound item
|
|
||||||
pub fn new(
|
|
||||||
sender: mpsc::Sender<OutboundConverter::Transport>,
|
|
||||||
receiver: Streaming<InboundConverter::Domain>,
|
|
||||||
inbound_converter: InboundConverter,
|
|
||||||
outbound_converter: OutboundConverter,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
sender,
|
_marker: PhantomData,
|
||||||
receiver,
|
|
||||||
inbound_converter,
|
|
||||||
outbound_converter,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<InboundConverter, OutboundConverter>
|
impl<T> Default for IdentitySendConverter<T> {
|
||||||
Bi<InboundConverter::Transport, OutboundConverter::Domain>
|
fn default() -> Self {
|
||||||
for GrpcAdapter<InboundConverter, OutboundConverter>
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SendConverter for IdentitySendConverter<T>
|
||||||
where
|
where
|
||||||
InboundConverter: ProtocolConverter,
|
T: Send + Sync + 'static,
|
||||||
OutboundConverter: ProtocolConverter,
|
|
||||||
OutboundConverter::Domain: Send + 'static,
|
|
||||||
OutboundConverter::Transport: Send + 'static,
|
|
||||||
InboundConverter::Transport: Send + 'static,
|
|
||||||
InboundConverter::Domain: Send + 'static,
|
|
||||||
{
|
{
|
||||||
#[tracing::instrument(level = "trace", skip(self, item))]
|
type Input = T;
|
||||||
async fn send(&mut self, item: OutboundConverter::Domain) -> Result<(), Error> {
|
type Output = T;
|
||||||
let outbound: OutboundConverter::Transport = self.outbound_converter.convert(item);
|
|
||||||
self.sender
|
fn convert(&self, item: Self::Input) -> Self::Output {
|
||||||
.send(outbound)
|
item
|
||||||
.await
|
}
|
||||||
.map_err(|_| Error::ChannelClosed)
|
}
|
||||||
|
|
||||||
|
/// gRPC-specific transport adapters and helpers.
|
||||||
|
pub mod grpc {
|
||||||
|
use futures::StreamExt;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tonic::Streaming;
|
||||||
|
|
||||||
|
use super::{Bi, Error, RecvConverter, SendConverter};
|
||||||
|
|
||||||
|
/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream.
|
||||||
|
///
|
||||||
|
|
||||||
|
/// Tonic receive errors are logged and treated as stream closure (`None`).
|
||||||
|
/// The receive converter is only invoked for successful inbound transport
|
||||||
|
/// items.
|
||||||
|
pub struct GrpcAdapter<InboundTransport, Inbound, InboundConverter, OutboundConverter>
|
||||||
|
where
|
||||||
|
InboundConverter: RecvConverter<Input = InboundTransport, Output = Inbound>,
|
||||||
|
OutboundConverter: SendConverter,
|
||||||
|
{
|
||||||
|
sender: mpsc::Sender<OutboundConverter::Output>,
|
||||||
|
receiver: Streaming<InboundTransport>,
|
||||||
|
inbound_converter: InboundConverter,
|
||||||
|
outbound_converter: OutboundConverter,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
|
||||||
async fn recv(&mut self) -> Option<InboundConverter::Transport> {
|
impl<InboundTransport, Inbound, InboundConverter, OutboundConverter>
|
||||||
self.receiver
|
GrpcAdapter<InboundTransport, Inbound, InboundConverter, OutboundConverter>
|
||||||
.next()
|
where
|
||||||
.await
|
InboundConverter: RecvConverter<Input = InboundTransport, Output = Inbound>,
|
||||||
.transpose()
|
OutboundConverter: SendConverter,
|
||||||
.ok()
|
{
|
||||||
.flatten()
|
pub fn new(
|
||||||
.map(|item| self.inbound_converter.convert(item))
|
sender: mpsc::Sender<OutboundConverter::Output>,
|
||||||
|
receiver: Streaming<InboundTransport>,
|
||||||
|
inbound_converter: InboundConverter,
|
||||||
|
outbound_converter: OutboundConverter,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
sender,
|
||||||
|
receiver,
|
||||||
|
inbound_converter,
|
||||||
|
outbound_converter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<InboundTransport, Inbound, InboundConverter, OutboundConverter> Bi<Inbound, OutboundConverter::Input>
|
||||||
|
for GrpcAdapter<InboundTransport, Inbound, InboundConverter, OutboundConverter>
|
||||||
|
where
|
||||||
|
InboundTransport: Send + 'static,
|
||||||
|
Inbound: Send + 'static,
|
||||||
|
InboundConverter: RecvConverter<Input = InboundTransport, Output = Inbound>,
|
||||||
|
OutboundConverter: SendConverter,
|
||||||
|
OutboundConverter::Input: Send + 'static,
|
||||||
|
OutboundConverter::Output: Send + 'static,
|
||||||
|
{
|
||||||
|
#[tracing::instrument(level = "trace", skip(self, item))]
|
||||||
|
async fn send(&mut self, item: OutboundConverter::Input) -> Result<(), Error> {
|
||||||
|
let outbound = self.outbound_converter.convert(item);
|
||||||
|
self.sender
|
||||||
|
.send(outbound)
|
||||||
|
.await
|
||||||
|
.map_err(|_| Error::ChannelClosed)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(level = "trace", skip(self))]
|
||||||
|
async fn recv(&mut self) -> Option<Inbound> {
|
||||||
|
match self.receiver.next().await {
|
||||||
|
Some(Ok(item)) => Some(self.inbound_converter.convert(item)),
|
||||||
|
Some(Err(error)) => {
|
||||||
|
tracing::error!(error = ?error, "grpc transport recv failed; closing stream");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
use arbiter_proto::{
|
use arbiter_proto::{
|
||||||
proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse},
|
proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse},
|
||||||
transport::{GrpcAdapter, IdentityConverter, ProtocolConverter},
|
transport::{IdentityRecvConverter, SendConverter, grpc},
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use kameo::actor::Spawn;
|
use kameo::actor::Spawn;
|
||||||
@@ -28,13 +28,14 @@ const DEFAULT_CHANNEL_SIZE: usize = 1000;
|
|||||||
///
|
///
|
||||||
/// The conversion is defined at the server boundary so the actor module remains
|
/// The conversion is defined at the server boundary so the actor module remains
|
||||||
/// focused on domain semantics and does not depend on tonic status encoding.
|
/// focused on domain semantics and does not depend on tonic status encoding.
|
||||||
struct UserAgentGrpcConverter;
|
struct UserAgentGrpcSender;
|
||||||
|
|
||||||
impl ProtocolConverter for UserAgentGrpcConverter {
|
|
||||||
type Domain = Result<UserAgentResponse, UserAgentError>;
|
|
||||||
type Transport = Result<UserAgentResponse, Status>;
|
|
||||||
|
|
||||||
fn convert(&self, item: Self::Domain) -> Self::Transport {
|
impl SendConverter for UserAgentGrpcSender {
|
||||||
|
type Input = Result<UserAgentResponse, UserAgentError>;
|
||||||
|
type Output = Result<UserAgentResponse, Status>;
|
||||||
|
|
||||||
|
fn convert(&self, item: Self::Input) -> Self::Output {
|
||||||
match item {
|
match item {
|
||||||
Ok(message) => Ok(message),
|
Ok(message) => Ok(message),
|
||||||
Err(err) => Err(user_agent_error_status(err)),
|
Err(err) => Err(user_agent_error_status(err)),
|
||||||
@@ -116,11 +117,11 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server {
|
|||||||
let req_stream = request.into_inner();
|
let req_stream = request.into_inner();
|
||||||
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
|
||||||
|
|
||||||
let transport = GrpcAdapter::new(
|
let transport = grpc::GrpcAdapter::new(
|
||||||
tx,
|
tx,
|
||||||
req_stream,
|
req_stream,
|
||||||
IdentityConverter::<UserAgentRequest>::new(),
|
IdentityRecvConverter::<UserAgentRequest>::new(),
|
||||||
UserAgentGrpcConverter,
|
UserAgentGrpcSender,
|
||||||
);
|
);
|
||||||
UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), transport));
|
UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), transport));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user