feat(proto): request / response pair tracking by assigning id
This commit is contained in:
@@ -53,6 +53,7 @@ use crate::{
|
||||
Grant, SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit,
|
||||
ether_transfer, token_transfers,
|
||||
},
|
||||
grpc::request_tracker::RequestTracker,
|
||||
utils::defer,
|
||||
};
|
||||
use alloy::primitives::{Address, U256};
|
||||
@@ -74,6 +75,7 @@ async fn dispatch_loop(
|
||||
mut bi: GrpcBi<UserAgentRequest, UserAgentResponse>,
|
||||
actor: ActorRef<UserAgentSession>,
|
||||
mut receiver: mpsc::Receiver<OutOfBand>,
|
||||
mut request_tracker: RequestTracker,
|
||||
) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@@ -92,7 +94,10 @@ async fn dispatch_loop(
|
||||
return;
|
||||
};
|
||||
|
||||
if dispatch_conn_message(&mut bi, &actor, conn).await.is_err() {
|
||||
if dispatch_conn_message(&mut bi, &actor, &mut request_tracker, conn)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -103,6 +108,7 @@ async fn dispatch_loop(
|
||||
async fn dispatch_conn_message(
|
||||
bi: &mut GrpcBi<UserAgentRequest, UserAgentResponse>,
|
||||
actor: &ActorRef<UserAgentSession>,
|
||||
request_tracker: &mut RequestTracker,
|
||||
conn: Result<UserAgentRequest, Status>,
|
||||
) -> Result<(), ()> {
|
||||
let conn = match conn {
|
||||
@@ -113,6 +119,14 @@ async fn dispatch_conn_message(
|
||||
}
|
||||
};
|
||||
|
||||
let request_id = match request_tracker.request(conn.id) {
|
||||
Ok(request_id) => request_id,
|
||||
Err(err) => {
|
||||
let _ = bi.send(Err(err)).await;
|
||||
return Err(());
|
||||
}
|
||||
};
|
||||
|
||||
let Some(payload) = conn.payload else {
|
||||
let _ = bi
|
||||
.send(Err(Status::invalid_argument(
|
||||
@@ -267,6 +281,7 @@ async fn dispatch_conn_message(
|
||||
};
|
||||
|
||||
bi.send(Ok(UserAgentResponse {
|
||||
id: Some(request_id),
|
||||
payload: Some(payload),
|
||||
}))
|
||||
.await
|
||||
@@ -289,6 +304,7 @@ async fn send_out_of_band(
|
||||
};
|
||||
|
||||
bi.send(Ok(UserAgentResponse {
|
||||
id: None,
|
||||
payload: Some(payload),
|
||||
}))
|
||||
.await
|
||||
@@ -558,7 +574,17 @@ pub async fn start(
|
||||
mut conn: UserAgentConnection,
|
||||
mut bi: GrpcBi<UserAgentRequest, UserAgentResponse>,
|
||||
) {
|
||||
let pubkey = match auth::start(&mut conn, &mut bi).await {
|
||||
let mut request_tracker = RequestTracker::default();
|
||||
let mut response_id = None;
|
||||
|
||||
let pubkey = match auth::start(
|
||||
&mut conn,
|
||||
&mut bi,
|
||||
&mut request_tracker,
|
||||
&mut response_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(pubkey) => pubkey,
|
||||
Err(e) => {
|
||||
warn!(error = ?e, "Authentication failed");
|
||||
@@ -572,11 +598,10 @@ pub async fn start(
|
||||
let actor = UserAgentSession::spawn(UserAgentSession::new(conn, Box::new(oob_adapter)));
|
||||
let actor_for_cleanup = actor.clone();
|
||||
|
||||
// when connection closes
|
||||
let _ = defer(move || {
|
||||
actor_for_cleanup.kill();
|
||||
});
|
||||
|
||||
info!(?pubkey, "User authenticated successfully");
|
||||
dispatch_loop(bi, actor, oob_receiver).await;
|
||||
dispatch_loop(bi, actor, oob_receiver, request_tracker).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user