merge: evm into main
Some checks failed
ci/woodpecker/push/server-audit Pipeline was successful
ci/woodpecker/push/server-lint Pipeline failed
ci/woodpecker/push/server-vet Pipeline failed
ci/woodpecker/push/server-test Pipeline was successful

This commit was merged in pull request #25.
This commit is contained in:
hdbg
2026-03-12 16:24:45 +01:00
12 changed files with 586 additions and 282 deletions

View File

@@ -4,6 +4,52 @@ This document covers concrete technology choices and dependencies. For the archi
--- ---
## Client Connection Flow
### New Client Approval
When a client whose public key is not yet in the database connects, all connected user agents are asked to approve the connection. The first agent to respond determines the outcome; remaining requests are cancelled via a watch channel.
```mermaid
flowchart TD
A([Client connects]) --> B[Receive AuthChallengeRequest]
B --> C{pubkey in DB?}
C -- yes --> D[Read nonce\nIncrement nonce in DB]
D --> G
C -- no --> E[Ask all UserAgents:\nClientConnectionRequest]
E --> F{First response}
F -- denied --> Z([Reject connection])
F -- approved --> F2[Cancel remaining\nUserAgent requests]
F2 --> F3[INSERT client\nnonce = 1]
F3 --> G[Send AuthChallenge\nwith nonce]
G --> H[Receive AuthChallengeSolution]
H --> I{Signature valid?}
I -- no --> Z
I -- yes --> J([Session started])
```
### Known Issue: Concurrent Registration Race (TOCTOU)
Two connections presenting the same previously-unknown public key can race through the approval flow simultaneously:
1. Both check the DB → neither is registered.
2. Both request approval from user agents → both receive approval.
3. Both `INSERT` the client record → the second insert silently overwrites the first, resetting the nonce.
This means the first connection's nonce is invalidated by the second, causing its challenge verification to fail. A fix requires either serialising new-client registration (e.g. an in-memory lock keyed on pubkey) or replacing the separate check + insert with an `INSERT OR IGNORE` / upsert guarded by a unique constraint on `public_key`.
### Nonce Semantics
The `program_client.nonce` column stores the **next usable nonce** — i.e. it is always one ahead of the nonce last issued in a challenge.
- **New client:** inserted with `nonce = 1`; the first challenge is issued with `nonce = 0`.
- **Existing client:** the current DB value is read and used as the challenge nonce, then immediately incremented within the same exclusive transaction, preventing replay.
---
## Cryptography ## Cryptography
### Authentication ### Authentication

View File

