//! Transport-facing abstractions shared by protocol/session code.
//!
//! This module defines a small set of transport traits that actors and other
//! protocol code can depend on without knowing anything about the concrete
//! transport underneath.
//!
//! The abstraction is split into:
//! - [`Sender`] for outbound delivery
//! - [`Receiver`] for inbound delivery
//! - [`Bi`] as the combined duplex form (`Sender + Receiver`)
//!
//! This split lets code depend only on the half it actually needs. For
//! example, some actor/session code only sends out-of-band messages, while
//! auth/state-machine code may need full duplex access.
//!
//! [`Bi`] remains intentionally minimal and transport-agnostic:
//! - [`Receiver::recv`] yields inbound messages
//! - [`Sender::send`] accepts outbound messages
//!
//! Transport-specific adapters, including protobuf or gRPC bridges, live in the
//! crates that own those boundaries rather than in `arbiter-proto`.
//!
//! [`Bi`] deliberately does not model request/response correlation. Some
//! transports may carry multiplexed request/response traffic, some may emit
//! out-of-band messages, and some may be one-message-at-a-time state machines.
//! Correlation concerns such as request IDs, pending response maps, and
//! out-of-band routing belong in the adapter or connection layer built on top
//! of [`Bi`], not in this abstraction itself.
//!
//! # Generic Ordering Rule
//!
//! This module consistently uses `Inbound` first and `Outbound` second in
//! generic parameter lists.
//!
//! For [`Receiver`], [`Sender`], and [`Bi`], this means:
//! - `Receiver<Inbound>`
//! - `Sender<Outbound>`
//! - `Bi<Inbound, Outbound>`
//!
//! Concretely, for [`Bi`]:
//! - `recv() -> Option<Inbound>`
//! - `send(Outbound)`
//!
//! [`expect_message`] is a small helper for linear protocol steps: it reads one
//! inbound message from a transport and extracts a typed value from it, failing
//! if the channel closes or the message shape is not what the caller expected.
//!
[`DummyTransport`] is a no-op implementation useful for tests and local //! actor execution where no real stream exists. //! //! # Design Notes //! //! - [`Bi::send`] returns [`Error`] only for transport delivery failures, such //! as a closed outbound channel. //! - [`Bi::recv`] returns `None` when the underlying transport closes. //! - Message translation is intentionally out of scope for this module. use std::marker::PhantomData; use async_trait::async_trait; /// Errors returned by transport adapters implementing [`Bi`]. #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Transport channel is closed")] ChannelClosed, #[error("Unexpected message received")] UnexpectedMessage, } /// Receives one message from `transport` and extracts a value from it using /// `extractor`. Returns [`Error::ChannelClosed`] if the transport closes and /// [`Error::UnexpectedMessage`] if `extractor` returns `None`. pub async fn expect_message( transport: &mut T, extractor: F, ) -> Result where T: Bi + ?Sized, F: FnOnce(Inbound) -> Option, { let msg = transport.recv().await.ok_or(Error::ChannelClosed)?; extractor(msg).ok_or(Error::UnexpectedMessage) } #[async_trait] pub trait Sender: Send + Sync { async fn send(&mut self, item: Outbound) -> Result<(), Error>; } #[async_trait] pub trait Receiver: Send + Sync { async fn recv(&mut self) -> Option; } /// Minimal bidirectional transport abstraction used by protocol code. /// /// `Bi` is the combined duplex form of [`Sender`] and /// [`Receiver`]. /// /// It models a channel with: /// - inbound items of type `Inbound` read via [`Bi::recv`] /// - outbound items of type `Outbound` written via [`Bi::send`] /// /// It does not imply request/response sequencing, one-at-a-time exchange, or /// any built-in correlation mechanism between inbound and outbound items. 
pub trait Bi: Sender + Receiver + Send + Sync {} pub trait SplittableBi: Bi { type Sender: Sender; type Receiver: Receiver; fn split(self) -> (Self::Sender, Self::Receiver); fn from_parts(sender: Self::Sender, receiver: Self::Receiver) -> Self; } /// No-op [`Bi`] transport for tests and manual actor usage. /// /// `send` drops all items and succeeds. [`Bi::recv`] never resolves and therefore /// does not busy-wait or spuriously close the stream. pub struct DummyTransport { _marker: PhantomData<(Inbound, Outbound)>, } impl Default for DummyTransport { fn default() -> Self { Self { _marker: PhantomData, } } } #[async_trait] impl Sender for DummyTransport where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { async fn send(&mut self, _item: Outbound) -> Result<(), Error> { Ok(()) } } #[async_trait] impl Receiver for DummyTransport where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { async fn recv(&mut self) -> Option { std::future::pending::<()>().await; None } } impl Bi for DummyTransport where Inbound: Send + Sync + 'static, Outbound: Send + Sync + 'static, { } pub mod grpc;