feat(server): add integrity verification for client keys #78

Merged
Skipper merged 2 commits from push-woktoqvkklwr into main 2026-04-06 16:26:11 +00:00
4 changed files with 306 additions and 130 deletions

View File

@@ -9,14 +9,16 @@ use diesel::{
}; };
use diesel_async::RunQueryDsl as _; use diesel_async::RunQueryDsl as _;
use ed25519_dalek::{Signature, VerifyingKey}; use ed25519_dalek::{Signature, VerifyingKey};
use kameo::error::SendError; use kameo::{actor::ActorRef, error::SendError};
use tracing::error; use tracing::error;
use crate::{ use crate::{
actors::{ actors::{
client::{ClientConnection, ClientProfile}, client::{ClientConnection, ClientCredentials, ClientProfile},
flow_coordinator::{self, RequestClientApproval}, flow_coordinator::{self, RequestClientApproval},
keyholder::KeyHolder,
}, },
crypto::integrity::{self, AttestationStatus},
db::{ db::{
self, self,
models::{ProgramClientMetadata, SqliteTimestamp}, models::{ProgramClientMetadata, SqliteTimestamp},
@@ -30,6 +32,8 @@ pub enum Error {
DatabasePoolUnavailable, DatabasePoolUnavailable,
#[error("Database operation failed")] #[error("Database operation failed")]
DatabaseOperationFailed, DatabaseOperationFailed,
#[error("Integrity check failed")]
IntegrityCheckFailed,
#[error("Invalid challenge solution")] #[error("Invalid challenge solution")]
InvalidChallengeSolution, InvalidChallengeSolution,
#[error("Client approval request failed")] #[error("Client approval request failed")]
@@ -38,6 +42,13 @@ pub enum Error {
Transport, Transport,
} }
impl From<diesel::result::Error> for Error {
fn from(e: diesel::result::Error) -> Self {
error!(?e, "Database error");
Self::DatabaseOperationFailed
}
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ApproveError { pub enum ApproveError {
#[error("Internal error")] #[error("Internal error")]
@@ -65,18 +76,78 @@ pub enum Outbound {
AuthSuccess, AuthSuccess,
} }
pub struct ClientInfo { /// Returns the current nonce and client ID for a registered client.
pub id: i32,
pub current_nonce: i32,
}
/// Atomically reads and increments the nonce for a known client.
/// Returns `None` if the pubkey is not registered. /// Returns `None` if the pubkey is not registered.
async fn get_client_and_nonce( async fn get_current_nonce_and_id(
db: &db::DatabasePool, db: &db::DatabasePool,
pubkey: &VerifyingKey, pubkey: &VerifyingKey,
) -> Result<Option<ClientInfo>, Error> { ) -> Result<Option<(i32, i32)>, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec(); let pubkey_bytes = pubkey.as_bytes().to_vec();
let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
program_client::table
.filter(program_client::public_key.eq(&pubkey_bytes))
.select((program_client::id, program_client::nonce))
.first::<(i32, i32)>(&mut conn)
.await
.optional()
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
}
async fn verify_integrity(
db: &db::DatabasePool,
keyholder: &ActorRef<KeyHolder>,
pubkey: &VerifyingKey,
) -> Result<(), Error> {
let mut db_conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable
})?;
let (id, nonce) = get_current_nonce_and_id(db, pubkey)
.await?
.ok_or_else(|| {
error!("Client not found during integrity verification");
Error::DatabaseOperationFailed
})?;
let attestation = integrity::verify_entity(
&mut db_conn,
keyholder,
&ClientCredentials {
pubkey: pubkey.clone(),
nonce,
},
id,
)
.await
.map_err(|e| {
error!(?e, "Integrity verification failed");
Error::IntegrityCheckFailed
})?;
if attestation != AttestationStatus::Attested {
error!("Integrity attestation unavailable for client {id}");
return Err(Error::IntegrityCheckFailed);
}
Ok(())
}
/// Atomically increments the nonce and re-signs the integrity envelope.
/// Returns the new nonce, which is used as the challenge nonce.
async fn create_nonce(
db: &db::DatabasePool,
keyholder: &ActorRef<KeyHolder>,
pubkey: &VerifyingKey,
) -> Result<i32, Error> {
let pubkey_bytes = pubkey.as_bytes().to_vec();
let pubkey = pubkey.clone();
let mut conn = db.get().await.map_err(|e| { let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error"); error!(error = ?e, "Database pool error");
@@ -84,34 +155,35 @@ async fn get_client_and_nonce(
})?; })?;
conn.exclusive_transaction(|conn| { conn.exclusive_transaction(|conn| {
let keyholder = keyholder.clone();
let pubkey = pubkey.clone();
Box::pin(async move { Box::pin(async move {
let Some((client_id, current_nonce)) = program_client::table let (id, new_nonce): (i32, i32) = update(program_client::table)
.filter(program_client::public_key.eq(&pubkey_bytes)) .filter(program_client::public_key.eq(&pubkey_bytes))
.select((program_client::id, program_client::nonce)) .set(program_client::nonce.eq(program_client::nonce + 1))
.first::<(i32, i32)>(conn) .returning((program_client::id, program_client::nonce))
.await .get_result(conn)
.optional()?
else {
return Result::<_, diesel::result::Error>::Ok(None);
};
update(program_client::table)
.filter(program_client::public_key.eq(&pubkey_bytes))
.set(program_client::nonce.eq(current_nonce + 1))
.execute(conn)
.await?; .await?;
Ok(Some(ClientInfo { integrity::sign_entity(
id: client_id, conn,
current_nonce, &keyholder,
})) &ClientCredentials {
pubkey: pubkey.clone(),
nonce: new_nonce,
},
id,
)
.await
.map_err(|e| {
error!(?e, "Integrity sign failed after nonce update");
Error::DatabaseOperationFailed
})?;
Ok(new_nonce)
}) })
}) })
.await .await
.map_err(|e| {
error!(error = ?e, "Database error");
Error::DatabaseOperationFailed
})
} }
async fn approve_new_client( async fn approve_new_client(
@@ -139,45 +211,65 @@ async fn approve_new_client(
async fn insert_client( async fn insert_client(
db: &db::DatabasePool, db: &db::DatabasePool,
keyholder: &ActorRef<KeyHolder>,
pubkey: &VerifyingKey, pubkey: &VerifyingKey,
metadata: &ClientMetadata, metadata: &ClientMetadata,
) -> Result<i32, Error> { ) -> Result<i32, Error> {
use crate::db::schema::{client_metadata, program_client}; use crate::db::schema::{client_metadata, program_client};
let pubkey = pubkey.clone();
let metadata = metadata.clone();
let mut conn = db.get().await.map_err(|e| { let mut conn = db.get().await.map_err(|e| {
error!(error = ?e, "Database pool error"); error!(error = ?e, "Database pool error");
Error::DatabasePoolUnavailable Error::DatabasePoolUnavailable
})?; })?;
let metadata_id = insert_into(client_metadata::table) conn.exclusive_transaction(|conn| {
.values(( let keyholder = keyholder.clone();
client_metadata::name.eq(&metadata.name), let pubkey = pubkey.clone();
client_metadata::description.eq(&metadata.description), Box::pin(async move {
client_metadata::version.eq(&metadata.version), const NONCE_START: i32 = 1;
))
.returning(client_metadata::id)
.get_result::<i32>(&mut conn)
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert client metadata");
Error::DatabaseOperationFailed
})?;
let client_id = insert_into(program_client::table) let metadata_id = insert_into(client_metadata::table)
.values(( .values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()), client_metadata::name.eq(&metadata.name),
program_client::metadata_id.eq(metadata_id), client_metadata::description.eq(&metadata.description),
program_client::nonce.eq(1), // pre-incremented; challenge uses 0 client_metadata::version.eq(&metadata.version),
)) ))
.on_conflict_do_nothing() .returning(client_metadata::id)
.returning(program_client::id) .get_result::<i32>(conn)
.get_result::<i32>(&mut conn) .await?;
.await
.map_err(|e| {
error!(error = ?e, "Failed to insert client metadata");
Error::DatabaseOperationFailed
})?;
Ok(client_id) let client_id = insert_into(program_client::table)
.values((
program_client::public_key.eq(pubkey.as_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id),
program_client::nonce.eq(NONCE_START),
))
.on_conflict_do_nothing()
.returning(program_client::id)
.get_result::<i32>(conn)
.await?;
integrity::sign_entity(
conn,
&keyholder,
&ClientCredentials {
pubkey: pubkey.clone(),
nonce: NONCE_START,
},
client_id,
)
.await
.map_err(|e| {
error!(error = ?e, "Failed to sign integrity tag for new client key");
Error::DatabaseOperationFailed
})?;
Ok(client_id)
})
})
.await
} }
async fn sync_client_metadata( async fn sync_client_metadata(
@@ -295,8 +387,11 @@ where
return Err(Error::Transport); return Err(Error::Transport);
}; };
let info = match get_client_and_nonce(&props.db, &pubkey).await? { let client_id = match get_current_nonce_and_id(&props.db, &pubkey).await? {
Some(nonce) => nonce, Some((id, _)) => {
verify_integrity(&props.db, &props.actors.key_holder, &pubkey).await?;
id
}
None => { None => {
approve_new_client( approve_new_client(
&props.actors, &props.actors,
@@ -306,16 +401,13 @@ where
}, },
) )
.await?; .await?;
let client_id = insert_client(&props.db, &pubkey, &metadata).await?; insert_client(&props.db, &props.actors.key_holder, &pubkey, &metadata).await?
ClientInfo {
id: client_id,
current_nonce: 0,
}
} }
}; };
sync_client_metadata(&props.db, info.id, &metadata).await?; sync_client_metadata(&props.db, client_id, &metadata).await?;
challenge_client(transport, pubkey, info.current_nonce).await?; let challenge_nonce = create_nonce(&props.db, &props.actors.key_holder, &pubkey).await?;
challenge_client(transport, pubkey, challenge_nonce).await?;
transport transport
.send(Ok(Outbound::AuthSuccess)) .send(Ok(Outbound::AuthSuccess))
@@ -325,5 +417,5 @@ where
Error::Transport Error::Transport
})?; })?;
Ok(info.id) Ok(client_id)
} }