@@ -23,15 +23,23 @@ message ClientRequest {
oneof payload { oneof payload {
AuthChallengeRequest auth_challenge_request = 1; AuthChallengeRequest auth_challenge_request = 1;
AuthChallengeSolution auth_challenge_solution = 2; AuthChallengeSolution auth_challenge_solution = 2;
arbiter.evm.EvmSignTransactionRequest evm_sign_transaction = 3;
arbiter.evm.EvmAnalyzeTransactionRequest evm_analyze_transaction = 4;
} }
} }
message ClientConnectError {
enum Code {
UNKNOWN = 0;
APPROVAL_DENIED = 1;
NO_USER_AGENTS_ONLINE = 2;
}
Code code = 1;
}
message ClientResponse { message ClientResponse {
oneof payload { oneof payload {
AuthChallenge auth_challenge = 1; AuthChallenge auth_challenge = 1;
AuthOk auth_ok = 2; AuthOk auth_ok = 2;
ClientConnectError client_connect_error = 5;
arbiter.evm.EvmSignTransactionResponse evm_sign_transaction = 3; arbiter.evm.EvmSignTransactionResponse evm_sign_transaction = 3;
arbiter.evm.EvmAnalyzeTransactionResponse evm_analyze_transaction = 4; arbiter.evm.EvmAnalyzeTransactionResponse evm_analyze_transaction = 4;
} }

View File

@@ -49,6 +49,16 @@ enum VaultState {
VAULT_STATE_ERROR = 4; VAULT_STATE_ERROR = 4;
} }
message ClientConnectionRequest {
bytes pubkey = 1;
}
message ClientConnectionResponse {
bool approved = 1;
}
message ClientConnectionCancel {}
message UserAgentRequest { message UserAgentRequest {
oneof payload { oneof payload {
AuthChallengeRequest auth_challenge_request = 1; AuthChallengeRequest auth_challenge_request = 1;
@@ -61,6 +71,7 @@ message UserAgentRequest {
arbiter.evm.EvmGrantCreateRequest evm_grant_create = 8; arbiter.evm.EvmGrantCreateRequest evm_grant_create = 8;
arbiter.evm.EvmGrantDeleteRequest evm_grant_delete = 9; arbiter.evm.EvmGrantDeleteRequest evm_grant_delete = 9;
arbiter.evm.EvmGrantListRequest evm_grant_list = 10; arbiter.evm.EvmGrantListRequest evm_grant_list = 10;
ClientConnectionResponse client_connection_response = 11;
} }
} }
message UserAgentResponse { message UserAgentResponse {
@@ -75,5 +86,7 @@ message UserAgentResponse {
arbiter.evm.EvmGrantCreateResponse evm_grant_create = 8; arbiter.evm.EvmGrantCreateResponse evm_grant_create = 8;
arbiter.evm.EvmGrantDeleteResponse evm_grant_delete = 9; arbiter.evm.EvmGrantDeleteResponse evm_grant_delete = 9;
arbiter.evm.EvmGrantListResponse evm_grant_list = 10; arbiter.evm.EvmGrantListResponse evm_grant_list = 10;
ClientConnectionRequest client_connection_request = 11;
ClientConnectionCancel client_connection_cancel = 12;
} }
} }

View File

@@ -83,6 +83,23 @@ use async_trait::async_trait;
pub enum Error { pub enum Error {
#[error("Transport channel is closed")] #[error("Transport channel is closed")]
ChannelClosed, ChannelClosed,
#[error("Unexpected message received")]
UnexpectedMessage,
}
/// Receives one message from `transport` and extracts a value from it using
/// `extractor`. Returns [`Error::ChannelClosed`] if the transport closes and
/// [`Error::UnexpectedMessage`] if `extractor` returns `None`.
pub async fn expect_message<T, Inbound, Outbound, Target, F>(
transport: &mut T,
extractor: F,
) -> Result<Target, Error>
where
T: Bi<Inbound, Outbound> + ?Sized,
F: FnOnce(Inbound) -> Option<Target>,
{
let msg = transport.recv().await.ok_or(Error::ChannelClosed)?;
extractor(msg).ok_or(Error::UnexpectedMessage)
} }
/// Minimal bidirectional transport abstraction used by protocol code. /// Minimal bidirectional transport abstraction used by protocol code.

View File

@@ -0,0 +1,251 @@
use arbiter_proto::{
format_challenge,
proto::client::{
AuthChallenge, AuthChallengeSolution, ClientConnectError, ClientRequest, ClientResponse,
client_connect_error::Code as ConnectErrorCode,
client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
transport::expect_message,
};
use diesel::{
ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update,
};
use diesel_async::RunQueryDsl as _;
use ed25519_dalek::VerifyingKey;
use kameo::error::SendError;
use tracing::error;
use crate::{
actors::{client::ClientConnection, router::{self, RequestClientApproval}},
db::{self, schema::program_client},
};
use super::session::ClientSession;
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Unexpected message payload")]
UnexpectedMessagePayload,
#[error("Invalid client public key length")]
InvalidClientPubkeyLength,
#[error("Invalid client public key encoding")]
InvalidAuthPubkeyEncoding,
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
DatabaseOperationFailed,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Client approval request failed")]
ApproveError(#[from] ApproveError),
#[error("Internal error")]
InternalError,
#[error("Transport error")]
Transport,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ApproveError {
#[error("Internal error")]
Internal,
#[error("Client connection denied by user agents")]
Denied,
#[error("Upstream error: {0}")]
Upstream(router::ApprovalError),
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered.
async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(|conn| {
let pubkey_bytes = pubkey_bytes.clone();
Box::pin(async move {
let Some(current_nonce) = program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select(program_client::nonce)
.first::<i32>(conn)
.await
.optional()?
else {
return Result::<_, diesel::result::Error>::Ok(None);
};
update(program_client::table)
.filter(program_client::public_key.eq(&pubkey_bytes))
.set(program_client::nonce.eq(current_nonce + 1))
.execute(conn)
.await?;
Ok(Some(current_nonce))
})
})
.await
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
}
async fn approve_new_client(
actors: &crate::actors::GlobalActors,
pubkey: VerifyingKey,
) -> Result<(), Error> {
let result = actors
.router
.ask(RequestClientApproval { client_pubkey: pubkey })
.await;
match result {
Ok(true) => Ok(()),
Ok(false) => Err(Error::ApproveError(ApproveError::Denied)),
Err(SendError::HandlerError(e)) => {
error!(error = ?e, "Approval upstream error");
Err(Error::ApproveError(ApproveError::Upstream(e)))
}
Err(e) => {
error!(error = ?e, "Approval request to router failed");
Err(Error::ApproveError(ApproveError::Internal))
}
}
}
async fn insert_client(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<(), Error> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i32;
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()),
program_client::nonce.eq(1), // pre-incremented; challenge uses 0
program_client::created_at.eq(now),
program_client::updated_at.eq(now),
))
.execute(&mut conn)
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert new client");
Error::DatabaseOperationFailed
})?;
Ok(())
}
async fn challenge_client(
props: &mut ClientConnection,
pubkey: VerifyingKey,
nonce: i32,
) -> Result<(), Error> {
let challenge = AuthChallenge {
pubkey: pubkey.as_bytes().to_vec(),
nonce,
};
props
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())),
}))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth challenge");
Error::Transport
})?;
let AuthChallengeSolution { signature } = expect_message(
&mut *props.transport,
|req: ClientRequest| match req.payload? {
ClientRequestPayload::AuthChallengeSolution(s) => Some(s),
_ => None,
},
)
.await
.map_err(|e| {
error!(error = ?e, "Failed to receive challenge solution");
Error::Transport
})?;
let formatted = format_challenge(nonce, &challenge.pubkey);
let sig = signature.as_slice().try_into().map_err(|_| {
error!("Invalid signature length");
Error::InvalidChallengeSolution
})?;
pubkey.verify_strict(&formatted, &sig).map_err(|_| {
error!("Challenge solution verification failed");
Error::InvalidChallengeSolution
})?;
Ok(())
}
fn connect_error_code(err: &Error) -> ConnectErrorCode {
match err {
Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied,
Error::ApproveError(ApproveError::Upstream(router::ApprovalError::NoUserAgentsConnected)) => {
ConnectErrorCode::NoUserAgentsOnline
}
_ => ConnectErrorCode::Unknown,
}
}
async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
let Some(ClientRequest {
payload: Some(ClientRequestPayload::AuthChallengeRequest(challenge)),
}) = props.transport.recv().await
else {
return Err(Error::Transport);
};
let pubkey_bytes = challenge
.pubkey
.as_array()
.ok_or(Error::InvalidClientPubkeyLength)?;
let pubkey =
VerifyingKey::from_bytes(pubkey_bytes).map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
let nonce = match get_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
approve_new_client(&props.actors, pubkey).await?;
insert_client(&props.db, &pubkey).await?;
0
}
};
challenge_client(props, pubkey, nonce).await?;
Ok(pubkey)
}
pub async fn authenticate_and_create(mut props: ClientConnection) -> Result<ClientSession, Error> {
match authenticate(&mut props).await {
Ok(pubkey) => Ok(ClientSession::new(props, pubkey)),
Err(err) => {
let code = connect_error_code(&err);
let _ = props
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::ClientConnectError(
ClientConnectError { code: code.into() },
)),
}))
.await;
Err(err)
}
}
}

