refactor(server): reorganized client/user_agent actors into separate module peers and added event MessageBus

This commit is contained in:
hdbg
2026-04-07 23:54:29 +02:00
parent f3cf6a9438
commit 1585f90cae
42 changed files with 332 additions and 286 deletions

View File

@@ -0,0 +1,423 @@
use arbiter_crypto::authn::{self, CLIENT_CONTEXT};
use arbiter_proto::{
ClientMetadata,
transport::{Bi, expect_message},
};
use chrono::Utc;
use diesel::{
ExpressionMethods as _, OptionalExtension as _, QueryDsl as _, SelectableHelper as _,
dsl::insert_into, update,
};
use diesel_async::RunQueryDsl as _;
use kameo::{actor::ActorRef, error::SendError};
use tracing::error;
use crate::{
actors::{
GlobalActors, flow_coordinator::{self, RequestClientApproval}, vault::Vault
},
crypto::integrity::{self, AttestationStatus},
db::{
self,
models::{ProgramClientMetadata, SqliteTimestamp},
schema::program_client,
},
};
use super::{ClientConnection, ClientCredentials, ClientProfile};
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum Error {
#[error("Database pool unavailable")]
DatabasePoolUnavailable,
#[error("Database operation failed")]
DatabaseOperationFailed,
#[error("Integrity check failed")]
IntegrityCheckFailed,
#[error("Invalid challenge solution")]
InvalidChallengeSolution,
#[error("Client approval request failed")]
ApproveError(#[from] ApproveError),
#[error("Transport error")]
Transport,
}
impl From<diesel::result::Error> for Error {
fn from(e: diesel::result::Error) -> Self {
error!(?e, "Database error");
Self::DatabaseOperationFailed
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ApproveError {
#[error("Internal error")]
Internal,
#[error("Client connection denied by user agents")]
Denied,
#[error("Upstream error: {0}")]
Upstream(flow_coordinator::ApprovalError),
}
#[derive(Debug, Clone)]
pub enum Inbound {
AuthChallengeRequest {
pubkey: authn::PublicKey,
metadata: ClientMetadata,
},
AuthChallengeSolution {
signature: authn::Signature,
},
}
#[derive(Debug, Clone)]
pub enum Outbound {
AuthChallenge {
pubkey: authn::PublicKey,
nonce: i32,
},
AuthSuccess,
}
/// Returns the current nonce and client ID for a registered client.
/// Returns `None` if the pubkey is not registered.
async fn get_current_nonce_and_id(
db: &db::DatabasePool,
pubkey: &authn::PublicKey,
) -> Result<Option<(i32, i32)>, Error> {
let pubkey_bytes = pubkey.to_bytes();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select((program_client::id, program_client::nonce))
.first::<(i32, i32)>(&mut conn)
.await
.optional()
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
}
async fn verify_integrity(
db: &db::DatabasePool,
vault: &ActorRef<Vault>,
pubkey: &authn::PublicKey,
) -> Result<(), Error> {
let mut db_conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
let (id, nonce) = get_current_nonce_and_id(db, pubkey).await?.ok_or_else(|| {
error!("Client not found during integrity verification");
Error::DatabaseOperationFailed
})?;
let attestation = integrity::verify_entity(
&mut db_conn,
vault,
&ClientCredentials {
pubkey: pubkey.clone(),
nonce,
},
id,
)
.await
.map_err(|e| {
error!(?e, "Integrity verification failed");
Error::IntegrityCheckFailed
})?;
if attestation != AttestationStatus::Attested {
error!("Integrity attestation unavailable for client {id}");
return Err(Error::IntegrityCheckFailed);
}
Ok(())
}
/// Atomically increments the nonce and re-signs the integrity envelope.
/// Returns the new nonce, which is used as the challenge nonce.
async fn create_nonce(
db: &db::DatabasePool,
vault: &ActorRef<Vault>,
pubkey: &authn::PublicKey,
) -> Result<i32, Error> {
let pubkey_bytes = pubkey.to_bytes();
let pubkey = pubkey.clone();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(|conn| {
let vault = vault.clone();
let pubkey = pubkey.clone();
Box::pin(async move {
let (id, new_nonce): (i32, i32) = update(program_client::table)
.filter(program_client::public_key.eq(&pubkey_bytes))
.set(program_client::nonce.eq(program_client::nonce + 1))
.returning((program_client::id, program_client::nonce))
.get_result(conn)
.await?;
integrity::sign_entity(
conn,
&vault,
&ClientCredentials {
pubkey: pubkey.clone(),
nonce: new_nonce,
},
id,
)
.await
.map_err(|e| {
error!(?e, "Integrity sign failed after nonce update");
Error::DatabaseOperationFailed
})?;
Ok(new_nonce)
})
})
.await
}
async fn approve_new_client(
actors: &GlobalActors,
profile: ClientProfile,
) -> Result<(), Error> {
let result = actors
.flow_coordinator
.ask(RequestClientApproval { client: profile })
.await;
match result {
Ok(true) => Ok(()),
Ok(false) => Err(Error::ApproveError(ApproveError::Denied)),
Err(SendError::HandlerError(e)) => {
error!(error = ?e, "Approval upstream error");
Err(Error::ApproveError(ApproveError::Upstream(e)))
}
Err(e) => {
error!(error = ?e, "Approval request to flow coordinator failed");
Err(Error::ApproveError(ApproveError::Internal))
}
}
}
async fn insert_client(
db: &db::DatabasePool,
vault: &ActorRef<Vault>,
pubkey: &authn::PublicKey,
metadata: &ClientMetadata,
) -> Result<i32, Error> {
use crate::db::schema::{client_metadata, program_client};
let pubkey = pubkey.clone();
let metadata = metadata.clone();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(|conn| {
let vault = vault.clone();
let pubkey = pubkey.clone();
Box::pin(async move {
const NONCE_START: i32 = 1;
let metadata_id = insert_into(client_metadata::table)
.values((
client_metadata::name.eq(&metadata.name),
client_metadata::description.eq(&metadata.description),
client_metadata::version.eq(&metadata.version),
))
.returning(client_metadata::id)
.get_result::<i32>(conn)
.await?;
let client_id = insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.to_bytes()),
program_client::metadata_id.eq(metadata_id),
program_client::nonce.eq(NONCE_START),
))
.on_conflict_do_nothing()
.returning(program_client::id)
.get_result::<i32>(conn)
.await?;
integrity::sign_entity(
conn,
&vault,
&ClientCredentials {
pubkey: pubkey.clone(),
nonce: NONCE_START,
},
client_id,
)
.await
.map_err(|e| {
error!(error = ?e, "Failed to sign integrity tag for new client key");
Error::DatabaseOperationFailed
})?;
Ok(client_id)
})
})
.await
}
async fn sync_client_metadata(
db: &db::DatabasePool,
client_id: i32,
metadata: &ClientMetadata,
) -> Result<(), Error> {
use crate::db::schema::{client_metadata, client_metadata_history};
let now = SqliteTimestamp(Utc::now());
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
conn.exclusive_transaction(|conn| {
let metadata = metadata.clone();
Box::pin(async move {
let (current_metadata_id, current): (i32, ProgramClientMetadata) =
program_client::table
.find(client_id)
.inner_join(client_metadata::table)
.select((
program_client::metadata_id,
ProgramClientMetadata::as_select(),
))
.first(conn)
.await?;
let unchanged = current.name == metadata.name
&& current.description == metadata.description
&& current.version == metadata.version;
if unchanged {
return Ok(());
}
insert_into(client_metadata_history::table)
.values((
client_metadata_history::metadata_id.eq(current_metadata_id),
client_metadata_history::client_id.eq(client_id),
))
.execute(conn)
.await?;
let metadata_id = insert_into(client_metadata::table)
.values((
client_metadata::name.eq(&metadata.name),
client_metadata::description.eq(&metadata.description),
client_metadata::version.eq(&metadata.version),
))
.returning(client_metadata::id)
.get_result::<i32>(conn)
.await?;
update(program_client::table.find(client_id))
.set((
program_client::metadata_id.eq(metadata_id),
program_client::updated_at.eq(now),
))
.execute(conn)
.await?;
Ok::<(), diesel::result::Error>(())
})
})
.await
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
}
async fn challenge_client<T>(
transport: &mut T,
pubkey: authn::PublicKey,
nonce: i32,
) -> Result<(), Error>
where
T: Bi<Inbound, Result<Outbound, Error>> + ?Sized,
{
transport
.send(Ok(Outbound::AuthChallenge {
pubkey: pubkey.clone(),
nonce,
}))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth challenge");
Error::Transport
})?;
let signature = expect_message(transport, |req: Inbound| match req {
Inbound::AuthChallengeSolution { signature } => Some(signature),
_ => None,
})
.await
.map_err(|e| {
error!(error = ?e, "Failed to receive challenge solution");
Error::Transport
})?;
if !pubkey.verify(nonce, CLIENT_CONTEXT, &signature) {
error!("Challenge solution verification failed");
return Err(Error::InvalidChallengeSolution);
}
Ok(())
}
pub async fn authenticate<T>(props: &mut ClientConnection, transport: &mut T) -> Result<i32, Error>
where
T: Bi<Inbound, Result<Outbound, Error>> + Send + ?Sized,
{
let Some(Inbound::AuthChallengeRequest { pubkey, metadata }) = transport.recv().await else {
return Err(Error::Transport);
};
let client_id = match get_current_nonce_and_id(&props.db, &pubkey).await? {
Some((id, _)) => {
verify_integrity(&props.db, &props.actors.vault, &pubkey).await?;
id
}
None => {
approve_new_client(
&props.actors,
ClientProfile {
pubkey: pubkey.clone(),
metadata: metadata.clone(),
},
)
.await?;
insert_client(&props.db, &props.actors.vault, &pubkey, &metadata).await?
}
};
sync_client_metadata(&props.db, client_id, &metadata).await?;
let challenge_nonce = create_nonce(&props.db, &props.actors.vault, &pubkey).await?;
challenge_client(transport, pubkey, challenge_nonce).await?;
transport
.send(Ok(Outbound::AuthSuccess))
.await
.map_err(|e| {
error!(error = ?e, "Failed to send auth success");
Error::Transport
})?;
Ok(client_id)
}