From ac0449548055b8819856dbc52384cdcce35e57b2 Mon Sep 17 00:00:00 2001 From: hdbg Date: Wed, 25 Mar 2026 14:21:00 +0100 Subject: [PATCH] refactor(server): grpc wire conversion --- protobufs/evm.proto | 3 +- protobufs/user_agent.proto | 21 +- .../arbiter-server/src/actors/evm/mod.rs | 9 +- .../actors/user_agent/session/connection.rs | 2 +- .../crates/arbiter-server/src/grpc/client.rs | 140 ++-- .../arbiter-server/src/grpc/client/inbound.rs | 0 .../src/grpc/client/outbound.rs | 0 server/crates/arbiter-server/src/grpc/mod.rs | 13 + .../arbiter-server/src/grpc/user_agent.rs | 736 ++++++------------ .../src/grpc/user_agent/inbound.rs | 135 ++++ .../src/grpc/user_agent/outbound.rs | 92 +++ 11 files changed, 566 insertions(+), 585 deletions(-) create mode 100644 server/crates/arbiter-server/src/grpc/client/inbound.rs create mode 100644 server/crates/arbiter-server/src/grpc/client/outbound.rs create mode 100644 server/crates/arbiter-server/src/grpc/user_agent/inbound.rs create mode 100644 server/crates/arbiter-server/src/grpc/user_agent/outbound.rs diff --git a/protobufs/evm.proto b/protobufs/evm.proto index 3ad3782..f20df52 100644 --- a/protobufs/evm.proto +++ b/protobufs/evm.proto @@ -12,7 +12,8 @@ enum EvmError { } message WalletEntry { - bytes address = 1; // 20-byte Ethereum address + int32 id = 1; + bytes address = 2; // 20-byte Ethereum address } message WalletList { diff --git a/protobufs/user_agent.proto b/protobufs/user_agent.proto index ee3af0e..d2a697e 100644 --- a/protobufs/user_agent.proto +++ b/protobufs/user_agent.proto @@ -132,6 +132,19 @@ message SdkClientConnectionCancel { bytes pubkey = 1; } +message SdkClientWalletAccess { + int32 client_id = 1; + int32 wallet_id = 2; +} + +message SdkClientGrantWalletAccess { + repeated SdkClientWalletAccess accesses = 1; +} + +message SdkClientRevokeWalletAccess { + repeated SdkClientWalletAccess accesses = 1; +} + message UserAgentRequest { int32 id = 16; oneof payload { @@ -146,9 +159,11 @@ message UserAgentRequest { arbiter.evm.EvmGrantDeleteRequest evm_grant_delete = 9; arbiter.evm.EvmGrantListRequest evm_grant_list = 10; SdkClientConnectionResponse sdk_client_connection_response = 11; - SdkClientRevokeRequest sdk_client_revoke = 13; - google.protobuf.Empty sdk_client_list = 14; - BootstrapEncryptedKey bootstrap_encrypted_key = 15; + SdkClientRevokeRequest sdk_client_revoke = 12; + google.protobuf.Empty sdk_client_list = 13; + BootstrapEncryptedKey bootstrap_encrypted_key = 14; + SdkClientGrantWalletAccess grant_wallet_access_list = 15; + SdkClientRevokeWalletAccess revoke_wallet_access_list = 17; } } message UserAgentResponse { diff --git a/server/crates/arbiter-server/src/actors/evm/mod.rs b/server/crates/arbiter-server/src/actors/evm/mod.rs index 691e372..c875b18 100644 --- a/server/crates/arbiter-server/src/actors/evm/mod.rs +++ b/server/crates/arbiter-server/src/actors/evm/mod.rs @@ -105,7 +105,7 @@ impl EvmActor { #[messages] impl EvmActor { #[message] - pub async fn generate(&mut self) -> Result { + pub async fn generate(&mut self) -> Result<(i32, Address), Error> { let (mut key_cell, address) = safe_signer::generate(&mut self.rng); let plaintext = key_cell.read_inline(|reader| SafeCell::new(reader.to_vec())); @@ -117,15 +117,16 @@ impl EvmActor { .map_err(|_| Error::KeyholderSend)?; let mut conn = self.db.get().await?; - insert_into(schema::evm_wallet::table) + let wallet_id = insert_into(schema::evm_wallet::table) .values(&models::NewEvmWallet { address: address.as_slice().to_vec(), aead_encrypted_id: aead_id, }) - .execute(&mut conn) + .returning(schema::evm_wallet::id) + .get_result(&mut conn) .await?; - Ok(address) + Ok((wallet_id, address)) } #[message] diff --git a/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs b/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs index 397b563..85074a1 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/session/connection.rs @@ -276,7 +276,7 @@ impl UserAgentSession { #[messages] impl UserAgentSession { #[message] - pub(crate) async fn handle_evm_wallet_create(&mut self) -> Result { + pub(crate) async fn handle_evm_wallet_create(&mut self) -> Result<(i32, Address), Error> { match self.props.actors.evm.ask(Generate {}).await { Ok(address) => Ok(address), Err(SendError::HandlerError(err)) => Err(Error::internal(format!( diff --git a/server/crates/arbiter-server/src/grpc/client.rs b/server/crates/arbiter-server/src/grpc/client.rs index 063a5b2..cd032f4 100644 --- a/server/crates/arbiter-server/src/grpc/client.rs +++ b/server/crates/arbiter-server/src/grpc/client.rs @@ -22,10 +22,11 @@ use crate::{ keyholder::KeyHolderState, }, grpc::request_tracker::RequestTracker, - utils::defer, }; mod auth; +mod inbound; +mod outbound; async fn dispatch_loop( mut bi: GrpcBi, @@ -33,52 +34,53 @@ async fn dispatch_loop( mut request_tracker: RequestTracker, ) { loop { - let Some(conn) = bi.recv().await else { + let Some(message) = bi.recv().await else { return }; + + let conn = match message { + Ok(conn) => conn, + Err(err) => { + warn!(error = ?err, "Failed to receive client request"); + return; + } + }; + + let request_id = match request_tracker.request(conn.request_id) { + Ok(id) => id, + Err(err) => { + let _ = bi.send(Err(err)).await; + return; + } + }; + + let Some(payload) = conn.payload else { + let _ = bi.send(Err(Status::invalid_argument("Missing client request payload"))).await; return; }; - if dispatch_conn_message(&mut bi, &actor, &mut request_tracker, conn) - .await - .is_err() - { - return; + match dispatch_inner(&actor, payload).await { + Ok(response) => { + if bi.send(Ok(ClientResponse { + request_id: Some(request_id), + payload: Some(response), + })).await.is_err() { + return; + } + } + Err(status) => { + let _ = bi.send(Err(status)).await; + return; + } } } } -async fn dispatch_conn_message( - bi: &mut GrpcBi, +async fn dispatch_inner( actor: &ActorRef, - request_tracker: &mut RequestTracker, - conn: Result, -) -> Result<(), ()> { - let conn = match conn { - Ok(conn) => conn, - Err(err) => { - warn!(error = ?err, "Failed to receive client request"); - return Err(()); - } - }; - - let request_id = match request_tracker.request(conn.request_id) { - Ok(request_id) => request_id, - Err(err) => { - let _ = bi.send(Err(err)).await; - return Err(()); - } - }; - let Some(payload) = conn.payload else { - let _ = bi - .send(Err(Status::invalid_argument( - "Missing client request payload", - ))) - .await; - return Err(()); - }; - - let payload = match payload { - ClientRequestPayload::QueryVaultState(_) => ClientResponsePayload::VaultState( - match actor.ask(HandleQueryVaultState {}).await { + payload: ClientRequestPayload, +) -> Result { + match payload { + ClientRequestPayload::QueryVaultState(_) => { + let state = match actor.ask(HandleQueryVaultState {}).await { Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped, Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed, Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed, @@ -87,46 +89,30 @@ async fn dispatch_conn_message( warn!(error = ?err, "Failed to query vault state"); ProtoVaultState::Error } - } - .into(), - ), + }; + Ok(ClientResponsePayload::VaultState(state.into())) + } payload => { warn!(?payload, "Unsupported post-auth client request"); - let _ = bi - .send(Err(Status::invalid_argument("Unsupported client request"))) - .await; - return Err(()); - } - }; - - bi.send(Ok(ClientResponse { - request_id: Some(request_id), - payload: Some(payload), - })) - .await - .map_err(|_| ()) -} - -pub async fn start(conn: ClientConnection, mut bi: GrpcBi) { - let mut conn = conn; - let mut request_tracker = RequestTracker::default(); - - match auth::start(&mut conn, &mut bi, &mut request_tracker).await { - Ok(_) => { - let actor = - client::session::ClientSession::spawn(client::session::ClientSession::new(conn)); - let actor_for_cleanup = actor.clone(); - let _ = defer(move || { - actor_for_cleanup.kill(); - }); - - info!("Client authenticated successfully"); - dispatch_loop(bi, actor, request_tracker).await; - } - Err(e) => { - let mut transport = auth::AuthTransportAdapter::new(&mut bi, &mut request_tracker); - let _ = transport.send(Err(e.clone())).await; - warn!(error = ?e, "Authentication failed"); + Err(Status::invalid_argument("Unsupported client request")) } } } + +pub async fn start(mut conn: ClientConnection, mut bi: GrpcBi) { + let mut request_tracker = RequestTracker::default(); + + if let Err(e) = auth::start(&mut conn, &mut bi, &mut request_tracker).await { + let mut transport = auth::AuthTransportAdapter::new(&mut bi, &mut request_tracker); + let _ = transport.send(Err(e.clone())).await; + warn!(error = ?e, "Client authentication failed"); + return; + }; + + let actor = client::session::ClientSession::spawn(client::session::ClientSession::new(conn)); + let actor_for_cleanup = actor.clone(); + + info!("Client authenticated successfully"); + dispatch_loop(bi, actor, request_tracker).await; + actor_for_cleanup.kill(); +} diff --git a/server/crates/arbiter-server/src/grpc/client/inbound.rs b/server/crates/arbiter-server/src/grpc/client/inbound.rs new file mode 100644 index 0000000..e69de29 diff --git a/server/crates/arbiter-server/src/grpc/client/outbound.rs b/server/crates/arbiter-server/src/grpc/client/outbound.rs new file mode 100644 index 0000000..e69de29 diff --git a/server/crates/arbiter-server/src/grpc/mod.rs b/server/crates/arbiter-server/src/grpc/mod.rs index de60b84..149f0cb 100644 --- a/server/crates/arbiter-server/src/grpc/mod.rs +++ b/server/crates/arbiter-server/src/grpc/mod.rs @@ -18,6 +18,19 @@ pub mod client; mod request_tracker; pub mod user_agent; +pub trait Convert { + type Output; + + fn convert(self) -> Self::Output; +} + +pub trait TryConvert { + type Output; + type Error; + + fn try_convert(self) -> Result; +} + #[async_trait] impl arbiter_proto::proto::arbiter_service_server::ArbiterService for super::Server { type UserAgentStream = ReceiverStream>; diff --git a/server/crates/arbiter-server/src/grpc/user_agent.rs b/server/crates/arbiter-server/src/grpc/user_agent.rs index 2742660..470b479 100644 --- a/server/crates/arbiter-server/src/grpc/user_agent.rs +++ b/server/crates/arbiter-server/src/grpc/user_agent.rs @@ -4,26 +4,21 @@ use arbiter_proto::{ proto::{ client::ClientInfo as ProtoClientMetadata, evm::{ - EtherTransferSettings as ProtoEtherTransferSettings, EvmError as ProtoEvmError, - EvmGrantCreateRequest, EvmGrantCreateResponse, EvmGrantDeleteRequest, - EvmGrantDeleteResponse, EvmGrantList, EvmGrantListResponse, GrantEntry, - SharedSettings as ProtoSharedSettings, SpecificGrant as ProtoSpecificGrant, - TokenTransferSettings as ProtoTokenTransferSettings, - TransactionRateLimit as ProtoTransactionRateLimit, - VolumeRateLimit as ProtoVolumeRateLimit, WalletCreateResponse, WalletEntry, WalletList, - WalletListResponse, evm_grant_create_response::Result as EvmGrantCreateResult, + EvmError as ProtoEvmError, EvmGrantCreateRequest, EvmGrantCreateResponse, + EvmGrantDeleteRequest, EvmGrantDeleteResponse, EvmGrantList, EvmGrantListResponse, + GrantEntry, WalletCreateResponse, WalletEntry, WalletList, WalletListResponse, + evm_grant_create_response::Result as EvmGrantCreateResult, evm_grant_delete_response::Result as EvmGrantDeleteResult, evm_grant_list_response::Result as EvmGrantListResult, - specific_grant::Grant as ProtoSpecificGrantType, wallet_create_response::Result as WalletCreateResult, wallet_list_response::Result as WalletListResult, }, user_agent::{ BootstrapEncryptedKey as ProtoBootstrapEncryptedKey, BootstrapResult as ProtoBootstrapResult, - SdkClientEntry as ProtoSdkClientEntry, SdkClientError as ProtoSdkClientError, SdkClientConnectionCancel as ProtoSdkClientConnectionCancel, SdkClientConnectionRequest as ProtoSdkClientConnectionRequest, + SdkClientEntry as ProtoSdkClientEntry, SdkClientError as ProtoSdkClientError, SdkClientList as ProtoSdkClientList, SdkClientListResponse as ProtoSdkClientListResponse, UnsealEncryptedKey as ProtoUnsealEncryptedKey, UnsealResult as ProtoUnsealResult, @@ -35,9 +30,7 @@ use arbiter_proto::{ }, transport::{Error as TransportError, Receiver, Sender, grpc::GrpcBi}, }; -use prost_types::{Timestamp as ProtoTimestamp, }; use async_trait::async_trait; -use chrono::{TimeZone, Utc}; use kameo::{ actor::{ActorRef, Spawn as _}, error::SendError, @@ -51,23 +44,18 @@ use crate::{ user_agent::{ OutOfBand, UserAgentConnection, UserAgentSession, session::{ - BootstrapError, Error, HandleBootstrapEncryptedKey, HandleEvmWalletCreate, + BootstrapError, HandleBootstrapEncryptedKey, HandleEvmWalletCreate, HandleEvmWalletList, HandleGrantCreate, HandleGrantDelete, HandleGrantList, HandleNewClientApprove, HandleQueryVaultState, HandleSdkClientList, - HandleUnsealEncryptedKey, - HandleUnsealRequest, UnsealError, + HandleUnsealEncryptedKey, HandleUnsealRequest, UnsealError, }, }, }, - evm::policies::{ - Grant, SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit, - ether_transfer, token_transfers, - }, - grpc::request_tracker::RequestTracker, - utils::defer, + grpc::{Convert, TryConvert, request_tracker::RequestTracker}, }; -use alloy::primitives::{Address, U256}; mod auth; +mod inbound; +mod outbound; pub struct OutOfBandAdapter(mpsc::Sender); @@ -95,92 +83,105 @@ async fn dispatch_loop( return; }; - if send_out_of_band(&mut bi, oob).await.is_err() { + let payload = match oob { + OutOfBand::ClientConnectionRequest { profile } => { + UserAgentResponsePayload::SdkClientConnectionRequest(ProtoSdkClientConnectionRequest { + pubkey: profile.pubkey.to_bytes().to_vec(), + info: Some(ProtoClientMetadata { + name: profile.metadata.name, + description: profile.metadata.description, + version: profile.metadata.version, + }), + }) + } + OutOfBand::ClientConnectionCancel { pubkey } => { + UserAgentResponsePayload::SdkClientConnectionCancel(ProtoSdkClientConnectionCancel { + pubkey: pubkey.to_bytes().to_vec(), + }) + } + }; + + if bi.send(Ok(UserAgentResponse { id: None, payload: Some(payload) })).await.is_err() { return; } } - conn = bi.recv() => { - let Some(conn) = conn else { + message = bi.recv() => { + let Some(message) = message else { return; }; + + let conn = match message { + Ok(conn) => conn, + Err(err) => { + warn!(error = ?err, "Failed to receive user agent request"); + return; + } + }; + + let request_id = match request_tracker.request(conn.id) { + Ok(id) => id, + Err(err) => { + let _ = bi.send(Err(err)).await; + return; + } + }; + + let Some(payload) = conn.payload else { + let _ = bi.send(Err(Status::invalid_argument("Missing user-agent request payload"))).await; return; }; - if let Err(e) = dispatch_conn_message(&mut bi, &actor, &mut request_tracker, conn) - .await - - { - error!(error = ?e, "Error handling user agent message"); - return; + match dispatch_inner(&actor, payload).await { + Ok(Some(response)) => { + if bi.send(Ok(UserAgentResponse { + id: Some(request_id), + payload: Some(response), + })).await.is_err() { + return; + } + } + Ok(None) => {} + Err(status) => { + error!(?status, "Failed to process user agent request"); + let _ = bi.send(Err(status)).await; + return; + } } } } } } -async fn dispatch_conn_message( - bi: &mut GrpcBi, +async fn dispatch_inner( actor: &ActorRef, - request_tracker: &mut RequestTracker, - conn: Result, -) -> Result<(), ()> { - let conn = match conn { - Ok(conn) => conn, - Err(err) => { - warn!(error = ?err, "Failed to receive user agent request"); - return Err(()); - } - }; - - let request_id = match request_tracker.request(conn.id) { - Ok(request_id) => request_id, - Err(err) => { - let _ = bi.send(Err(err)).await; - return Err(()); - } - }; - - let Some(payload) = conn.payload else { - let _ = bi - .send(Err(Status::invalid_argument( - "Missing user-agent request payload", - ))) - .await; - return Err(()); - }; - - let payload = match payload { + payload: UserAgentRequestPayload, +) -> Result, Status> { + let response = match payload { UserAgentRequestPayload::UnsealStart(UnsealStart { client_pubkey }) => { - let client_pubkey = match <[u8; 32]>::try_from(client_pubkey) { - Ok(bytes) => x25519_dalek::PublicKey::from(bytes), - Err(_) => { - let _ = bi - .send(Err(Status::invalid_argument("Invalid X25519 public key"))) - .await; - return Err(()); - } - }; + let client_pubkey = <[u8; 32]>::try_from(client_pubkey) + .map(x25519_dalek::PublicKey::from) + .map_err(|_| Status::invalid_argument("Invalid X25519 public key"))?; - match actor.ask(HandleUnsealRequest { client_pubkey }).await { - Ok(response) => UserAgentResponsePayload::UnsealStartResponse( - arbiter_proto::proto::user_agent::UnsealStartResponse { - server_pubkey: response.server_pubkey.as_bytes().to_vec(), - }, - ), - Err(err) => { + let response = actor + .ask(HandleUnsealRequest { client_pubkey }) + .await + .map_err(|err| { warn!(error = ?err, "Failed to handle unseal start request"); - let _ = bi - .send(Err(Status::internal("Failed to start unseal flow"))) - .await; - return Err(()); - } - } + Status::internal("Failed to start unseal flow") + })?; + + UserAgentResponsePayload::UnsealStartResponse( + arbiter_proto::proto::user_agent::UnsealStartResponse { + server_pubkey: response.server_pubkey.as_bytes().to_vec(), + }, + ) } + UserAgentRequestPayload::UnsealEncryptedKey(ProtoUnsealEncryptedKey { nonce, ciphertext, associated_data, - }) => UserAgentResponsePayload::UnsealResult( - match actor + }) => { + let result = match actor .ask(HandleUnsealEncryptedKey { nonce, ciphertext, @@ -194,20 +195,18 @@ async fn dispatch_conn_message( } Err(err) => { warn!(error = ?err, "Failed to handle unseal request"); - let _ = bi - .send(Err(Status::internal("Failed to unseal vault"))) - .await; - return Err(()); + return Err(Status::internal("Failed to unseal vault")); } - } - .into(), - ), + }; + UserAgentResponsePayload::UnsealResult(result.into()) + } + UserAgentRequestPayload::BootstrapEncryptedKey(ProtoBootstrapEncryptedKey { nonce, ciphertext, associated_data, - }) => UserAgentResponsePayload::BootstrapResult( - match actor + }) => { + let result = match actor .ask(HandleBootstrapEncryptedKey { nonce, ciphertext, @@ -224,16 +223,14 @@ async fn dispatch_conn_message( } Err(err) => { warn!(error = ?err, "Failed to handle bootstrap request"); - let _ = bi - .send(Err(Status::internal("Failed to bootstrap vault"))) - .await; - return Err(()); + return Err(Status::internal("Failed to bootstrap vault")); } - } - .into(), - ), - UserAgentRequestPayload::QueryVaultState(_) => UserAgentResponsePayload::VaultState( - match actor.ask(HandleQueryVaultState {}).await { + }; + UserAgentResponsePayload::BootstrapResult(result.into()) + } + + UserAgentRequestPayload::QueryVaultState(_) => { + let state = match actor.ask(HandleQueryVaultState {}).await { Ok(KeyHolderState::Unbootstrapped) => ProtoVaultState::Unbootstrapped, Ok(KeyHolderState::Sealed) => ProtoVaultState::Sealed, Ok(KeyHolderState::Unsealed) => ProtoVaultState::Unsealed, @@ -241,422 +238,163 @@ async fn dispatch_conn_message( warn!(error = ?err, "Failed to query vault state"); ProtoVaultState::Error } - } - .into(), - ), - UserAgentRequestPayload::EvmWalletCreate(_) => UserAgentResponsePayload::EvmWalletCreate( - EvmGrantOrWallet::wallet_create_response(actor.ask(HandleEvmWalletCreate {}).await), - ), - UserAgentRequestPayload::EvmWalletList(_) => UserAgentResponsePayload::EvmWalletList( - EvmGrantOrWallet::wallet_list_response(actor.ask(HandleEvmWalletList {}).await), - ), - UserAgentRequestPayload::EvmGrantList(_) => UserAgentResponsePayload::EvmGrantList( - EvmGrantOrWallet::grant_list_response(actor.ask(HandleGrantList {}).await), - ), + }; + UserAgentResponsePayload::VaultState(state.into()) + } + + UserAgentRequestPayload::EvmWalletCreate(_) => { + let result = match actor.ask(HandleEvmWalletCreate {}).await { + Ok((wallet_id, address)) => WalletCreateResult::Wallet(WalletEntry { + id: wallet_id, + address: address.to_vec(), + }), + Err(err) => { + warn!(error = ?err, "Failed to create EVM wallet"); + WalletCreateResult::Error(ProtoEvmError::Internal.into()) + } + }; + UserAgentResponsePayload::EvmWalletCreate(WalletCreateResponse { + result: Some(result), + }) + } + + UserAgentRequestPayload::EvmWalletList(_) => { + let result = match actor.ask(HandleEvmWalletList {}).await { + Ok(wallets) => WalletListResult::Wallets(WalletList { + wallets: wallets + .into_iter() + .map(|w| WalletEntry { + address: w.to_vec(), + id: todo!(), + }) + .collect(), + }), + Err(err) => { + warn!(error = ?err, "Failed to list EVM wallets"); + WalletListResult::Error(ProtoEvmError::Internal.into()) + } + }; + UserAgentResponsePayload::EvmWalletList(WalletListResponse { + result: Some(result), + }) + } + + UserAgentRequestPayload::EvmGrantList(_) => { + let result = match actor.ask(HandleGrantList {}).await { + Ok(grants) => EvmGrantListResult::Grants(EvmGrantList { + grants: grants + .into_iter() + .map(|grant| GrantEntry { + id: grant.id, + wallet_access_id: grant.shared.wallet_access_id, + shared: Some(grant.shared.convert()), + specific: Some(grant.settings.convert()), + }) + .collect(), + }), + Err(err) => { + warn!(error = ?err, "Failed to list EVM grants"); + EvmGrantListResult::Error(ProtoEvmError::Internal.into()) + } + }; + UserAgentResponsePayload::EvmGrantList(EvmGrantListResponse { + result: Some(result), + }) + } + UserAgentRequestPayload::EvmGrantCreate(EvmGrantCreateRequest { shared, specific }) => { - let (basic, grant) = match parse_grant_request(shared, specific) { - Ok(values) => values, - Err(status) => { - let _ = bi.send(Err(status)).await; - return Err(()); + let basic = shared + .ok_or_else(|| Status::invalid_argument("Missing shared grant settings"))? + .try_convert()?; + let grant = specific + .ok_or_else(|| Status::invalid_argument("Missing specific grant settings"))? + .try_convert()?; + + let result = match actor.ask(HandleGrantCreate { basic, grant }).await { + Ok(grant_id) => EvmGrantCreateResult::GrantId(grant_id), + Err(err) => { + warn!(error = ?err, "Failed to create EVM grant"); + EvmGrantCreateResult::Error(ProtoEvmError::Internal.into()) } }; - - UserAgentResponsePayload::EvmGrantCreate(EvmGrantOrWallet::grant_create_response( - actor.ask(HandleGrantCreate { basic, grant }).await, - )) + UserAgentResponsePayload::EvmGrantCreate(EvmGrantCreateResponse { + result: Some(result), + }) } + UserAgentRequestPayload::EvmGrantDelete(EvmGrantDeleteRequest { grant_id }) => { - UserAgentResponsePayload::EvmGrantDelete(EvmGrantOrWallet::grant_delete_response( - actor.ask(HandleGrantDelete { grant_id }).await, - )) + let result = match actor.ask(HandleGrantDelete { grant_id }).await { + Ok(()) => EvmGrantDeleteResult::Ok(()), + Err(err) => { + warn!(error = ?err, "Failed to delete EVM grant"); + EvmGrantDeleteResult::Error(ProtoEvmError::Internal.into()) + } + }; + UserAgentResponsePayload::EvmGrantDelete(EvmGrantDeleteResponse { + result: Some(result), + }) } - UserAgentRequestPayload::SdkClientConnectionResponse(resp) => { - let pubkey_bytes: [u8; 32] = match resp.pubkey.try_into() { - Ok(bytes) => bytes, - Err(_) => { - let _ = bi - .send(Err(Status::invalid_argument( - "Invalid Ed25519 public key length", - ))) - .await; - return Err(()); - } - }; - let pubkey = match ed25519_dalek::VerifyingKey::from_bytes(&pubkey_bytes) { - Ok(key) => key, - Err(_) => { - let _ = bi - .send(Err(Status::invalid_argument("Invalid Ed25519 public key"))) - .await; - return Err(()); - } - }; - if let Err(err) = actor + UserAgentRequestPayload::SdkClientConnectionResponse(resp) => { + let pubkey_bytes = <[u8; 32]>::try_from(resp.pubkey) + .map_err(|_| Status::invalid_argument("Invalid Ed25519 public key length"))?; + let pubkey = ed25519_dalek::VerifyingKey::from_bytes(&pubkey_bytes) + .map_err(|_| Status::invalid_argument("Invalid Ed25519 public key"))?; + + actor .ask(HandleNewClientApprove { approved: resp.approved, pubkey, }) .await - { - warn!(?err, "Failed to process client connection response"); - let _ = bi - .send(Err(Status::internal("Failed to process response"))) - .await; - return Err(()); - } + .map_err(|err| { + warn!(?err, "Failed to process client connection response"); + Status::internal("Failed to process response") + })?; - return Ok(()); + return Ok(None); } - UserAgentRequestPayload::SdkClientRevoke(_sdk_client_revoke_request) => todo!(), + + UserAgentRequestPayload::SdkClientRevoke(_) => todo!(), + UserAgentRequestPayload::SdkClientList(_) => { - UserAgentResponsePayload::SdkClientListResponse( - SdkClient::list_response(actor.ask(HandleSdkClientList {}).await), - ) - }, + let result = match actor.ask(HandleSdkClientList {}).await { + Ok(clients) => ProtoSdkClientListResult::Clients(ProtoSdkClientList { + clients: clients + .into_iter() + .map(|(client, metadata)| ProtoSdkClientEntry { + id: client.id, + pubkey: client.public_key, + info: Some(ProtoClientMetadata { + name: metadata.name, + description: metadata.description, + version: metadata.version, + }), + created_at: client.created_at.0.timestamp() as i32, + }) + .collect(), + }), + Err(err) => { + warn!(error = ?err, "Failed to list SDK clients"); + ProtoSdkClientListResult::Error(ProtoSdkClientError::Internal.into()) + } + }; + UserAgentResponsePayload::SdkClientListResponse(ProtoSdkClientListResponse { + result: Some(result), + }) + } + + UserAgentRequestPayload::GrantWalletAccessList(_) + | UserAgentRequestPayload::RevokeWalletAccessList(_) => todo!(), + UserAgentRequestPayload::AuthChallengeRequest(..) | UserAgentRequestPayload::AuthChallengeSolution(..) => { warn!(?payload, "Unsupported post-auth user agent request"); - let _ = bi - .send(Err(Status::invalid_argument( - "Unsupported user-agent request", - ))) - .await; - return Err(()); - } - - }; - - bi.send(Ok(UserAgentResponse { - id: Some(request_id), - payload: Some(payload), - })) - .await - .map_err(|_| ()) -} - -async fn send_out_of_band( - bi: &mut GrpcBi, - oob: OutOfBand, -) -> Result<(), ()> { - let payload = match oob { - OutOfBand::ClientConnectionRequest { profile } => { - UserAgentResponsePayload::SdkClientConnectionRequest(ProtoSdkClientConnectionRequest { - pubkey: profile.pubkey.to_bytes().to_vec(), - info: Some(ProtoClientMetadata { - name: profile.metadata.name, - description: profile.metadata.description, - version: profile.metadata.version, - }), - }) - } - OutOfBand::ClientConnectionCancel { pubkey } => { - UserAgentResponsePayload::SdkClientConnectionCancel(ProtoSdkClientConnectionCancel { - pubkey: pubkey.to_bytes().to_vec(), - }) + return Err(Status::invalid_argument("Unsupported user-agent request")); } }; - bi.send(Ok(UserAgentResponse { - id: None, - payload: Some(payload), - })) - .await - .map_err(|_| ()) -} - -struct SdkClient; - -impl SdkClient { - fn list_response( - result: Result< - Vec<(crate::db::models::ProgramClient, crate::db::models::ProgramClientMetadata)>, - SendError, - >, - ) -> ProtoSdkClientListResponse { - let result = match result { - Ok(clients) => ProtoSdkClientListResult::Clients(ProtoSdkClientList { - clients: clients - .into_iter() - .map(|(client, metadata)| ProtoSdkClientEntry { - id: client.id, - pubkey: client.public_key, - info: Some(ProtoClientMetadata { - name: metadata.name, - description: metadata.description, - version: metadata.version, - }), - created_at: client.created_at.0.timestamp() as i32, - }) - .collect(), - }), - Err(err) => { - warn!(error = ?err, "Failed to list SDK clients"); - ProtoSdkClientListResult::Error(ProtoSdkClientError::Internal.into()) - } - }; - - ProtoSdkClientListResponse { - result: Some(result), - } - } -} - -fn parse_grant_request( - shared: Option, - specific: Option, -) -> Result<(SharedGrantSettings, SpecificGrant), Status> { - let shared = shared.ok_or_else(|| Status::invalid_argument("Missing shared grant settings"))?; - let specific = - specific.ok_or_else(|| Status::invalid_argument("Missing specific grant settings"))?; - - Ok(( - shared_settings_from_proto(shared)?, - specific_grant_from_proto(specific)?, - )) -} - -fn shared_settings_from_proto(shared: ProtoSharedSettings) -> Result { - Ok(SharedGrantSettings { - wallet_access_id: shared.wallet_access_id, - chain: shared.chain_id, - valid_from: shared.valid_from.map(proto_timestamp_to_utc).transpose()?, - valid_until: shared.valid_until.map(proto_timestamp_to_utc).transpose()?, - max_gas_fee_per_gas: shared - .max_gas_fee_per_gas - .as_deref() - .map(u256_from_proto_bytes) - .transpose()?, - max_priority_fee_per_gas: shared - .max_priority_fee_per_gas - .as_deref() - .map(u256_from_proto_bytes) - .transpose()?, - rate_limit: shared.rate_limit.map(|limit| TransactionRateLimit { - count: limit.count, - window: chrono::Duration::seconds(limit.window_secs), - }), - }) -} - -fn specific_grant_from_proto(specific: ProtoSpecificGrant) -> Result { - match specific.grant { - Some(ProtoSpecificGrantType::EtherTransfer(ProtoEtherTransferSettings { - targets, - limit, - })) => Ok(SpecificGrant::EtherTransfer(ether_transfer::Settings { - target: targets - .into_iter() - .map(address_from_bytes) - .collect::>()?, - limit: volume_rate_limit_from_proto(limit.ok_or_else(|| { - Status::invalid_argument("Missing ether transfer volume rate limit") - })?)?, - })), - Some(ProtoSpecificGrantType::TokenTransfer(ProtoTokenTransferSettings { - token_contract, - target, - volume_limits, - })) => Ok(SpecificGrant::TokenTransfer(token_transfers::Settings { - token_contract: address_from_bytes(token_contract)?, - target: target.map(address_from_bytes).transpose()?, - volume_limits: volume_limits - .into_iter() - .map(volume_rate_limit_from_proto) - .collect::>()?, - })), - None => Err(Status::invalid_argument("Missing specific grant kind")), - } -} - -fn volume_rate_limit_from_proto(limit: ProtoVolumeRateLimit) -> Result { - Ok(VolumeRateLimit { - max_volume: u256_from_proto_bytes(&limit.max_volume)?, - window: chrono::Duration::seconds(limit.window_secs), - }) -} - -fn address_from_bytes(bytes: Vec) -> Result { - if bytes.len() != 20 { - return Err(Status::invalid_argument("Invalid EVM address")); - } - - Ok(Address::from_slice(&bytes)) -} - -fn u256_from_proto_bytes(bytes: &[u8]) -> Result { - if bytes.len() > 32 { - return Err(Status::invalid_argument("Invalid U256 byte length")); - } - - Ok(U256::from_be_slice(bytes)) -} - -fn proto_timestamp_to_utc(timestamp: ProtoTimestamp) -> Result, Status> { - Utc.timestamp_opt(timestamp.seconds, timestamp.nanos as u32) - .single() - .ok_or_else(|| Status::invalid_argument("Invalid timestamp")) -} - -fn shared_settings_to_proto(shared: SharedGrantSettings) -> ProtoSharedSettings { - ProtoSharedSettings { - wallet_access_id: shared.wallet_access_id, - chain_id: shared.chain, - valid_from: shared.valid_from.map(|time| ProtoTimestamp { - seconds: time.timestamp(), - nanos: time.timestamp_subsec_nanos() as i32, - }), - valid_until: shared.valid_until.map(|time| ProtoTimestamp { - seconds: time.timestamp(), - nanos: time.timestamp_subsec_nanos() as i32, - }), - max_gas_fee_per_gas: shared - .max_gas_fee_per_gas - .map(|value| value.to_be_bytes::<32>().to_vec()), - max_priority_fee_per_gas: shared - .max_priority_fee_per_gas - .map(|value| value.to_be_bytes::<32>().to_vec()), - rate_limit: shared.rate_limit.map(|limit| ProtoTransactionRateLimit { - count: limit.count, - window_secs: limit.window.num_seconds(), - }), - } -} - -fn specific_grant_to_proto(grant: SpecificGrant) -> ProtoSpecificGrant { - let grant = match grant { - SpecificGrant::EtherTransfer(settings) => { - ProtoSpecificGrantType::EtherTransfer(ProtoEtherTransferSettings { - targets: settings - .target - .into_iter() - .map(|address| address.to_vec()) - .collect(), - limit: Some(ProtoVolumeRateLimit { - max_volume: settings.limit.max_volume.to_be_bytes::<32>().to_vec(), - window_secs: settings.limit.window.num_seconds(), - }), - }) - } - SpecificGrant::TokenTransfer(settings) => { - ProtoSpecificGrantType::TokenTransfer(ProtoTokenTransferSettings { - token_contract: settings.token_contract.to_vec(), - target: settings.target.map(|address| address.to_vec()), - volume_limits: settings - .volume_limits - .into_iter() - .map(|limit| ProtoVolumeRateLimit { - max_volume: limit.max_volume.to_be_bytes::<32>().to_vec(), - window_secs: limit.window.num_seconds(), - }) - .collect(), - }) - } - }; - - ProtoSpecificGrant { grant: Some(grant) } -} - -struct EvmGrantOrWallet; - -impl EvmGrantOrWallet { - fn wallet_create_response( - result: Result>, - ) -> WalletCreateResponse { - let result = match result { - Ok(wallet) => WalletCreateResult::Wallet(WalletEntry { - address: wallet.to_vec(), - }), - Err(err) => { - warn!(error = ?err, "Failed to create EVM wallet"); - WalletCreateResult::Error(ProtoEvmError::Internal.into()) - } - }; - - WalletCreateResponse { - result: Some(result), - } - } - - fn wallet_list_response( - result: Result, SendError>, - ) -> WalletListResponse { - let result = match result { - Ok(wallets) => WalletListResult::Wallets(WalletList { - wallets: wallets - .into_iter() - .map(|wallet| WalletEntry { - address: wallet.to_vec(), - }) - .collect(), - }), - Err(err) => { - warn!(error = ?err, "Failed to list EVM wallets"); - WalletListResult::Error(ProtoEvmError::Internal.into()) - } - }; - - WalletListResponse { - result: Some(result), - } - } - - fn grant_create_response( - result: Result>, - ) -> EvmGrantCreateResponse { - let result = match result { - Ok(grant_id) => EvmGrantCreateResult::GrantId(grant_id), - Err(err) => { - warn!(error = ?err, "Failed to create EVM grant"); - EvmGrantCreateResult::Error(ProtoEvmError::Internal.into()) - } - }; - - EvmGrantCreateResponse { - result: Some(result), - } - } - - fn grant_delete_response(result: Result<(), SendError>) -> EvmGrantDeleteResponse { - let result = match result { - Ok(()) => EvmGrantDeleteResult::Ok(()), - Err(err) => { - warn!(error = ?err, "Failed to delete EVM grant"); - EvmGrantDeleteResult::Error(ProtoEvmError::Internal.into()) - } - }; - - EvmGrantDeleteResponse { - result: Some(result), - } - } - - fn grant_list_response( - result: Result>, SendError>, - ) -> EvmGrantListResponse { - let result = match result { - Ok(grants) => EvmGrantListResult::Grants(EvmGrantList { - grants: grants - .into_iter() - .map(|grant| GrantEntry { - id: grant.id, - wallet_access_id: grant.shared.wallet_access_id, - shared: Some(shared_settings_to_proto(grant.shared)), - specific: Some(specific_grant_to_proto(grant.settings)), - }) - .collect(), - }), - Err(err) => { - warn!(error = ?err, "Failed to list EVM grants"); - EvmGrantListResult::Error(ProtoEvmError::Internal.into()) - } - }; - - EvmGrantListResponse { - result: Some(result), - } - } + Ok(Some(response)) } pub async fn start( diff --git a/server/crates/arbiter-server/src/grpc/user_agent/inbound.rs b/server/crates/arbiter-server/src/grpc/user_agent/inbound.rs new file mode 100644 index 0000000..15466a2 --- /dev/null +++ b/server/crates/arbiter-server/src/grpc/user_agent/inbound.rs @@ -0,0 +1,135 @@ +use arbiter_proto::proto::evm::{ + EtherTransferSettings as ProtoEtherTransferSettings, + SharedSettings as ProtoSharedSettings, + SpecificGrant as ProtoSpecificGrant, + TokenTransferSettings as ProtoTokenTransferSettings, + TransactionRateLimit as ProtoTransactionRateLimit, + VolumeRateLimit as ProtoVolumeRateLimit, + specific_grant::Grant as ProtoSpecificGrantType, +}; +use alloy::primitives::{Address, U256}; +use chrono::{DateTime, TimeZone, Utc}; +use prost_types::Timestamp as ProtoTimestamp; +use tonic::Status; + +use crate::{ + evm::policies::{ + SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit, + ether_transfer, token_transfers, + }, + grpc::TryConvert, +}; + +fn address_from_bytes(bytes: Vec) -> Result { + if bytes.len() != 20 { + return Err(Status::invalid_argument("Invalid EVM address")); + } + Ok(Address::from_slice(&bytes)) +} + +fn u256_from_proto_bytes(bytes: &[u8]) -> Result { + if bytes.len() > 32 { + return Err(Status::invalid_argument("Invalid U256 byte length")); + } + Ok(U256::from_be_slice(bytes)) +} + +impl TryConvert for ProtoTimestamp { + type Output = DateTime; + type Error = Status; + + fn try_convert(self) -> Result, Status> { + Utc.timestamp_opt(self.seconds, self.nanos as u32) + .single() + .ok_or_else(|| Status::invalid_argument("Invalid timestamp")) + } +} + +impl TryConvert for ProtoTransactionRateLimit { + type Output = TransactionRateLimit; + type Error = Status; + + fn try_convert(self) -> Result { + Ok(TransactionRateLimit { + count: self.count, + window: chrono::Duration::seconds(self.window_secs), + }) + } +} + +impl TryConvert for ProtoVolumeRateLimit { + type Output = VolumeRateLimit; + type Error = Status; + + fn try_convert(self) -> Result { + Ok(VolumeRateLimit { + max_volume: u256_from_proto_bytes(&self.max_volume)?, + window: chrono::Duration::seconds(self.window_secs), + }) + } +} + +impl TryConvert for ProtoSharedSettings { + type Output = SharedGrantSettings; + type Error = Status; + + fn try_convert(self) -> Result { + Ok(SharedGrantSettings { + wallet_access_id: self.wallet_access_id, + chain: self.chain_id, + valid_from: self.valid_from.map(ProtoTimestamp::try_convert).transpose()?, + valid_until: self.valid_until.map(ProtoTimestamp::try_convert).transpose()?, + max_gas_fee_per_gas: self + .max_gas_fee_per_gas + .as_deref() + .map(u256_from_proto_bytes) + .transpose()?, + max_priority_fee_per_gas: self + .max_priority_fee_per_gas + .as_deref() + .map(u256_from_proto_bytes) + .transpose()?, + rate_limit: self + .rate_limit + .map(ProtoTransactionRateLimit::try_convert) + .transpose()?, + }) + } +} + +impl TryConvert for ProtoSpecificGrant { + type Output = SpecificGrant; + type Error = Status; + + fn try_convert(self) -> Result { + match self.grant { + Some(ProtoSpecificGrantType::EtherTransfer(ProtoEtherTransferSettings { + targets, + limit, + })) => Ok(SpecificGrant::EtherTransfer(ether_transfer::Settings { + target: targets + .into_iter() + .map(address_from_bytes) + .collect::>()?, + limit: limit + .ok_or_else(|| { + Status::invalid_argument("Missing ether transfer volume rate limit") + })? + .try_convert()?, + })), + Some(ProtoSpecificGrantType::TokenTransfer(ProtoTokenTransferSettings { + token_contract, + target, + volume_limits, + })) => Ok(SpecificGrant::TokenTransfer(token_transfers::Settings { + token_contract: address_from_bytes(token_contract)?, + target: target.map(address_from_bytes).transpose()?, + volume_limits: volume_limits + .into_iter() + .map(ProtoVolumeRateLimit::try_convert) + .collect::>()?, + })), + None => Err(Status::invalid_argument("Missing specific grant kind")), + } + } +} diff --git a/server/crates/arbiter-server/src/grpc/user_agent/outbound.rs b/server/crates/arbiter-server/src/grpc/user_agent/outbound.rs new file mode 100644 index 0000000..ddc6313 --- /dev/null +++ b/server/crates/arbiter-server/src/grpc/user_agent/outbound.rs @@ -0,0 +1,92 @@ +use arbiter_proto::proto::evm::{ + EtherTransferSettings as ProtoEtherTransferSettings, + SharedSettings as ProtoSharedSettings, + SpecificGrant as ProtoSpecificGrant, + TokenTransferSettings as ProtoTokenTransferSettings, + TransactionRateLimit as ProtoTransactionRateLimit, + VolumeRateLimit as ProtoVolumeRateLimit, + specific_grant::Grant as ProtoSpecificGrantType, +}; +use chrono::{DateTime, Utc}; +use prost_types::Timestamp as ProtoTimestamp; + +use crate::{ + evm::policies::{SharedGrantSettings, SpecificGrant, TransactionRateLimit, VolumeRateLimit}, + grpc::Convert, +}; + +impl Convert for DateTime { + type Output = ProtoTimestamp; + + fn convert(self) -> ProtoTimestamp { + ProtoTimestamp { + seconds: self.timestamp(), + nanos: self.timestamp_subsec_nanos() as i32, + } + } +} + +impl Convert for TransactionRateLimit { + type Output = ProtoTransactionRateLimit; + + fn convert(self) -> ProtoTransactionRateLimit { + ProtoTransactionRateLimit { + count: self.count, + window_secs: self.window.num_seconds(), + } + } +} + +impl Convert for VolumeRateLimit { + type Output = ProtoVolumeRateLimit; + + fn convert(self) -> ProtoVolumeRateLimit { + ProtoVolumeRateLimit { + max_volume: self.max_volume.to_be_bytes::<32>().to_vec(), + window_secs: self.window.num_seconds(), + } + } +} + +impl Convert for SharedGrantSettings { + type Output = ProtoSharedSettings; + + fn convert(self) -> ProtoSharedSettings { + ProtoSharedSettings { + wallet_access_id: self.wallet_access_id, + chain_id: self.chain, + valid_from: self.valid_from.map(DateTime::convert), + valid_until: self.valid_until.map(DateTime::convert), + max_gas_fee_per_gas: self + .max_gas_fee_per_gas + .map(|value| value.to_be_bytes::<32>().to_vec()), + max_priority_fee_per_gas: self + .max_priority_fee_per_gas + .map(|value| value.to_be_bytes::<32>().to_vec()), + rate_limit: self.rate_limit.map(TransactionRateLimit::convert), + } + } +} + +impl Convert for SpecificGrant { + type Output = ProtoSpecificGrant; + + fn convert(self) -> ProtoSpecificGrant { + let grant = match self { + SpecificGrant::EtherTransfer(s) => { + ProtoSpecificGrantType::EtherTransfer(ProtoEtherTransferSettings { + targets: s.target.into_iter().map(|a| a.to_vec()).collect(), + limit: Some(s.limit.convert()), + }) + } + SpecificGrant::TokenTransfer(s) => { + ProtoSpecificGrantType::TokenTransfer(ProtoTokenTransferSettings { + token_contract: s.token_contract.to_vec(), + target: s.target.map(|a| a.to_vec()), + volume_limits: s.volume_limits.into_iter().map(VolumeRateLimit::convert).collect(), + }) + } + }; + ProtoSpecificGrant { grant: Some(grant) } + } +}