View File

@@ -1,102 +0,0 @@
use arbiter_proto::proto::client::{
AuthChallengeRequest, AuthChallengeSolution, ClientRequest,
client_request::Payload as ClientRequestPayload,
};
use ed25519_dalek::VerifyingKey;
use tracing::error;
use crate::actors::client::{
ClientConnection,
auth::state::{AuthContext, AuthStateMachine},
session::ClientSession,
};
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Unexpected message payload")]
UnexpectedMessagePayload,
#[error("Invalid client public key length")]
InvalidClientPubkeyLength,
#[error("Invalid client public key encoding")]
InvalidAuthPubkeyEncoding,
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
DatabaseOperationFailed,
#[error("Public key not registered")]
PublicKeyNotRegistered,
#[error("Invalid signature length")]
InvalidSignatureLength,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Transport error")]
Transport,
}
mod state;
use state::*;
fn parse_auth_event(payload: ClientRequestPayload) -> Result<AuthEvents, Error> {
match payload {
ClientRequestPayload::AuthChallengeRequest(AuthChallengeRequest { pubkey }) => {
let pubkey_bytes = pubkey.as_array().ok_or(Error::InvalidClientPubkeyLength)?;
let pubkey = VerifyingKey::from_bytes(pubkey_bytes)
.map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
Ok(AuthEvents::AuthRequest(ChallengeRequest {
pubkey: pubkey.into(),
}))
}
ClientRequestPayload::AuthChallengeSolution(AuthChallengeSolution { signature }) => {
Ok(AuthEvents::ReceivedSolution(ChallengeSolution {
solution: signature,
}))
}
_ => Err(Error::UnexpectedMessagePayload) ,
}
}
pub async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
let mut state = AuthStateMachine::new(AuthContext::new(props));
loop {
let transport = state.context_mut().conn.transport.as_mut();
let Some(ClientRequest {
payload: Some(payload),
}) = transport.recv().await
else {
return Err(Error::Transport);
};
let event = parse_auth_event(payload)?;
match state.process_event(event).await {
Ok(AuthStates::AuthOk(key)) => return Ok(key.clone()),
Err(AuthError::ActionFailed(err)) => {
error!(?err, "State machine action failed");
return Err(err);
}
Err(AuthError::GuardFailed(err)) => {
error!(?err, "State machine guard failed");
return Err(err);
}
Err(AuthError::InvalidEvent) => {
error!("Invalid event for current state");
return Err(Error::InvalidChallengeSolution);
}
Err(AuthError::TransitionsFailed) => {
error!("Invalid state transition");
return Err(Error::InvalidChallengeSolution);
}
_ => (),
}
}
}
pub async fn authenticate_and_create(
mut props: ClientConnection,
) -> Result<ClientSession, Error> {
let key = authenticate(&mut props).await?;
let session = ClientSession::new(props, key);
Ok(session)
}

