refactor(server): grpc wire conversion

This commit is contained in:
hdbg
2026-03-25 14:21:00 +01:00
parent eb25d31361
commit ac04495480
11 changed files with 566 additions and 585 deletions

View File

@@ -22,10 +22,11 @@ use crate::{
keyholder::KeyHolderState,
},
grpc::request_tracker::RequestTracker,
utils::defer,
};
mod auth;
mod inbound;
mod outbound;
async fn dispatch_loop(
mut bi: GrpcBi<ClientRequest, ClientResponse>,
@@ -33,52 +34,53 @@ async fn dispatch_loop(
mut request_tracker: RequestTracker,
) {
loop {
let Some(conn) = bi.recv().await else {
let Some(message) = bi.recv().await else { return };
let conn = match message {
Ok(conn) => conn,
Err(err) => {
warn!(error = ?err, "Failed to receive client request");
return;
}
};
let request_id = match request_tracker.request(conn.request_id) {
Ok(id) => id,
Err(err) => {
let _ = bi.send(Err(err)).await;
return;
}
};
let Some(payload) = conn.payload else {
let _ = bi.send(Err(Status::invalid_argument("Missing client request payload"))).await;
return;
};
if dispatch_conn_message(&mut bi, &actor, &mut request_tracker, conn)
.await
.is_err()
{
return;
match dispatch_inner(&actor, payload).await {
Ok(response) => {
if bi.send(Ok(ClientResponse {
request_id: Some(request_id),
payload: Some(response),
})).await.is_err() {
return;
}
}
Err(status) => {
let _ = bi.send(Err(status)).await;
return;
}
}
}
}
async fn dispatch_conn_message(
bi: &mut GrpcBi<ClientRequest, ClientResponse>,
async fn dispatch_inner(
actor: &ActorRef<ClientSession>,
request_tracker: &mut RequestTracker,
conn: Result<ClientRequest, Status>,
) -> Result<(), ()> {
let conn = match conn {
Ok(conn) => conn,
Err(err) => {
warn!(error = ?err, "Failed to receive client request");
return Err(());
}
};
let request_id = match request_tracker.request(conn.request_id) {
Ok(request_id) => request_id,
Err(err) => {
let _ = bi.send(Err(err)).await;
return Err(());
}
};
let Some(payload) = conn.payload else {
let _ = bi
.send(Err(Status::invalid_argument(
"Missing client request payload",
)))
.await;
return Err(());
};
let payload = match payload {
ClientRequestPayload::QueryVaultState(_) => ClientResponsePayload::VaultState(
match actor.ask(HandleQueryVaultState {}).await {
payload: ClientRequestPayload,
) -> Result<ClientResponsePayload, Status> {
match payload {
ClientRequestPayload::QueryVaultState(_) => {
let state = match actor.ask(HandleQueryVaultState {}).await {
Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed,
Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed,
@@ -87,46 +89,30 @@ async fn dispatch_conn_message(
warn!(error = ?err, "Failed to query vault state");
ProtoVaultState::Error
}
}
.into(),
),
};
Ok(ClientResponsePayload::VaultState(state.into()))
}
payload => {
warn!(?payload, "Unsupported post-auth client request");
let _ = bi
.send(Err(Status::invalid_argument("Unsupported client request")))
.await;
return Err(());
}
};
bi.send(Ok(ClientResponse {
request_id: Some(request_id),
payload: Some(payload),
}))
.await
.map_err(|_| ())
}
pub async fn start(conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
let mut conn = conn;
let mut request_tracker = RequestTracker::default();
match auth::start(&mut conn, &mut bi, &mut request_tracker).await {
Ok(_) => {
let actor =
client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
let actor_for_cleanup = actor.clone();
let _ = defer(move || {
actor_for_cleanup.kill();
});
info!("Client authenticated successfully");
dispatch_loop(bi, actor, request_tracker).await;
}
Err(e) => {
let mut transport = auth::AuthTransportAdapter::new(&mut bi, &mut request_tracker);
let _ = transport.send(Err(e.clone())).await;
warn!(error = ?e, "Authentication failed");
Err(Status::invalid_argument("Unsupported client request"))
}
}
}
pub async fn start(mut conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
let mut request_tracker = RequestTracker::default();
if let Err(e) = auth::start(&mut conn, &mut bi, &mut request_tracker).await {
let mut transport = auth::AuthTransportAdapter::new(&mut bi, &mut request_tracker);
let _ = transport.send(Err(e.clone())).await;
warn!(error = ?e, "Client authentication failed");
return;
};
let actor = client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
let actor_for_cleanup = actor.clone();
info!("Client authenticated successfully");
dispatch_loop(bi, actor, request_tracker).await;
actor_for_cleanup.kill();
}