feat(proto): request / response pair tracking by assigning id

This commit is contained in:
hdbg
2026-03-18 23:43:44 +01:00
parent b3267c60fa
commit ee04c6b422
8 changed files with 259 additions and 118 deletions

View File

@@ -10,6 +10,7 @@ use kameo::{
actor::{ActorRef, Spawn as _},
error::SendError,
};
use tonic::Status;
use tracing::{info, warn};
use crate::{
@@ -20,6 +21,7 @@ use crate::{
},
keyholder::KeyHolderState,
},
grpc::request_tracker::RequestTracker,
utils::defer,
};
@@ -28,13 +30,17 @@ mod auth;
async fn dispatch_loop(
mut bi: GrpcBi<ClientRequest, ClientResponse>,
actor: ActorRef<ClientSession>,
mut request_tracker: RequestTracker,
) {
loop {
let Some(conn) = bi.recv().await else {
return;
};
if dispatch_conn_message(&mut bi, &actor, conn).await.is_err() {
if dispatch_conn_message(&mut bi, &actor, &mut request_tracker, conn)
.await
.is_err()
{
return;
}
}
@@ -43,7 +49,8 @@ async fn dispatch_loop(
async fn dispatch_conn_message(
bi: &mut GrpcBi<ClientRequest, ClientResponse>,
actor: &ActorRef<ClientSession>,
conn: Result<ClientRequest, tonic::Status>,
request_tracker: &mut RequestTracker,
conn: Result<ClientRequest, Status>,
) -> Result<(), ()> {
let conn = match conn {
Ok(conn) => conn,
@@ -53,9 +60,16 @@ async fn dispatch_conn_message(
}
};
let request_id = match request_tracker.request(conn.request_id) {
Ok(request_id) => request_id,
Err(err) => {
let _ = bi.send(Err(err)).await;
return Err(());
}
};
let Some(payload) = conn.payload else {
let _ = bi
.send(Err(tonic::Status::invalid_argument(
.send(Err(Status::invalid_argument(
"Missing client request payload",
)))
.await;
@@ -79,15 +93,14 @@ async fn dispatch_conn_message(
payload => {
warn!(?payload, "Unsupported post-auth client request");
let _ = bi
.send(Err(tonic::Status::invalid_argument(
"Unsupported client request",
)))
.send(Err(Status::invalid_argument("Unsupported client request")))
.await;
return Err(());
}
};
bi.send(Ok(ClientResponse {
request_id: Some(request_id),
payload: Some(payload),
}))
.await
@@ -96,7 +109,10 @@ async fn dispatch_conn_message(
pub async fn start(conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
let mut conn = conn;
match auth::start(&mut conn, &mut bi).await {
let mut request_tracker = RequestTracker::default();
let mut response_id = None;
match auth::start(&mut conn, &mut bi, &mut request_tracker, &mut response_id).await {
Ok(_) => {
let actor =
client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
@@ -106,10 +122,14 @@ pub async fn start(conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientR
});
info!("Client authenticated successfully");
dispatch_loop(bi, actor).await;
dispatch_loop(bi, actor, request_tracker).await;
}
Err(e) => {
let mut transport = auth::AuthTransportAdapter(&mut bi);
let mut transport = auth::AuthTransportAdapter::new(
&mut bi,
&mut request_tracker,
&mut response_id,
);
let _ = transport.send(Err(e.clone())).await;
warn!(error = ?e, "Authentication failed");
return;