View File

@@ -1,136 +0,0 @@
use arbiter_proto::proto::client::{
AuthChallenge, ClientResponse,
client_response::Payload as ClientResponsePayload,
};
use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl, update};
use diesel_async::RunQueryDsl;
use ed25519_dalek::VerifyingKey;
use tracing::error;
use super::Error;
use crate::{actors::client::ClientConnection, db::schema};
pub struct ChallengeRequest {
pub pubkey: VerifyingKey,
}
pub struct ChallengeContext {
pub challenge: AuthChallenge,
pub key: VerifyingKey,
}
pub struct ChallengeSolution {
pub solution: Vec<u8>,
}
smlang::statemachine!(
name: Auth,
custom_error: true,
transitions: {
*Init + AuthRequest(ChallengeRequest) / async prepare_challenge = SentChallenge(ChallengeContext),
SentChallenge(ChallengeContext) + ReceivedSolution(ChallengeSolution) [async verify_solution] / provide_key = AuthOk(VerifyingKey),
}
);
async fn create_nonce(db: &crate::db::DatabasePool, pubkey_bytes: &[u8]) -> Result<i32, Error> {
let mut db_conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
db_conn
.exclusive_transaction(|conn| {
Box::pin(async move {
let current_nonce = schema::program_client::table
.filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec()))
.select(schema::program_client::nonce)
.first::<i32>(conn)
.await?;
update(schema::program_client::table)
.filter(schema::program_client::public_key.eq(pubkey_bytes.to_vec()))
.set(schema::program_client::nonce.eq(current_nonce + 1))
.execute(conn)
.await?;
Result::<_, diesel::result::Error>::Ok(current_nonce)
})
})
.await
.optional()
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})?
.ok_or_else(|| {
error!(?pubkey_bytes, "Public key not found in database");
Error::PublicKeyNotRegistered
})
}
pub struct AuthContext<'a> {
pub(super) conn: &'a mut ClientConnection,
}
impl<'a> AuthContext<'a> {
pub fn new(conn: &'a mut ClientConnection) -> Self {
Self { conn }
}
}
impl AuthStateMachineContext for AuthContext<'_> {
type Error = Error;
async fn verify_solution(
&self,
ChallengeContext { challenge, key }: &ChallengeContext,
ChallengeSolution { solution }: &ChallengeSolution,
) -> Result<bool, Self::Error> {
let formatted_challenge =
arbiter_proto::format_challenge(challenge.nonce, &challenge.pubkey);
let signature = solution.as_slice().try_into().map_err(|_| {
error!(?solution, "Invalid signature length");
Error::InvalidChallengeSolution
})?;
let valid = key.verify_strict(&formatted_challenge, &signature).is_ok();
Ok(valid)
}
async fn prepare_challenge(
&mut self,
ChallengeRequest { pubkey }: ChallengeRequest,
) -> Result<ChallengeContext, Self::Error> {
let nonce = create_nonce(&self.conn.db, pubkey.as_bytes()).await?;
let challenge = AuthChallenge {
pubkey: pubkey.as_bytes().to_vec(),
nonce,
};
self.conn
.transport
.send(Ok(ClientResponse {
payload: Some(ClientResponsePayload::AuthChallenge(challenge.clone())),
}))
.await
.map_err(|e| {
error!(?e, "Failed to send auth challenge");
Error::Transport
})?;
Ok(ChallengeContext {
challenge,
key: pubkey,
})
}
fn provide_key(
&mut self,
state_data: &ChallengeContext,
_: ChallengeSolution,
) -> Result<VerifyingKey, Self::Error> {
Ok(state_data.key)
}
}

View File

