feat(transport): add domain error type to GrpcTransportActor
Some checks failed
ci/woodpecker/pr/server-audit Pipeline was successful
ci/woodpecker/pr/server-lint Pipeline failed
ci/woodpecker/pr/server-vet Pipeline failed
ci/woodpecker/pr/server-test Pipeline failed

This commit is contained in:
hdbg
2026-02-25 21:53:01 +01:00
parent 7bd37b3c4a
commit 1b4369b1cb
10 changed files with 413 additions and 218 deletions

View File

@@ -1,7 +1,56 @@
//! Transport abstraction layer for bridging gRPC bidirectional streaming with kameo actors.
//!
//! This module provides a clean separation between the gRPC transport layer and business logic
//! by modeling the connection as two linked kameo actors:
//!
//! - A **transport actor** ([`GrpcTransportActor`]) that owns the gRPC stream and channel,
//! forwarding inbound messages to the business actor and outbound messages to the client.
//! - A **business logic actor** that receives inbound messages from the transport actor and
//! sends outbound messages back through the transport actor.
//!
//! The [`wire()`] function sets up bidirectional linking between the two actors, ensuring
//! that if either actor dies, the other is notified and can shut down gracefully.
//!
//! # Terminology
//!
//! - **InboundMessage**: a message received by the transport actor from the channel/socket
//! and forwarded to the business actor.
//! - **OutboundMessage**: a message produced by the business actor and sent to the transport
//! actor to be forwarded to the channel/socket.
//!
//! # Architecture
//!
//! ```text
//! gRPC Stream ──InboundMessage──▶ GrpcTransportActor ──tell(InboundMessage)──▶ BusinessActor
//! ▲ │
//! └─tell(Result<OutboundMessage, _>)────┘
//! │
//! mpsc::Sender ──▶ Client
//! ```
//!
//! # Example
//!
//! ```rust,ignore
//! let (tx, rx) = mpsc::channel(1000);
//! let context = server_context.clone();
//!
//! wire(
//! |transport_ref| MyBusinessActor::new(context, transport_ref),
//! |business_recipient, business_id| GrpcTransportActor {
//! sender: tx,
//! receiver: grpc_stream,
//! business_logic_actor: business_recipient,
//! business_logic_actor_id: business_id,
//! },
//! ).await;
//!
//! Ok(Response::new(ReceiverStream::new(rx)))
//! ```
use futures::{Stream, StreamExt};
use kameo::{
Actor,
actor::{ActorRef, PreparedActor, Spawn, WeakActorRef},
actor::{ActorRef, PreparedActor, Recipient, Spawn, WeakActorRef},
mailbox::Signal,
prelude::Message,
};
@@ -12,7 +61,15 @@ use tokio::{
use tonic::{Status, Streaming};
use tracing::{debug, error};
// Abstraction for stream for sans-io capabilities
/// A bidirectional stream abstraction for sans-io testing.
///
/// Combines a [`Stream`] of incoming messages with the ability to [`send`](Bi::send)
/// outgoing responses. This trait allows business logic to be tested without a real
/// gRPC connection by swapping in an in-memory implementation.
///
/// # Type Parameters
/// - `T`: `InboundMessage` received from the channel/socket (e.g., `UserAgentRequest`)
/// - `U`: `OutboundMessage` sent to the channel/socket (e.g., `UserAgentResponse`)
pub trait Bi<T, U>: Stream<Item = Result<T, Status>> + Send + Sync + 'static {
type Error;
fn send(
@@ -21,7 +78,10 @@ pub trait Bi<T, U>: Stream<Item = Result<T, Status>> + Send + Sync + 'static {
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
}
// Bi-directional stream abstraction for handling gRPC streaming requests and responses
/// Concrete [`Bi`] implementation backed by a tonic gRPC [`Streaming`] and an [`mpsc::Sender`].
///
/// This is the production implementation used in gRPC service handlers. The `request_stream`
/// receives messages from the client, and `response_sender` sends responses back.
pub struct BiStream<T, U> {
pub request_stream: Streaming<T>,
pub response_sender: mpsc::Sender<Result<U, Status>>,
@@ -54,23 +114,95 @@ where
}
}
pub trait TransportActor<T: Send + 'static>: Actor + Send + Message<T> {}
pub struct GrpcTransportActor<SendMsg, RecvMsg, A>
where
SendMsg: Send + 'static,
RecvMsg: Send + 'static,
A: TransportActor<RecvMsg>,
/// Marker trait for transport actors that can receive outbound messages of type `T`.
///
/// Implement this on your transport actor to indicate it can handle outbound messages
/// produced by the business actor. Requires the actor to implement [`Message<Result<T, E>>`]
/// so business logic can forward responses via [`tell()`](ActorRef::tell).
///
/// # Example
///
/// ```rust,ignore
/// #[derive(Actor)]
/// struct MyTransportActor { /* ... */ }
///
/// impl Message<Result<MyResponse, MyError>> for MyTransportActor {
/// type Reply = ();
/// async fn handle(&mut self, msg: Result<MyResponse, MyError>, _ctx: &mut Context<Self, Self::Reply>) -> Self::Reply {
/// // forward outbound message to channel/socket
/// }
/// }
///
/// impl TransportActor<MyResponse, MyError> for MyTransportActor {}
/// ```
pub trait TransportActor<Outbound: Send + 'static, DomainError: Send + 'static>:
Actor + Send + Message<Result<Outbound, DomainError>>
{
pub sender: mpsc::Sender<Result<SendMsg, tonic::Status>>,
pub receiver: tonic::Streaming<RecvMsg>,
pub business_logic_actor: ActorRef<A>,
}
impl<SendMsg, RecvMsg, A> Actor for GrpcTransportActor<SendMsg, RecvMsg, A>
/// A kameo actor that bridges a gRPC bidirectional stream with a business logic actor.
///
/// This actor owns the gRPC [`Streaming`] receiver and an [`mpsc::Sender`] for responses.
/// It multiplexes between its own mailbox (for outbound messages from the business actor)
/// and the gRPC stream (for inbound client messages) using [`tokio::select!`].
///
/// # Message Flow
///
/// - **Inbound**: Messages from the gRPC stream are forwarded to `business_logic_actor`
/// via [`tell()`](Recipient::tell).
/// - **Outbound**: The business actor sends `Result<Outbound, DomainError>` messages to this
/// actor, which forwards them through the `sender` channel to the gRPC response stream.
///
/// # Lifecycle
///
/// - If the business logic actor dies (detected via actor linking), this actor stops,
/// which closes the gRPC stream.
/// - If the gRPC stream closes or errors, this actor stops, which (via linking) notifies
/// the business actor.
/// - Error responses (`Err(DomainError)`) are forwarded to the client and then the actor stops,
/// closing the connection.
///
/// # Type Parameters
/// - `Outbound`: `OutboundMessage` sent to the client (e.g., `UserAgentResponse`)
/// - `Inbound`: `InboundMessage` received from the client (e.g., `UserAgentRequest`)
/// - `E`: The domain error type, must implement `Into<tonic::Status>` for gRPC conversion
pub struct GrpcTransportActor<Outbound, Inbound, DomainError>
where
SendMsg: Send + 'static,
RecvMsg: Send + 'static,
A: TransportActor<RecvMsg>,
Outbound: Send + 'static,
Inbound: Send + 'static,
DomainError: Into<tonic::Status> + Send + 'static,
{
sender: mpsc::Sender<Result<Outbound, tonic::Status>>,
receiver: tonic::Streaming<Inbound>,
business_logic_actor: Recipient<Inbound>,
_error: std::marker::PhantomData<DomainError>,
}
impl<Outbound, Inbound, DomainError> GrpcTransportActor<Outbound, Inbound, DomainError>
where
Outbound: Send + 'static,
Inbound: Send + 'static,
DomainError: Into<tonic::Status> + Send + 'static,
{
pub fn new(
sender: mpsc::Sender<Result<Outbound, tonic::Status>>,
receiver: tonic::Streaming<Inbound>,
business_logic_actor: Recipient<Inbound>,
) -> Self {
Self {
sender,
receiver,
business_logic_actor,
_error: std::marker::PhantomData,
}
}
}
impl<Outbound, Inbound, E> Actor for GrpcTransportActor<Outbound, Inbound, E>
where
Outbound: Send + 'static,
Inbound: Send + 'static,
E: Into<tonic::Status> + Send + 'static,
{
type Args = Self;
@@ -139,19 +271,27 @@ where
}
}
impl<SendMsg: Send + 'static, RecvMsg: Send + 'static, A: TransportActor<RecvMsg>> Message<SendMsg>
for GrpcTransportActor<SendMsg, RecvMsg, A>
impl<Outbound, Inbound, E> Message<Result<Outbound, E>> for GrpcTransportActor<Outbound, Inbound, E>
where
Outbound: Send + 'static,
Inbound: Send + 'static,
E: Into<tonic::Status> + Send + 'static,
{
type Reply = ();
async fn handle(
&mut self,
msg: SendMsg,
msg: Result<Outbound, E>,
ctx: &mut kameo::prelude::Context<Self, Self::Reply>,
) -> Self::Reply {
let err = self.sender.send(Ok(msg)).await;
match err {
Ok(_) => (),
let is_err = msg.is_err();
let grpc_msg = msg.map_err(Into::into);
match self.sender.send(grpc_msg).await {
Ok(_) => {
if is_err {
ctx.stop();
}
}
Err(e) => {
error!("Failed to send message: {}", e);
ctx.stop();
@@ -160,20 +300,60 @@ impl<SendMsg: Send + 'static, RecvMsg: Send + 'static, A: TransportActor<RecvMsg
}
}
pub async fn wire<T, RecvMsg, SendMsg, BusinessActor, BusinessCtor, TransportCtor>(
impl<Outbound, Inbound, E> TransportActor<Outbound, E> for GrpcTransportActor<Outbound, Inbound, E>
where
Outbound: Send + 'static,
Inbound: Send + 'static,
E: Into<tonic::Status> + Send + 'static,
{
}
/// Wires together a transport actor and a business logic actor with bidirectional linking.
///
/// This function handles the chicken-and-egg problem of two actors that need references
/// to each other at construction time. It uses kameo's [`PreparedActor`] to obtain
/// [`ActorRef`]s before spawning, then links both actors so that if either dies,
/// the other is notified via [`on_link_died`](Actor::on_link_died).
///
/// The business actor receives a type-erased [`Recipient<Result<Outbound, DomainError>>`] instead of an
/// `ActorRef<Transport>`, keeping it decoupled from the concrete transport implementation.
///
/// # Type Parameters
/// - `Transport`: The transport actor type (e.g., [`GrpcTransportActor`])
/// - `Inbound`: `InboundMessage` received by the business actor from the transport
/// - `Outbound`: `OutboundMessage` sent by the business actor back to the transport
/// - `Business`: The business logic actor
/// - `BusinessCtor`: Closure that receives a prepared business actor and transport recipient,
/// spawns the business actor, and returns its [`ActorRef`]
/// - `TransportCtor`: Closure that receives a prepared transport actor, a recipient for
/// inbound messages, and the business actor id, then spawns the transport actor
///
/// # Returns
/// A tuple of `(transport_ref, business_ref)` — actor references for both spawned actors.
pub async fn wire<
Transport,
Inbound,
Outbound,
DomainError,
Business,
BusinessCtor,
TransportCtor,
>(
business_ctor: BusinessCtor,
transport_ctor: TransportCtor,
) -> (ActorRef<T>, ActorRef<BusinessActor>)
) -> (ActorRef<Transport>, ActorRef<Business>)
where
T: TransportActor<RecvMsg>,
RecvMsg: Send + 'static,
SendMsg: Send + 'static,
BusinessActor: Actor + Send + 'static,
BusinessCtor: FnOnce(ActorRef<T>) -> BusinessActor::Args,
TransportCtor: FnOnce(ActorRef<BusinessActor>) -> T::Args,
Transport: TransportActor<Outbound, DomainError>,
Inbound: Send + 'static,
Outbound: Send + 'static,
DomainError: Send + 'static,
Business: Actor + Message<Inbound> + Send + 'static,
BusinessCtor: FnOnce(PreparedActor<Business>, Recipient<Result<Outbound, DomainError>>),
TransportCtor:
FnOnce(PreparedActor<Transport>, Recipient<Inbound>),
{
let prepared_business: PreparedActor<BusinessActor> = Spawn::prepare();
let prepared_transport: PreparedActor<T> = Spawn::prepare();
let prepared_business: PreparedActor<Business> = Spawn::prepare();
let prepared_transport: PreparedActor<Transport> = Spawn::prepare();
let business_ref = prepared_business.actor_ref().clone();
let transport_ref = prepared_transport.actor_ref().clone();
@@ -181,8 +361,11 @@ where
transport_ref.link(&business_ref).await;
business_ref.link(&transport_ref).await;
let _ = prepared_business.spawn(business_ctor(transport_ref.clone()));
let _ = prepared_transport.spawn(transport_ctor(business_ref.clone()));
let recipient = transport_ref.clone().recipient();
business_ctor(prepared_business, recipient);
let business_recipient = business_ref.clone().recipient();
transport_ctor(prepared_transport, business_recipient);
(transport_ref, business_ref)
}