View File

@@ -4,6 +4,7 @@ use tracing::{error, info};
use crate::{ use crate::{
actors::{GlobalActors, client::session::ClientSession}, actors::{GlobalActors, client::session::ClientSession},
crypto::integrity::{Integrable, hashing::Hashable},
db, db,
}; };
@@ -13,6 +14,22 @@ pub struct ClientProfile {
pub metadata: ClientMetadata, pub metadata: ClientMetadata,
} }
pub struct ClientCredentials {
pub pubkey: ed25519_dalek::VerifyingKey,
pub nonce: i32,
}
impl Integrable for ClientCredentials {
const KIND: &'static str = "client_credentials";
}
impl Hashable for ClientCredentials {
fn hash<H: sha2::Digest>(&self, hasher: &mut H) {
hasher.update(self.pubkey.as_bytes());
self.nonce.hash(hasher);
}
}
pub struct ClientConnection { pub struct ClientConnection {
pub(crate) db: db::DatabasePool, pub(crate) db: db::DatabasePool,
pub(crate) actors: GlobalActors, pub(crate) actors: GlobalActors,

View File

@@ -68,6 +68,7 @@ impl<'a> AuthTransportAdapter<'a> {
auth::Error::ApproveError(auth::ApproveError::Internal) auth::Error::ApproveError(auth::ApproveError::Internal)
| auth::Error::DatabasePoolUnavailable | auth::Error::DatabasePoolUnavailable
| auth::Error::DatabaseOperationFailed | auth::Error::DatabaseOperationFailed
| auth::Error::IntegrityCheckFailed
| auth::Error::Transport => ProtoAuthResult::Internal, | auth::Error::Transport => ProtoAuthResult::Internal,
} }
.into(), .into(),

View File

@@ -1,9 +1,14 @@
use arbiter_proto::ClientMetadata; use arbiter_proto::ClientMetadata;
use arbiter_proto::transport::{Receiver, Sender}; use arbiter_proto::transport::{Receiver, Sender};
use arbiter_server::actors::GlobalActors;
use arbiter_server::{ use arbiter_server::{
actors::client::{ClientConnection, auth, connect_client}, actors::{
db, GlobalActors,
client::{ClientConnection, ClientCredentials, auth, connect_client},
keyholder::Bootstrap,
},
crypto::integrity,
db::{self, schema},
safe_cell::{SafeCell, SafeCellHandle as _},
}; };
use diesel::{ExpressionMethods as _, NullableExpressionMethods as _, QueryDsl as _, insert_into}; use diesel::{ExpressionMethods as _, NullableExpressionMethods as _, QueryDsl as _, insert_into};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
@@ -21,7 +26,8 @@ fn metadata(name: &str, description: Option<&str>, version: Option<&str>) -> Cli
async fn insert_registered_client( async fn insert_registered_client(
db: &db::DatabasePool, db: &db::DatabasePool,
pubkey: Vec<u8>, actors: &GlobalActors,
pubkey: ed25519_dalek::VerifyingKey,
metadata: &ClientMetadata, metadata: &ClientMetadata,
) { ) {
use arbiter_server::db::schema::{client_metadata, program_client}; use arbiter_server::db::schema::{client_metadata, program_client};
@@ -37,23 +43,64 @@ async fn insert_registered_client(
.get_result(&mut conn) .get_result(&mut conn)
.await .await
.unwrap(); .unwrap();
insert_into(program_client::table) let client_id: i32 = insert_into(program_client::table)
.values(( .values((
program_client::public_key.eq(pubkey), program_client::public_key.eq(pubkey.to_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id), program_client::metadata_id.eq(metadata_id),
)) ))
.returning(program_client::id)
.get_result(&mut conn)
.await
.unwrap();
integrity::sign_entity(
&mut conn,
&actors.key_holder,
&ClientCredentials { pubkey, nonce: 1 },
client_id,
)
.await
.unwrap();
}
async fn insert_bootstrap_sentinel_useragent(db: &db::DatabasePool) {
let mut conn = db.get().await.unwrap();
let sentinel_key = ed25519_dalek::SigningKey::generate(&mut rand::rng())
.verifying_key()
.to_bytes()
.to_vec();
insert_into(schema::useragent_client::table)
.values((
schema::useragent_client::public_key.eq(sentinel_key),
schema::useragent_client::key_type.eq(1i32),
))
.execute(&mut conn) .execute(&mut conn)
.await .await
.unwrap(); .unwrap();
} }
async fn spawn_test_actors(db: &db::DatabasePool) -> GlobalActors {
insert_bootstrap_sentinel_useragent(db).await;
let actors = GlobalActors::spawn(db.clone()).await.unwrap();
actors
.key_holder
.ask(Bootstrap {
seal_key_raw: SafeCell::new(b"test-seal-key".to_vec()),
})
.await
.unwrap();
actors
}
#[tokio::test] #[tokio::test]
#[test_log::test] #[test_log::test]
pub async fn test_unregistered_pubkey_rejected() { pub async fn test_unregistered_pubkey_rejected() {
let db = db::create_test_pool().await; let db = db::create_test_pool().await;
let (server_transport, mut test_transport) = ChannelTransport::new(); let (server_transport, mut test_transport) = ChannelTransport::new();
let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let actors = spawn_test_actors(&db).await;
let props = ClientConnection::new(db.clone(), actors); let props = ClientConnection::new(db.clone(), actors);
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut server_transport = server_transport; let mut server_transport = server_transport;
@@ -78,20 +125,19 @@ pub async fn test_unregistered_pubkey_rejected() {
#[test_log::test] #[test_log::test]
pub async fn test_challenge_auth() { pub async fn test_challenge_auth() {
let db = db::create_test_pool().await; let db = db::create_test_pool().await;
let actors = spawn_test_actors(&db).await;
let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec();
insert_registered_client( insert_registered_client(
&db, &db,
pubkey_bytes.clone(), &actors,
new_key.verifying_key(),
&metadata("client", Some("desc"), Some("1.0.0")), &metadata("client", Some("desc"), Some("1.0.0")),
) )
.await; .await;
let (server_transport, mut test_transport) = ChannelTransport::new(); let (server_transport, mut test_transport) = ChannelTransport::new();
let actors = GlobalActors::spawn(db.clone()).await.unwrap();
let props = ClientConnection::new(db.clone(), actors); let props = ClientConnection::new(db.clone(), actors);
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut server_transport = server_transport; let mut server_transport = server_transport;
@@ -147,34 +193,13 @@ pub async fn test_challenge_auth() {
#[test_log::test] #[test_log::test]
pub async fn test_metadata_unchanged_does_not_append_history() { pub async fn test_metadata_unchanged_does_not_append_history() {
let db = db::create_test_pool().await; let db = db::create_test_pool().await;
let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let actors = spawn_test_actors(&db).await;
let props = ClientConnection::new(db.clone(), actors);
let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
let requested = metadata("client", Some("desc"), Some("1.0.0")); let requested = metadata("client", Some("desc"), Some("1.0.0"));
{ insert_registered_client(&db, &actors, new_key.verifying_key(), &requested).await;
use arbiter_server::db::schema::{client_metadata, program_client};
let mut conn = db.get().await.unwrap(); let props = ClientConnection::new(db.clone(), actors);
let metadata_id: i32 = insert_into(client_metadata::table)
.values((
client_metadata::name.eq(&requested.name),
client_metadata::description.eq(&requested.description),
client_metadata::version.eq(&requested.version),
))
.returning(client_metadata::id)
.get_result(&mut conn)
.await
.unwrap();
insert_into(program_client::table)
.values((
program_client::public_key.eq(new_key.verifying_key().to_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id),
))
.execute(&mut conn)
.await
.unwrap();
}
let (server_transport, mut test_transport) = ChannelTransport::new(); let (server_transport, mut test_transport) = ChannelTransport::new();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
@@ -225,33 +250,18 @@ pub async fn test_metadata_unchanged_does_not_append_history() {
#[test_log::test] #[test_log::test]
pub async fn test_metadata_change_appends_history_and_repoints_binding() { pub async fn test_metadata_change_appends_history_and_repoints_binding() {
let db = db::create_test_pool().await; let db = db::create_test_pool().await;
let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let actors = spawn_test_actors(&db).await;
let props = ClientConnection::new(db.clone(), actors);
let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
{ insert_registered_client(
use arbiter_server::db::schema::{client_metadata, program_client}; &db,
let mut conn = db.get().await.unwrap(); &actors,
let metadata_id: i32 = insert_into(client_metadata::table) new_key.verifying_key(),
.values(( &metadata("client", Some("old"), Some("1.0.0")),
client_metadata::name.eq("client"), )
client_metadata::description.eq(Some("old")), .await;
client_metadata::version.eq(Some("1.0.0")),
)) let props = ClientConnection::new(db.clone(), actors);
.returning(client_metadata::id)
.get_result(&mut conn)
.await
.unwrap();
insert_into(program_client::table)
.values((
program_client::public_key.eq(new_key.verifying_key().to_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id),
))
.execute(&mut conn)
.await
.unwrap();
}
let (server_transport, mut test_transport) = ChannelTransport::new(); let (server_transport, mut test_transport) = ChannelTransport::new();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
@@ -322,3 +332,59 @@ pub async fn test_metadata_change_appends_history_and_repoints_binding() {
); );
} }
} }
#[tokio::test]
#[test_log::test]
pub async fn test_challenge_auth_rejects_integrity_tag_mismatch() {
let db = db::create_test_pool().await;
let actors = spawn_test_actors(&db).await;
let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng());
let requested = metadata("client", Some("desc"), Some("1.0.0"));
{
use arbiter_server::db::schema::{client_metadata, program_client};
let mut conn = db.get().await.unwrap();
let metadata_id: i32 = insert_into(client_metadata::table)
.values((
client_metadata::name.eq(&requested.name),
client_metadata::description.eq(&requested.description),
client_metadata::version.eq(&requested.version),
))
.returning(client_metadata::id)
.get_result(&mut conn)
.await
.unwrap();
insert_into(program_client::table)
.values((
program_client::public_key.eq(new_key.verifying_key().to_bytes().to_vec()),
program_client::metadata_id.eq(metadata_id),
))
.execute(&mut conn)
.await
.unwrap();
}
let (server_transport, mut test_transport) = ChannelTransport::new();
let props = ClientConnection::new(db.clone(), actors);
let task = tokio::spawn(async move {
let mut server_transport = server_transport;
connect_client(props, &mut server_transport).await;
});
test_transport
.send(auth::Inbound::AuthChallengeRequest {
pubkey: new_key.verifying_key(),
metadata: requested,
})
.await
.unwrap();
let response = test_transport
.recv()
.await
.expect("should receive auth rejection");
assert!(matches!(response, Err(auth::Error::IntegrityCheckFailed)));
task.await.unwrap();
}