@@ -1,17 +1,20 @@
use std::{ use std::{collections::HashMap, ops::ControlFlow};
collections::{HashMap},
ops::ControlFlow,
};
use ed25519_dalek::VerifyingKey;
use kameo::{ use kameo::{
Actor, Actor,
actor::{ActorId, ActorRef}, actor::{ActorId, ActorRef},
messages, messages,
prelude::{ActorStopReason, Context, WeakActorRef}, prelude::{ActorStopReason, Context, WeakActorRef},
reply::DelegatedReply,
}; };
use tracing::info; use tokio::{sync::watch, task::JoinSet};
use tracing::{info, warn};
use crate::actors::{client::session::ClientSession, user_agent::session::UserAgentSession}; use crate::actors::{
client::session::ClientSession,
user_agent::session::{RequestNewClientApproval, UserAgentSession},
};
#[derive(Default)] #[derive(Default)]
pub struct MessageRouter { pub struct MessageRouter {
@@ -53,6 +56,74 @@ impl Actor for MessageRouter {
} }
} }
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq, Hash)]
pub enum ApprovalError {
#[error("No user agents connected")]
NoUserAgentsConnected,
}
async fn request_client_approval(
user_agents: &[WeakActorRef<UserAgentSession>],
client_pubkey: VerifyingKey,
) -> Result<bool, ApprovalError> {
if user_agents.is_empty() {
return Err(ApprovalError::NoUserAgentsConnected).into();
}
let mut pool = JoinSet::new();
let (cancel_tx, cancel_rx) = watch::channel(());
for weak_ref in user_agents {
match weak_ref.upgrade() {
Some(agent) => {
let client_pubkey = client_pubkey.clone();
let cancel_rx = cancel_rx.clone();
pool.spawn(async move {
agent
.ask(RequestNewClientApproval {
client_pubkey,
cancel_flag: cancel_rx.clone(),
})
.await
});
}
None => {
warn!(
id = weak_ref.id().to_string(),
actor = "MessageRouter",
event = "useragent.disconnected_before_approval"
);
}
}
}
while let Some(result) = pool.join_next().await {
match result {
Ok(Ok(approved)) => {
// cancel other pending requests
let _ = cancel_tx.send(());
return Ok(approved);
}
Ok(Err(err)) => {
warn!(
?err,
actor = "MessageRouter",
event = "useragent.approval_error"
);
}
Err(err) => {
warn!(
?err,
actor = "MessageRouter",
event = "useragent.approval_task_failed"
);
}
}
}
Err(ApprovalError::NoUserAgentsConnected)
}
#[messages] #[messages]
impl MessageRouter { impl MessageRouter {
#[message(ctx)] #[message(ctx)]
@@ -76,4 +147,29 @@ impl MessageRouter {
ctx.actor_ref().link(&actor).await; ctx.actor_ref().link(&actor).await;
self.clients.insert(actor.id(), actor); self.clients.insert(actor.id(), actor);
} }
#[message(ctx)]
pub async fn request_client_approval(
&mut self,
client_pubkey: VerifyingKey,
ctx: &mut Context<Self, DelegatedReply<Result<bool, ApprovalError>>>,
) -> DelegatedReply<Result<bool, ApprovalError>> {
let (reply, Some(reply_sender)) = ctx.reply_sender() else {
panic!("Exptected `request_client_approval` to have callback channel");
};
let weak_refs = self
.user_agents
.values()
.map(|agent| agent.downgrade())
.collect::<Vec<_>>();
// handle in subtask to not to lock the actor
tokio::task::spawn(async move {
let result = request_client_approval(&weak_refs, client_pubkey).await;
let _ = reply_sender.send(result);
});
reply
}
} }

View File

