feat(poc): add terrors PoC crate scaffold and error types

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
CleverWild
2026-03-15 19:21:55 +01:00
parent 84978afd58
commit 02980468db
18 changed files with 1144 additions and 283 deletions

View File

@@ -8,19 +8,13 @@ use arbiter_proto::{
},
transport::expect_message,
};
use diesel::{
ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, dsl::insert_into, update,
};
use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, update};
use diesel_async::RunQueryDsl as _;
use ed25519_dalek::VerifyingKey;
use kameo::error::SendError;
use tracing::error;
use crate::{
actors::{
client::ClientConnection,
router::{self, RequestClientApproval},
},
actors::client::ClientConnection,
db::{self, schema::program_client},
};
@@ -40,27 +34,20 @@ pub enum Error {
DatabaseOperationFailed,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Client approval request failed")]
ApproveError(#[from] ApproveError),
#[error("Client not registered")]
NotRegistered,
#[error("Internal error")]
InternalError,
#[error("Transport error")]
Transport,
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ApproveError {
#[error("Internal error")]
Internal,
#[error("Client connection denied by user agents")]
Denied,
#[error("Upstream error: {0}")]
Upstream(router::ApprovalError),
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered.
async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Option<i32>, Error> {
async fn get_nonce(
db: &db::DatabasePool,
pubkey: &VerifyingKey,
) -> Result<Option<(i32, i32)>, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec();
let mut conn = db.get().await.map_err(|e| {
@@ -71,10 +58,10 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
conn.exclusive_transaction(|conn| {
let pubkey_bytes = pubkey_bytes.clone();
Box::pin(async move {
let Some(current_nonce) = program_client::table
let Some((client_id, current_nonce)) = program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select(program_client::nonce)
.first::<i32>(conn)
.select((program_client::id, program_client::nonce))
.first::<(i32, i32)>(conn)
.await
.optional()?
else {
@@ -87,7 +74,7 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
.execute(conn)
.await?;
Ok(Some(current_nonce))
Ok(Some((client_id, current_nonce)))
})
})
.await
@@ -97,59 +84,6 @@ async fn get_nonce(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<Optio
})
}
async fn approve_new_client(
actors: &crate::actors::GlobalActors,
pubkey: VerifyingKey,
) -> Result<(), Error> {
let result = actors
.router
.ask(RequestClientApproval {
client_pubkey: pubkey,
})
.await;
match result {
Ok(true) => Ok(()),
Ok(false) => Err(Error::ApproveError(ApproveError::Denied)),
Err(SendError::HandlerError(e)) => {
error!(error = ?e, "Approval upstream error");
Err(Error::ApproveError(ApproveError::Upstream(e)))
}
Err(e) => {
error!(error = ?e, "Approval request to router failed");
Err(Error::ApproveError(ApproveError::Internal))
}
}
}
async fn insert_client(db: &db::DatabasePool, pubkey: &VerifyingKey) -> Result<(), Error> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i32;
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()),
program_client::nonce.eq(1), // pre-incremented; challenge uses 0
program_client::created_at.eq(now),
program_client::updated_at.eq(now),
))
.execute(&mut conn)
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert new client");
Error::DatabaseOperationFailed
})?;
Ok(())
}
async fn challenge_client(
props: &mut ClientConnection,
pubkey: VerifyingKey,
@@ -200,15 +134,12 @@ async fn challenge_client(
fn connect_error_code(err: &Error) -> ConnectErrorCode {
match err {
Error::ApproveError(ApproveError::Denied) => ConnectErrorCode::ApprovalDenied,
Error::ApproveError(ApproveError::Upstream(
router::ApprovalError::NoUserAgentsConnected,
)) => ConnectErrorCode::NoUserAgentsOnline,
Error::NotRegistered => ConnectErrorCode::ApprovalDenied,
_ => ConnectErrorCode::Unknown,
}
}
async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Error> {
async fn authenticate(props: &mut ClientConnection) -> Result<(VerifyingKey, i32), Error> {
let Some(ClientRequest {
payload: Some(ClientRequestPayload::AuthChallengeRequest(challenge)),
}) = props.transport.recv().await
@@ -223,23 +154,19 @@ async fn authenticate(props: &mut ClientConnection) -> Result<VerifyingKey, Erro
let pubkey =
VerifyingKey::from_bytes(pubkey_bytes).map_err(|_| Error::InvalidAuthPubkeyEncoding)?;
let nonce = match get_nonce(&props.db, &pubkey).await? {
Some(nonce) => nonce,
None => {
approve_new_client(&props.actors, pubkey).await?;
insert_client(&props.db, &pubkey).await?;
0
}
let (client_id, nonce) = match get_nonce(&props.db, &pubkey).await? {
Some((client_id, nonce)) => (client_id, nonce),
None => return Err(Error::NotRegistered),
};
challenge_client(props, pubkey, nonce).await?;
Ok(pubkey)
Ok((pubkey, client_id))
}
pub async fn authenticate_and_create(mut props: ClientConnection) -> Result<ClientSession, Error> {
match authenticate(&mut props).await {
Ok(_pubkey) => Ok(ClientSession::new(props)),
Ok((_pubkey, client_id)) => Ok(ClientSession::new(props, client_id)),
Err(err) => {
let code = connect_error_code(&err);
let _ = props

View File

@@ -1,19 +1,35 @@
use arbiter_proto::proto::client::{ClientRequest, ClientResponse};
use alloy::{consensus::TxEip1559, primitives::Address, rlp::Decodable};
use arbiter_proto::proto::{
client::{
ClientRequest, ClientResponse, client_request::Payload as ClientRequestPayload,
client_response::Payload as ClientResponsePayload,
},
evm::{
EvmError, EvmSignTransactionResponse, evm_sign_transaction_response::Result as SignResult,
},
};
use kameo::Actor;
use tokio::select;
use tracing::{error, info};
use crate::{actors::{
GlobalActors, client::{ClientError, ClientConnection}, router::RegisterClient
}, db};
use crate::{
actors::{
GlobalActors,
client::{ClientConnection, ClientError},
evm::ClientSignTransaction,
router::RegisterClient,
},
db,
};
pub struct ClientSession {
props: ClientConnection,
client_id: i32,
}
impl ClientSession {
pub(crate) fn new(props: ClientConnection) -> Self {
Self { props }
pub(crate) fn new(props: ClientConnection, client_id: i32) -> Self {
Self { props, client_id }
}
pub async fn process_transport_inbound(&mut self, req: ClientRequest) -> Output {
@@ -22,8 +38,46 @@ impl ClientSession {
ClientError::MissingRequestPayload
})?;
let _ = msg;
Err(ClientError::UnexpectedRequestPayload)
match msg {
ClientRequestPayload::EvmSignTransaction(sign_req) => {
let wallet_address: [u8; 20] = sign_req
.wallet_address
.try_into()
.map_err(|_| ClientError::UnexpectedRequestPayload)?;
let mut rlp_bytes: &[u8] = &sign_req.rlp_transaction;
let tx = TxEip1559::decode(&mut rlp_bytes)
.map_err(|_| ClientError::UnexpectedRequestPayload)?;
let result = self
.props
.actors
.evm
.ask(ClientSignTransaction {
client_id: self.client_id,
wallet_address: Address::from_slice(&wallet_address),
transaction: tx,
})
.await;
let response_result = match result {
Ok(signature) => SignResult::Signature(signature.as_bytes().to_vec()),
Err(err) => {
error!(?err, "client sign transaction failed");
SignResult::Error(EvmError::Internal.into())
}
};
Ok(ClientResponse {
payload: Some(ClientResponsePayload::EvmSignTransaction(
EvmSignTransactionResponse {
result: Some(response_result),
},
)),
})
}
_ => Err(ClientError::UnexpectedRequestPayload),
}
}
}
@@ -89,6 +143,9 @@ impl ClientSession {
use arbiter_proto::transport::DummyTransport;
let transport: super::Transport = Box::new(DummyTransport::new());
let props = ClientConnection::new(db, transport, actors);
Self { props }
Self {
props,
client_id: 0,
}
}
}

View File

@@ -1,20 +1,14 @@
use std::{collections::HashMap, ops::ControlFlow};
use ed25519_dalek::VerifyingKey;
use kameo::{
Actor,
actor::{ActorId, ActorRef},
messages,
prelude::{ActorStopReason, Context, WeakActorRef},
reply::DelegatedReply,
};
use tokio::{sync::watch, task::JoinSet};
use tracing::{info, warn};
use tracing::info;
use crate::actors::{
client::session::ClientSession,
user_agent::session::{RequestNewClientApproval, UserAgentSession},
};
use crate::actors::{client::session::ClientSession, user_agent::session::UserAgentSession};
#[derive(Default)]
pub struct MessageRouter {
@@ -56,73 +50,6 @@ impl Actor for MessageRouter {
}
}
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq, Hash)]
pub enum ApprovalError {
#[error("No user agents connected")]
NoUserAgentsConnected,
}
async fn request_client_approval(
user_agents: &[WeakActorRef<UserAgentSession>],
client_pubkey: VerifyingKey,
) -> Result<bool, ApprovalError> {
if user_agents.is_empty() {
return Err(ApprovalError::NoUserAgentsConnected);
}
let mut pool = JoinSet::new();
let (cancel_tx, cancel_rx) = watch::channel(());
for weak_ref in user_agents {
match weak_ref.upgrade() {
Some(agent) => {
let cancel_rx = cancel_rx.clone();
pool.spawn(async move {
agent
.ask(RequestNewClientApproval {
client_pubkey,
cancel_flag: cancel_rx.clone(),
})
.await
});
}
None => {
warn!(
id = weak_ref.id().to_string(),
actor = "MessageRouter",
event = "useragent.disconnected_before_approval"
);
}
}
}
while let Some(result) = pool.join_next().await {
match result {
Ok(Ok(approved)) => {
// cancel other pending requests
let _ = cancel_tx.send(());
return Ok(approved);
}
Ok(Err(err)) => {
warn!(
?err,
actor = "MessageRouter",
event = "useragent.approval_error"
);
}
Err(err) => {
warn!(
?err,
actor = "MessageRouter",
event = "useragent.approval_task_failed"
);
}
}
}
Err(ApprovalError::NoUserAgentsConnected)
}
#[messages]
impl MessageRouter {
#[message(ctx)]
@@ -146,29 +73,4 @@ impl MessageRouter {
ctx.actor_ref().link(&actor).await;
self.clients.insert(actor.id(), actor);
}
#[message(ctx)]
pub async fn request_client_approval(
&mut self,
client_pubkey: VerifyingKey,
ctx: &mut Context<Self, DelegatedReply<Result<bool, ApprovalError>>>,
) -> DelegatedReply<Result<bool, ApprovalError>> {
let (reply, Some(reply_sender)) = ctx.reply_sender() else {
panic!("Exptected `request_client_approval` to have callback channel");
};
let weak_refs = self
.user_agents
.values()
.map(|agent| agent.downgrade())
.collect::<Vec<_>>();
// handle in subtask to not to lock the actor
tokio::task::spawn(async move {
let result = request_client_approval(&weak_refs, client_pubkey).await;
reply_sender.send(result);
});
reply
}
}

View File

@@ -3,25 +3,32 @@ use std::{ops::DerefMut, sync::Mutex};
use arbiter_proto::proto::{
evm as evm_proto,
user_agent::{
ClientConnectionCancel, ClientConnectionRequest, UnsealEncryptedKey, UnsealResult,
SdkClientApproveRequest, SdkClientApproveResponse, SdkClientEntry,
SdkClientError as ProtoSdkClientError, SdkClientList, SdkClientListResponse,
SdkClientRevokeRequest, SdkClientRevokeResponse, UnsealEncryptedKey, UnsealResult,
UnsealStart, UnsealStartResponse, UserAgentRequest, UserAgentResponse,
sdk_client_approve_response, sdk_client_list_response, sdk_client_revoke_response,
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload,
},
};
use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit};
use ed25519_dalek::VerifyingKey;
use kameo::{Actor, error::SendError, messages, prelude::Context};
use diesel::{ExpressionMethods as _, QueryDsl as _, dsl::insert_into};
use diesel_async::RunQueryDsl as _;
use kameo::{Actor, error::SendError, prelude::Context};
use memsafe::MemSafe;
use tokio::{select, sync::watch};
use tokio::select;
use tracing::{error, info};
use x25519_dalek::{EphemeralSecret, PublicKey};
use crate::actors::{
evm::{Generate, ListWallets},
keyholder::{self, TryUnseal},
router::RegisterUserAgent,
user_agent::{TransportResponseError, UserAgentConnection},
use crate::{
actors::{
evm::{Generate, ListWallets},
keyholder::{self, TryUnseal},
router::RegisterUserAgent,
user_agent::{TransportResponseError, UserAgentConnection},
},
db::schema::program_client,
};
mod state;
@@ -108,52 +115,6 @@ impl UserAgentSession {
}
}
#[messages]
impl UserAgentSession {
// TODO: Think about refactoring it to state-machine based flow, as we already have one
#[message(ctx)]
pub async fn request_new_client_approval(
&mut self,
client_pubkey: VerifyingKey,
mut cancel_flag: watch::Receiver<()>,
ctx: &mut Context<Self, Result<bool, Error>>,
) -> Result<bool, Error> {
self.send_msg(
UserAgentResponsePayload::ClientConnectionRequest(ClientConnectionRequest {
pubkey: client_pubkey.as_bytes().to_vec(),
}),
ctx,
)
.await?;
let extractor = |msg| {
if let UserAgentRequestPayload::ClientConnectionResponse(client_connection_response) =
msg
{
Some(client_connection_response)
} else {
None
}
};
tokio::select! {
_ = cancel_flag.changed() => {
info!(actor = "useragent", "client connection approval cancelled");
self.send_msg(
UserAgentResponsePayload::ClientConnectionCancel(ClientConnectionCancel {}),
ctx,
).await?;
Ok(false)
}
result = self.expect_msg(extractor, ctx) => {
let result = result?;
info!(actor = "useragent", "received client connection approval result: approved={}", result.approved);
Ok(result.approved)
}
}
}
}
impl UserAgentSession {
pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output {
let msg = req.payload.ok_or_else(|| {
@@ -170,6 +131,13 @@ impl UserAgentSession {
}
UserAgentRequestPayload::EvmWalletCreate(_) => self.handle_evm_wallet_create().await,
UserAgentRequestPayload::EvmWalletList(_) => self.handle_evm_wallet_list().await,
UserAgentRequestPayload::SdkClientApprove(req) => {
self.handle_sdk_client_approve(req).await
}
UserAgentRequestPayload::SdkClientRevoke(req) => {
self.handle_sdk_client_revoke(req).await
}
UserAgentRequestPayload::SdkClientList(_) => self.handle_sdk_client_list().await,
_ => Err(TransportResponseError::UnexpectedRequestPayload),
}
}
@@ -331,6 +299,204 @@ impl UserAgentSession {
}
}
impl UserAgentSession {
async fn handle_sdk_client_approve(&mut self, req: SdkClientApproveRequest) -> Output {
use sdk_client_approve_response::Result as ApproveResult;
if req.pubkey.len() != 32 {
return Ok(response(UserAgentResponsePayload::SdkClientApprove(
SdkClientApproveResponse {
result: Some(ApproveResult::Error(ProtoSdkClientError::Internal.into())),
},
)));
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i32;
let mut conn = match self.props.db.get().await {
Ok(c) => c,
Err(e) => {
error!(?e, "Failed to get DB connection for sdk_client_approve");
return Ok(response(UserAgentResponsePayload::SdkClientApprove(
SdkClientApproveResponse {
result: Some(ApproveResult::Error(ProtoSdkClientError::Internal.into())),
},
)));
}
};
let pubkey_bytes = req.pubkey.clone();
let insert_result = insert_into(program_client::table)
.values((
program_client::public_key.eq(&pubkey_bytes),
program_client::nonce.eq(1), // pre-incremented; challenge will use nonce=0
program_client::created_at.eq(now),
program_client::updated_at.eq(now),
))
.execute(&mut conn)
.await;
match insert_result {
Ok(_) => {
match program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.order(program_client::id.desc())
.select((
program_client::id,
program_client::public_key,
program_client::created_at,
))
.first::<(i32, Vec<u8>, i32)>(&mut conn)
.await
{
Ok((id, pubkey, created_at)) => Ok(response(
UserAgentResponsePayload::SdkClientApprove(SdkClientApproveResponse {
result: Some(ApproveResult::Client(SdkClientEntry {
id,
pubkey,
created_at,
})),
}),
)),
Err(e) => {
error!(?e, "Failed to fetch inserted SDK client");
Ok(response(UserAgentResponsePayload::SdkClientApprove(
SdkClientApproveResponse {
result: Some(ApproveResult::Error(
ProtoSdkClientError::Internal.into(),
)),
},
)))
}
}
}
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(response(UserAgentResponsePayload::SdkClientApprove(
SdkClientApproveResponse {
result: Some(ApproveResult::Error(
ProtoSdkClientError::AlreadyExists.into(),
)),
},
))),
Err(e) => {
error!(?e, "Failed to insert SDK client");
Ok(response(UserAgentResponsePayload::SdkClientApprove(
SdkClientApproveResponse {
result: Some(ApproveResult::Error(ProtoSdkClientError::Internal.into())),
},
)))
}
}
}
async fn handle_sdk_client_list(&mut self) -> Output {
let mut conn = match self.props.db.get().await {
Ok(c) => c,
Err(e) => {
error!(?e, "Failed to get DB connection for sdk_client_list");
return Ok(response(UserAgentResponsePayload::SdkClientList(
SdkClientListResponse {
result: Some(sdk_client_list_response::Result::Error(
ProtoSdkClientError::Internal.into(),
)),
},
)));
}
};
match program_client::table
.select((
program_client::id,
program_client::public_key,
program_client::created_at,
))
.load::<(i32, Vec<u8>, i32)>(&mut conn)
.await
{
Ok(rows) => Ok(response(UserAgentResponsePayload::SdkClientList(
SdkClientListResponse {
result: Some(sdk_client_list_response::Result::Clients(SdkClientList {
clients: rows
.into_iter()
.map(|(id, pubkey, created_at)| SdkClientEntry {
id,
pubkey,
created_at,
})
.collect(),
})),
},
))),
Err(e) => {
error!(?e, "Failed to list SDK clients");
Ok(response(UserAgentResponsePayload::SdkClientList(
SdkClientListResponse {
result: Some(sdk_client_list_response::Result::Error(
ProtoSdkClientError::Internal.into(),
)),
},
)))
}
}
}
async fn handle_sdk_client_revoke(&mut self, req: SdkClientRevokeRequest) -> Output {
use sdk_client_revoke_response::Result as RevokeResult;
let mut conn = match self.props.db.get().await {
Ok(c) => c,
Err(e) => {
error!(?e, "Failed to get DB connection for sdk_client_revoke");
return Ok(response(UserAgentResponsePayload::SdkClientRevoke(
SdkClientRevokeResponse {
result: Some(RevokeResult::Error(ProtoSdkClientError::Internal.into())),
},
)));
}
};
match diesel::delete(program_client::table)
.filter(program_client::id.eq(req.client_id))
.execute(&mut conn)
.await
{
Ok(0) => Ok(response(UserAgentResponsePayload::SdkClientRevoke(
SdkClientRevokeResponse {
result: Some(RevokeResult::Error(ProtoSdkClientError::NotFound.into())),
},
))),
Ok(_) => Ok(response(UserAgentResponsePayload::SdkClientRevoke(
SdkClientRevokeResponse {
result: Some(RevokeResult::Ok(())),
},
))),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::ForeignKeyViolation,
_,
)) => Ok(response(UserAgentResponsePayload::SdkClientRevoke(
SdkClientRevokeResponse {
result: Some(RevokeResult::Error(
ProtoSdkClientError::HasRelatedData.into(),
)),
},
))),
Err(e) => {
error!(?e, "Failed to delete SDK client");
Ok(response(UserAgentResponsePayload::SdkClientRevoke(
SdkClientRevokeResponse {
result: Some(RevokeResult::Error(ProtoSdkClientError::Internal.into())),
},
)))
}
}
}
}
fn map_evm_error<M>(op: &str, err: SendError<M, crate::actors::evm::Error>) -> evm_proto::EvmError {
use crate::actors::{evm::Error as EvmError, keyholder::Error as KhError};
match err {

View File

@@ -79,7 +79,7 @@ fn client_auth_error_status(value: &client::auth::Error) -> Status {
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
}
Error::InvalidChallengeSolution => Status::unauthenticated(value.to_string()),
Error::ApproveError(_) => Status::permission_denied(value.to_string()),
Error::NotRegistered => Status::permission_denied(value.to_string()),
Error::Transport => Status::internal("Transport error"),
Error::DatabasePoolUnavailable => Status::internal("Database pool error"),
Error::DatabaseOperationFailed => Status::internal("Database error"),