merge: @main into refactor-proto
This commit is contained in:
@@ -1,36 +1,24 @@
|
||||
use arbiter_proto::{
|
||||
proto::{
|
||||
client::{
|
||||
ClientRequest, ClientResponse,
|
||||
client_request::Payload as ClientRequestPayload,
|
||||
client_response::Payload as ClientResponsePayload,
|
||||
vault::{self as proto_vault, request::Payload as VaultRequestPayload, response::Payload as VaultResponsePayload},
|
||||
},
|
||||
shared::VaultState as ProtoVaultState,
|
||||
proto::client::{
|
||||
ClientRequest, ClientResponse, client_request::Payload as ClientRequestPayload,
|
||||
client_response::Payload as ClientResponsePayload,
|
||||
},
|
||||
transport::{Receiver, Sender, grpc::GrpcBi},
|
||||
};
|
||||
use kameo::{
|
||||
actor::{ActorRef, Spawn as _},
|
||||
error::SendError,
|
||||
};
|
||||
use kameo::actor::{ActorRef, Spawn as _};
|
||||
use tonic::Status;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::{
|
||||
actors::{
|
||||
client::{
|
||||
self, ClientConnection,
|
||||
session::{ClientSession, Error, HandleQueryVaultState},
|
||||
},
|
||||
keyholder::KeyHolderState,
|
||||
},
|
||||
actors::client::{ClientConnection, session::ClientSession},
|
||||
grpc::request_tracker::RequestTracker,
|
||||
};
|
||||
|
||||
mod auth;
|
||||
mod evm;
|
||||
mod inbound;
|
||||
mod outbound;
|
||||
mod vault;
|
||||
|
||||
async fn dispatch_loop(
|
||||
mut bi: GrpcBi<ClientRequest, ClientResponse>,
|
||||
@@ -38,7 +26,9 @@ async fn dispatch_loop(
|
||||
mut request_tracker: RequestTracker,
|
||||
) {
|
||||
loop {
|
||||
let Some(message) = bi.recv().await else { return };
|
||||
let Some(message) = bi.recv().await else {
|
||||
return;
|
||||
};
|
||||
|
||||
let conn = match message {
|
||||
Ok(conn) => conn,
|
||||
@@ -57,16 +47,24 @@ async fn dispatch_loop(
|
||||
};
|
||||
|
||||
let Some(payload) = conn.payload else {
|
||||
let _ = bi.send(Err(Status::invalid_argument("Missing client request payload"))).await;
|
||||
let _ = bi
|
||||
.send(Err(Status::invalid_argument(
|
||||
"Missing client request payload",
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
match dispatch_inner(&actor, payload).await {
|
||||
Ok(response) => {
|
||||
if bi.send(Ok(ClientResponse {
|
||||
request_id: Some(request_id),
|
||||
payload: Some(response),
|
||||
})).await.is_err() {
|
||||
if bi
|
||||
.send(Ok(ClientResponse {
|
||||
request_id: Some(request_id),
|
||||
payload: Some(response),
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -83,52 +81,33 @@ async fn dispatch_inner(
|
||||
payload: ClientRequestPayload,
|
||||
) -> Result<ClientResponsePayload, Status> {
|
||||
match payload {
|
||||
ClientRequestPayload::Vault(req) => dispatch_vault_request(actor, req).await,
|
||||
payload => {
|
||||
warn!(?payload, "Unsupported post-auth client request");
|
||||
ClientRequestPayload::Vault(req) => vault::dispatch(actor, req).await,
|
||||
ClientRequestPayload::Evm(req) => evm::dispatch(actor, req).await,
|
||||
ClientRequestPayload::Auth(..) => {
|
||||
warn!("Unsupported post-auth client auth request");
|
||||
Err(Status::invalid_argument("Unsupported client request"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn dispatch_vault_request(
|
||||
actor: &ActorRef<ClientSession>,
|
||||
req: proto_vault::Request,
|
||||
) -> Result<ClientResponsePayload, Status> {
|
||||
let Some(payload) = req.payload else {
|
||||
return Err(Status::invalid_argument("Missing client vault request payload"));
|
||||
};
|
||||
|
||||
match payload {
|
||||
VaultRequestPayload::QueryState(_) => {
|
||||
let state = match actor.ask(HandleQueryVaultState {}).await {
|
||||
Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped,
|
||||
Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed,
|
||||
Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed,
|
||||
Err(SendError::HandlerError(Error::Internal)) => ProtoVaultState::Error,
|
||||
Err(err) => {
|
||||
warn!(error = ?err, "Failed to query vault state");
|
||||
ProtoVaultState::Error
|
||||
}
|
||||
};
|
||||
Ok(ClientResponsePayload::Vault(proto_vault::Response {
|
||||
payload: Some(VaultResponsePayload::State(state.into())),
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start(mut conn: ClientConnection, mut bi: GrpcBi<ClientRequest, ClientResponse>) {
|
||||
let mut request_tracker = RequestTracker::default();
|
||||
|
||||
if let Err(e) = auth::start(&mut conn, &mut bi, &mut request_tracker).await {
|
||||
let mut transport = auth::AuthTransportAdapter::new(&mut bi, &mut request_tracker);
|
||||
let _ = transport.send(Err(e.clone())).await;
|
||||
warn!(error = ?e, "Client authentication failed");
|
||||
return;
|
||||
let client_id = match auth::start(&mut conn, &mut bi, &mut request_tracker).await {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let _ = bi
|
||||
.send(Err(Status::unauthenticated(format!(
|
||||
"Authentication failed: {}",
|
||||
err
|
||||
))))
|
||||
.await;
|
||||
warn!(error = ?err, "Client authentication failed");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let actor = client::session::ClientSession::spawn(client::session::ClientSession::new(conn));
|
||||
let actor = ClientSession::spawn(ClientSession::new(conn, client_id));
|
||||
let actor_for_cleanup = actor.clone();
|
||||
|
||||
info!("Client authenticated successfully");
|
||||
|
||||
Reference in New Issue
Block a user