@@ -11,7 +11,7 @@ use crate::{
}; };
#[derive(Debug, thiserror::Error, PartialEq)] #[derive(Debug, thiserror::Error, PartialEq)]
pub enum UserAgentError { pub enum TransportResponseError {
#[error("Expected message with payload")] #[error("Expected message with payload")]
MissingRequestPayload, MissingRequestPayload,
#[error("Unexpected request payload")] #[error("Unexpected request payload")]
@@ -31,7 +31,7 @@ pub enum UserAgentError {
} }
pub type Transport = pub type Transport =
Box<dyn Bi<UserAgentRequest, Result<UserAgentResponse, UserAgentError>> + Send>; Box<dyn Bi<UserAgentRequest, Result<UserAgentResponse, TransportResponseError>> + Send>;
pub struct UserAgentConnection { pub struct UserAgentConnection {
db: db::DatabasePool, db: db::DatabasePool,

View File

@@ -3,7 +3,7 @@ use std::{ops::DerefMut, sync::Mutex};
use arbiter_proto::proto::{ use arbiter_proto::proto::{
evm as evm_proto, evm as evm_proto,
user_agent::{ user_agent::{
UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, ClientConnectionCancel, ClientConnectionRequest, UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest,
UserAgentResponse, user_agent_request::Payload as UserAgentRequestPayload, UserAgentResponse, user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload, user_agent_response::Payload as UserAgentResponsePayload,
}, },
@@ -12,10 +12,10 @@ use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit};
use ed25519_dalek::VerifyingKey; use ed25519_dalek::VerifyingKey;
use kameo::{ use kameo::{
Actor, Actor,
error::SendError, error::SendError, messages, prelude::Context,
}; };
use memsafe::MemSafe; use memsafe::MemSafe;
use tokio::select; use tokio::{select, sync::watch};
use tracing::{error, info}; use tracing::{error, info};
use x25519_dalek::{EphemeralSecret, PublicKey}; use x25519_dalek::{EphemeralSecret, PublicKey};
@@ -23,12 +23,22 @@ use crate::actors::{
evm::{Generate, ListWallets}, evm::{Generate, ListWallets},
keyholder::{self, TryUnseal}, keyholder::{self, TryUnseal},
router::RegisterUserAgent, router::RegisterUserAgent,
user_agent::{UserAgentConnection, UserAgentError}, user_agent::{TransportResponseError, UserAgentConnection},
}; };
mod state; mod state;
use state::{DummyContext, UnsealContext, UserAgentEvents, UserAgentStateMachine, UserAgentStates}; use state::{DummyContext, UnsealContext, UserAgentEvents, UserAgentStateMachine, UserAgentStates};
// Error for consumption by other actors
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
#[error("User agent session ended due to connection loss")]
ConnectionLost,
#[error("User agent session ended due to unexpected message")]
UnexpectedMessage,
}
pub struct UserAgentSession { pub struct UserAgentSession {
props: UserAgentConnection, props: UserAgentConnection,
key: VerifyingKey, key: VerifyingKey,
@@ -36,7 +46,7 @@ pub struct UserAgentSession {
} }
impl UserAgentSession { impl UserAgentSession {
pub(crate) fn new(props: UserAgentConnection, key: VerifyingKey) -> Self { pub(crate) fn new(props: UserAgentConnection, key: VerifyingKey) -> Self {
Self { Self {
props, props,
key, key,
@@ -44,18 +54,118 @@ impl UserAgentSession {
} }
} }
fn transition(&mut self, event: UserAgentEvents) -> Result<(), UserAgentError> { fn transition(&mut self, event: UserAgentEvents) -> Result<(), TransportResponseError> {
self.state.process_event(event).map_err(|e| { self.state.process_event(event).map_err(|e| {
error!(?e, "State transition failed"); error!(?e, "State transition failed");
UserAgentError::StateTransitionFailed TransportResponseError::StateTransitionFailed
})?; })?;
Ok(()) Ok(())
} }
async fn send_msg<Reply: kameo::Reply>(
&mut self,
msg: UserAgentResponsePayload,
_ctx: &mut Context<Self, Reply>,
) -> Result<(), Error> {
self.props
.transport
.send(Ok(response(msg)))
.await
.map_err(|_| {
error!(
actor = "useragent",
reason = "channel closed",
"send.failed"
);
Error::ConnectionLost
})
}
async fn expect_msg<Extractor, Msg, Reply>(
&mut self,
extractor: Extractor,
ctx: &mut Context<Self, Reply>,
) -> Result<Msg, Error>
where
Extractor: FnOnce(UserAgentRequestPayload) -> Option<Msg>,
Reply: kameo::Reply,
{
let msg = self.props.transport.recv().await.ok_or_else(|| {
error!(
actor = "useragent",
reason = "channel closed",
"recv.failed"
);
ctx.stop();
Error::ConnectionLost
})?;
msg.payload.and_then(extractor).ok_or_else(|| {
error!(
actor = "useragent",
reason = "unexpected message",
"recv.failed"
);
ctx.stop();
Error::UnexpectedMessage
})
}
}
#[messages]
impl UserAgentSession {
// TODO: Think about refactoring it to state-machine based flow, as we already have one
#[message(ctx)]
pub async fn request_new_client_approval(
&mut self,
client_pubkey: VerifyingKey,
mut cancel_flag: watch::Receiver<()>,
ctx: &mut Context<Self, Result<bool, Error>>,
) -> Result<bool, Error> {
self.send_msg(
UserAgentResponsePayload::ClientConnectionRequest(
ClientConnectionRequest {
pubkey: client_pubkey.as_bytes().to_vec(),
}
.into(),
),
ctx,
)
.await?;
let extractor = |msg| {
if let UserAgentRequestPayload::ClientConnectionResponse(client_connection_response) =
msg
{
Some(client_connection_response)
} else {
None
}
};
tokio::select! {
_ = cancel_flag.changed() => {
info!(actor = "useragent", "client connection approval cancelled");
self.send_msg(
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {}),
ctx,
).await?;
return Ok(false);
}
result = self.expect_msg(extractor, ctx) => {
let result = result?;
info!(actor = "useragent", "received client connection approval result: approved={}", result.approved);
return Ok(result.approved);
}
}
}
}
impl UserAgentSession {
pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output { pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output {
let msg = req.payload.ok_or_else(|| { let msg = req.payload.ok_or_else(|| {
error!(actor = "useragent", "Received message with no payload"); error!(actor = "useragent", "Received message with no payload");
UserAgentError::MissingRequestPayload TransportResponseError::MissingRequestPayload
})?; })?;
match msg { match msg {
@@ -67,12 +177,12 @@ impl UserAgentSession {
} }
UserAgentRequestPayload::EvmWalletCreate(_) => self.handle_evm_wallet_create().await, UserAgentRequestPayload::EvmWalletCreate(_) => self.handle_evm_wallet_create().await,
UserAgentRequestPayload::EvmWalletList(_) => self.handle_evm_wallet_list().await, UserAgentRequestPayload::EvmWalletList(_) => self.handle_evm_wallet_list().await,
_ => Err(UserAgentError::UnexpectedRequestPayload), _ => Err(TransportResponseError::UnexpectedRequestPayload),
} }
} }
} }
type Output = Result<UserAgentResponse, UserAgentError>; type Output = Result<UserAgentResponse, TransportResponseError>;
fn response(payload: UserAgentResponsePayload) -> UserAgentResponse { fn response(payload: UserAgentResponsePayload) -> UserAgentResponse {
UserAgentResponse { UserAgentResponse {
@@ -88,7 +198,7 @@ impl UserAgentSession {
let client_pubkey_bytes: [u8; 32] = req let client_pubkey_bytes: [u8; 32] = req
.client_pubkey .client_pubkey
.try_into() .try_into()
.map_err(|_| UserAgentError::InvalidClientPubkeyLength)?; .map_err(|_| TransportResponseError::InvalidClientPubkeyLength)?;
let client_public_key = PublicKey::from(client_pubkey_bytes); let client_public_key = PublicKey::from(client_pubkey_bytes);
@@ -107,7 +217,7 @@ impl UserAgentSession {
async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output {
let UserAgentStates::WaitingForUnsealKey(unseal_context) = self.state.state() else { let UserAgentStates::WaitingForUnsealKey(unseal_context) = self.state.state() else {
error!("Received unseal encrypted key in invalid state"); error!("Received unseal encrypted key in invalid state");
return Err(UserAgentError::InvalidStateForUnsealEncryptedKey); return Err(TransportResponseError::InvalidStateForUnsealEncryptedKey);
}; };
let ephemeral_secret = { let ephemeral_secret = {
let mut secret_lock = unseal_context.secret.lock().unwrap(); let mut secret_lock = unseal_context.secret.lock().unwrap();
@@ -172,7 +282,7 @@ impl UserAgentSession {
Err(err) => { Err(err) => {
error!(?err, "Failed to send unseal request to keyholder"); error!(?err, "Failed to send unseal request to keyholder");
self.transition(UserAgentEvents::ReceivedInvalidKey)?; self.transition(UserAgentEvents::ReceivedInvalidKey)?;
Err(UserAgentError::KeyHolderActorUnreachable) Err(TransportResponseError::KeyHolderActorUnreachable)
} }
} }
} }
@@ -248,7 +358,7 @@ fn map_evm_error<M>(op: &str, err: SendError<M, crate::actors::evm::Error>) -> e
impl Actor for UserAgentSession { impl Actor for UserAgentSession {
type Args = Self; type Args = Self;
type Error = UserAgentError; type Error = TransportResponseError;
async fn on_start( async fn on_start(
args: Self::Args, args: Self::Args,
@@ -263,7 +373,7 @@ impl Actor for UserAgentSession {
.await .await
.map_err(|err| { .map_err(|err| {
error!(?err, "Failed to register user agent connection with router"); error!(?err, "Failed to register user agent connection with router");
UserAgentError::ConnectionRegistrationFailed TransportResponseError::ConnectionRegistrationFailed
})?; })?;
Ok(args) Ok(args)
} }

View File

@@ -2,7 +2,10 @@
#![allow(clippy::all)] #![allow(clippy::all)]
use crate::db::schema::{ use crate::db::schema::{
self, aead_encrypted, arbiter_settings, evm_basic_grant, evm_ether_transfer_grant, evm_ether_transfer_grant_target, evm_ether_transfer_limit, evm_token_transfer_grant, evm_token_transfer_log, evm_token_transfer_volume_limit, evm_transaction_log, evm_wallet, root_key_history, tls_history self, aead_encrypted, arbiter_settings, evm_basic_grant, evm_ether_transfer_grant,
evm_ether_transfer_grant_target, evm_ether_transfer_limit, evm_token_transfer_grant,
evm_token_transfer_log, evm_token_transfer_volume_limit, evm_transaction_log, evm_wallet,
root_key_history, tls_history,
}; };
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use diesel::{prelude::*, sqlite::Sqlite}; use diesel::{prelude::*, sqlite::Sqlite};
@@ -65,8 +68,8 @@ pub mod types {
}; };
let unix_timestamp = bytes.read_long(); let unix_timestamp = bytes.read_long();
let datetime = DateTime::from_timestamp(unix_timestamp, 0) let datetime =
.ok_or("Timestamp is out of bounds")?; DateTime::from_timestamp(unix_timestamp, 0).ok_or("Timestamp is out of bounds")?;
Ok(SqliteTimestamp(datetime)) Ok(SqliteTimestamp(datetime))
} }
@@ -150,7 +153,7 @@ pub struct EvmWallet {
pub created_at: SqliteTimestamp, pub created_at: SqliteTimestamp,
} }
#[derive(Queryable, Debug)] #[derive(Queryable, Debug, Insertable, Selectable)]
#[diesel(table_name = schema::program_client, check_for_backend(Sqlite))] #[diesel(table_name = schema::program_client, check_for_backend(Sqlite))]
pub struct ProgramClient { pub struct ProgramClient {
pub id: i32, pub id: i32,
@@ -253,7 +256,6 @@ pub struct EvmEtherTransferGrantTarget {
pub address: Vec<u8>, pub address: Vec<u8>,
} }
#[derive(Models, Queryable, Debug, Insertable, Selectable)] #[derive(Models, Queryable, Debug, Insertable, Selectable)]
#[diesel(table_name = evm_token_transfer_grant, check_for_backend(Sqlite))] #[diesel(table_name = evm_token_transfer_grant, check_for_backend(Sqlite))]
#[view( #[view(

View File

@@ -16,7 +16,7 @@ use tracing::info;
use crate::{ use crate::{
actors::{ actors::{
client::{self, ClientError, ClientConnection as ClientConnectionProps, connect_client}, client::{self, ClientError, ClientConnection as ClientConnectionProps, connect_client},
user_agent::{self, UserAgentConnection, UserAgentError, connect_user_agent}, user_agent::{self, UserAgentConnection, TransportResponseError, connect_user_agent},
}, },
context::ServerContext, context::ServerContext,
}; };
@@ -31,7 +31,7 @@ const DEFAULT_CHANNEL_SIZE: usize = 1000;
struct UserAgentGrpcSender; struct UserAgentGrpcSender;
impl SendConverter for UserAgentGrpcSender { impl SendConverter for UserAgentGrpcSender {
type Input = Result<UserAgentResponse, UserAgentError>; type Input = Result<UserAgentResponse, TransportResponseError>;
type Output = Result<UserAgentResponse, Status>; type Output = Result<UserAgentResponse, Status>;
fn convert(&self, item: Self::Input) -> Self::Output { fn convert(&self, item: Self::Input) -> Self::Output {
@@ -78,31 +78,30 @@ fn client_auth_error_status(value: &client::auth::Error) -> Status {
Error::InvalidAuthPubkeyEncoding => { Error::InvalidAuthPubkeyEncoding => {
Status::invalid_argument("Failed to convert pubkey to VerifyingKey") Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
} }
Error::InvalidSignatureLength => Status::invalid_argument("Invalid signature length"), Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()),
Error::PublicKeyNotRegistered | Error::InvalidChallengeSolution => { Error::ApproveError(_) => Status::permission_denied(value.to_string()),
Status::unauthenticated(value.to_string())
}
Error::Transport => Status::internal("Transport error"), Error::Transport => Status::internal("Transport error"),
Error::DatabasePoolUnavailable => Status::internal("Database pool error"), Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
Error::DatabaseOperationFailed => Status::internal("Database error"), Error::DatabaseOperationFailed => Status::internal("Database error"),
Error::InternalError => Status::internal("Internal error"),
} }
} }
fn user_agent_error_status(value: UserAgentError) -> Status { fn user_agent_error_status(value: TransportResponseError) -> Status {
match value { match value {
UserAgentError::MissingRequestPayload | UserAgentError::UnexpectedRequestPayload => { TransportResponseError::MissingRequestPayload | TransportResponseError::UnexpectedRequestPayload => {
Status::invalid_argument("Expected message with payload") Status::invalid_argument("Expected message with payload")
} }
UserAgentError::InvalidStateForUnsealEncryptedKey => { TransportResponseError::InvalidStateForUnsealEncryptedKey => {
Status::failed_precondition("Invalid state for unseal encrypted key") Status::failed_precondition("Invalid state for unseal encrypted key")
} }
UserAgentError::InvalidClientPubkeyLength => { TransportResponseError::InvalidClientPubkeyLength => {
Status::invalid_argument("client_pubkey must be 32 bytes") Status::invalid_argument("client_pubkey must be 32 bytes")
} }
UserAgentError::StateTransitionFailed => Status::internal("State machine error"), TransportResponseError::StateTransitionFailed => Status::internal("State machine error"),
UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"), TransportResponseError::KeyHolderActorUnreachable => Status::internal("Vault is not available"),
UserAgentError::Auth(ref err) => auth_error_status(err), TransportResponseError::Auth(ref err) => auth_error_status(err),
UserAgentError::ConnectionRegistrationFailed => { TransportResponseError::ConnectionRegistrationFailed => {
Status::internal("Failed registering connection") Status::internal("Failed registering connection")
} }
} }