1 Commits

Author SHA1 Message Date
hdbg
8c4c63f51e WIP: kameo::messages wiring for transport generalization 2026-04-14 15:31:20 +02:00
12 changed files with 884 additions and 425 deletions

763
server/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -26,7 +26,6 @@ thiserror = "2.0.18"
async-trait = "0.1.89" async-trait = "0.1.89"
futures = "0.3.32" futures = "0.3.32"
tokio-stream = { version = "0.1.18", features = ["full"] } tokio-stream = { version = "0.1.18", features = ["full"] }
kameo = "0.20"
prost-types = { version = "0.14.3", features = ["chrono"] } prost-types = { version = "0.14.3", features = ["chrono"] }
x25519-dalek = { version = "2.0.1", features = ["getrandom"] } x25519-dalek = { version = "2.0.1", features = ["getrandom"] }
rstest = "0.26.1" rstest = "0.26.1"
@@ -47,3 +46,6 @@ miette = { version = "7.6.0", features = ["fancy", "serde"] }
mutants = "0.0.4" mutants = "0.0.4"
ml-dsa = { version = "0.1.0-rc.8", features = ["zeroize"] } ml-dsa = { version = "0.1.0-rc.8", features = ["zeroize"] }
base64 = "0.22.1" base64 = "0.22.1"
kameo = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}
kameo_actors = {git = "https://github.com/hdbg/kameo.git", rev = "805b417"}

View File

@@ -106,6 +106,36 @@ pub trait Receiver<Inbound>: Send + Sync {
/// any built-in correlation mechanism between inbound and outbound items. /// any built-in correlation mechanism between inbound and outbound items.
pub trait Bi<Inbound, Outbound>: Sender<Outbound> + Receiver<Inbound> + Send + Sync {} pub trait Bi<Inbound, Outbound>: Sender<Outbound> + Receiver<Inbound> + Send + Sync {}
#[async_trait]
impl<T, Outbound> Sender<Outbound> for &mut T
where
T: Sender<Outbound> + ?Sized,
Outbound: Send + 'static,
{
async fn send(&mut self, item: Outbound) -> Result<(), Error> {
(**self).send(item).await
}
}
#[async_trait]
impl<T, Inbound> Receiver<Inbound> for &mut T
where
T: Receiver<Inbound> + ?Sized,
Inbound: Send + 'static,
{
async fn recv(&mut self) -> Option<Inbound> {
(**self).recv().await
}
}
impl<T, Inbound, Outbound> Bi<Inbound, Outbound> for &mut T
where
T: Bi<Inbound, Outbound> + ?Sized,
Inbound: Send + 'static,
Outbound: Send + 'static,
{
}
pub trait SplittableBi<Inbound, Outbound>: Bi<Inbound, Outbound> { pub trait SplittableBi<Inbound, Outbound>: Bi<Inbound, Outbound> {
type Sender: Sender<Outbound>; type Sender: Sender<Outbound>;
type Receiver: Receiver<Inbound>; type Receiver: Receiver<Inbound>;

View File

@@ -58,7 +58,7 @@ ml-dsa.workspace = true
ed25519-dalek.workspace = true ed25519-dalek.workspace = true
x25519-dalek.workspace = true x25519-dalek.workspace = true
k256.workspace = true k256.workspace = true
kameo_actors = "0.5.0" kameo_actors.workspace = true
[dev-dependencies] [dev-dependencies]
insta = "1.46.3" insta = "1.46.3"

View File

@@ -23,8 +23,8 @@ use crate::{
}; };
pub struct AuthTransportAdapter<'a> { pub struct AuthTransportAdapter<'a> {
bi: &'a mut GrpcBi<UserAgentRequest, UserAgentResponse>, pub(super) bi: &'a mut GrpcBi<UserAgentRequest, UserAgentResponse>,
request_tracker: &'a mut RequestTracker, pub(super) request_tracker: &'a mut RequestTracker,
} }
impl<'a> AuthTransportAdapter<'a> { impl<'a> AuthTransportAdapter<'a> {
@@ -38,16 +38,32 @@ impl<'a> AuthTransportAdapter<'a> {
} }
} }
async fn send_user_agent_response( pub(super) fn bi_mut(&mut self) -> &mut GrpcBi<UserAgentRequest, UserAgentResponse> {
self.bi
}
pub(super) fn tracker_mut(&mut self) -> &mut RequestTracker {
self.request_tracker
}
pub(super) async fn send_response_payload(
&mut self, &mut self,
payload: AuthResponsePayload, payload: UserAgentResponsePayload,
) -> Result<(), TransportError> { ) -> Result<(), TransportError> {
self.bi self.bi
.send(Ok(UserAgentResponse { .send(Ok(UserAgentResponse {
id: Some(self.request_tracker.current_request_id()), id: Some(self.request_tracker.current_request_id()),
payload: Some(UserAgentResponsePayload::Auth(proto_auth::Response {
payload: Some(payload), payload: Some(payload),
})), }))
.await
}
async fn send_user_agent_response(
&mut self,
payload: AuthResponsePayload,
) -> Result<(), TransportError> {
self.send_response_payload(UserAgentResponsePayload::Auth(proto_auth::Response {
payload: Some(payload),
})) }))
.await .await
} }
@@ -168,6 +184,6 @@ pub async fn start(
bi: &mut GrpcBi<UserAgentRequest, UserAgentResponse>, bi: &mut GrpcBi<UserAgentRequest, UserAgentResponse>,
request_tracker: &mut RequestTracker, request_tracker: &mut RequestTracker,
) -> Result<AuthCredentials, auth::Error> { ) -> Result<AuthCredentials, auth::Error> {
let transport = AuthTransportAdapter::new(bi, request_tracker); let mut transport = AuthTransportAdapter::new(bi, request_tracker);
auth::authenticate(conn, transport).await auth::authenticate(conn, &mut transport).await
} }

View File

@@ -1,25 +1,28 @@
use arbiter_proto::proto::user_agent::{ use arbiter_proto::{
proto::user_agent::{
user_agent_request::Payload as UserAgentRequestPayload,
user_agent_response::Payload as UserAgentResponsePayload, user_agent_response::Payload as UserAgentResponsePayload,
vault::{ vault::{
self as proto_vault, self as proto_vault,
bootstrap::{ bootstrap::{self as proto_bootstrap, BootstrapResult as ProtoBootstrapResult},
self as proto_bootstrap, BootstrapResult as ProtoBootstrapResult,
},
request::Payload as VaultRequestPayload, request::Payload as VaultRequestPayload,
response::Payload as VaultResponsePayload, response::Payload as VaultResponsePayload,
unseal::{ unseal::{
self as proto_unseal, UnsealResult as ProtoUnsealResult, UnsealStart, self as proto_unseal, UnsealResult as ProtoUnsealResult,
request::Payload as UnsealRequestPayload, response::Payload as UnsealResponsePayload, request::Payload as UnsealRequestPayload,
response::Payload as UnsealResponsePayload,
}, },
}, },
},
transport::{Bi, Error as TransportError, Receiver, Sender},
}; };
use kameo::{actor::ActorRef, error::SendError}; use async_trait::async_trait;
use tonic::Status; use tonic::Status;
use tracing::warn; use tracing::warn;
use super::auth::AuthTransportAdapter;
use crate::peers::user_agent::vault_gate::{ use crate::peers::user_agent::vault_gate::{
self as vault_gate, HandleBootstrapEncryptedKey, HandleHandshake, HandleUnsealEncryptedKey, self as vault_gate, HandleBootstrapEncryptedKey, HandleHandshake, HandleUnsealEncryptedKey,
VaultGate,
}; };
fn wrap_vault_response(payload: VaultResponsePayload) -> UserAgentResponsePayload { fn wrap_vault_response(payload: VaultResponsePayload) -> UserAgentResponsePayload {
@@ -40,112 +43,196 @@ fn wrap_bootstrap_response(result: ProtoBootstrapResult) -> UserAgentResponsePay
})) }))
} }
pub(super) async fn dispatch( impl AuthTransportAdapter<'_> {
gate: &ActorRef<VaultGate>, async fn send_query_state(&mut self) -> Result<(), TransportError> {
req: proto_vault::Request,
) -> Result<Option<UserAgentResponsePayload>, Status> {
let Some(payload) = req.payload else {
return Err(Status::invalid_argument("Missing vault request payload"));
};
match payload {
VaultRequestPayload::QueryState(_) => {
use arbiter_proto::proto::shared::VaultState as ProtoVaultState; use arbiter_proto::proto::shared::VaultState as ProtoVaultState;
Ok(Some(wrap_vault_response(VaultResponsePayload::State( self.send_response_payload(wrap_vault_response(VaultResponsePayload::State(
ProtoVaultState::Sealed.into(), ProtoVaultState::Sealed.into(),
)))) )))
} .await
VaultRequestPayload::Unseal(req) => dispatch_unseal(gate, req).await,
VaultRequestPayload::Bootstrap(req) => dispatch_bootstrap(gate, req).await,
} }
} }
async fn dispatch_unseal( #[async_trait]
gate: &ActorRef<VaultGate>, impl Receiver<vault_gate::Inbound> for AuthTransportAdapter<'_> {
req: proto_unseal::Request, async fn recv(&mut self) -> Option<vault_gate::Inbound> {
) -> Result<Option<UserAgentResponsePayload>, Status> { loop {
let Some(payload) = req.payload else { let request = match self.bi_mut().recv().await? {
return Err(Status::invalid_argument("Missing unseal request payload")); Ok(request) => request,
Err(error) => {
warn!(?error, "Failed to receive user agent request during vault gate");
return None;
}
}; };
match payload { if let Err(err) = self.tracker_mut().request(request.id) {
UnsealRequestPayload::Start(req) => handle_unseal_start(gate, req).await, let _ = self.bi_mut().send(Err(err)).await;
UnsealRequestPayload::EncryptedKey(req) => handle_unseal_encrypted_key(gate, req).await, return None;
}
} }
async fn handle_unseal_start( let Some(payload) = request.payload else {
gate: &ActorRef<VaultGate>, let _ = self
req: UnsealStart, .bi_mut()
) -> Result<Option<UserAgentResponsePayload>, Status> { .send(Err(Status::invalid_argument("Missing request payload")))
let client_pubkey = <[u8; 32]>::try_from(req.client_pubkey) .await;
.map(x25519_dalek::PublicKey::from) return None;
.map_err(|_| Status::invalid_argument("Invalid X25519 public key"))?; };
let response = gate let vault_req = match payload {
.ask(HandleHandshake { client_pubkey }) UserAgentRequestPayload::Vault(req) => req,
.await _ => {
.map_err(|err| { let _ = self
warn!(error = ?err, "Failed to handle unseal start"); .bi_mut()
Status::internal("Failed to start unseal flow") .send(Err(Status::permission_denied(
})?; "Only vault operations are permitted before unsealing",
)))
.await;
return None;
}
};
Ok(Some(wrap_unseal_response(UnsealResponsePayload::Start( let Some(vault_payload) = vault_req.payload else {
proto_unseal::UnsealStartResponse { let _ = self
server_pubkey: response.server_pubkey.as_bytes().to_vec(), .bi_mut()
.send(Err(Status::invalid_argument("Missing vault request payload")))
.await;
return None;
};
match vault_payload {
VaultRequestPayload::QueryState(_) => {
if self.send_query_state().await.is_err() {
return None;
}
continue;
}
VaultRequestPayload::Unseal(req) => {
let Some(unseal_payload) = req.payload else {
let _ = self
.bi_mut()
.send(Err(Status::invalid_argument("Missing unseal request payload")))
.await;
return None;
};
match unseal_payload {
UnsealRequestPayload::Start(start) => {
let Ok(bytes) = <[u8; 32]>::try_from(start.client_pubkey) else {
let _ = self
.bi_mut()
.send(Err(Status::invalid_argument(
"Invalid X25519 public key",
)))
.await;
return None;
};
return Some(vault_gate::Inbound::HandleHandshake(HandleHandshake {
client_pubkey: x25519_dalek::PublicKey::from(bytes),
}));
}
UnsealRequestPayload::EncryptedKey(key) => {
return Some(vault_gate::Inbound::HandleUnsealEncryptedKey(
HandleUnsealEncryptedKey {
nonce: key.nonce,
ciphertext: key.ciphertext,
associated_data: key.associated_data,
}, },
)))) ));
} }
async fn handle_unseal_encrypted_key(
gate: &ActorRef<VaultGate>,
req: arbiter_proto::proto::user_agent::vault::unseal::UnsealEncryptedKey,
) -> Result<Option<UserAgentResponsePayload>, Status> {
let result = match gate
.ask(HandleUnsealEncryptedKey {
nonce: req.nonce,
ciphertext: req.ciphertext,
associated_data: req.associated_data,
})
.await
{
Ok(()) => ProtoUnsealResult::Success,
Err(SendError::HandlerError(vault_gate::Error::InvalidKey)) => ProtoUnsealResult::InvalidKey,
Err(err) => {
warn!(error = ?err, "Failed to handle unseal request");
return Err(Status::internal("Failed to unseal vault"));
} }
}
VaultRequestPayload::Bootstrap(req) => {
let Some(encrypted_key) = req.encrypted_key else {
let _ = self
.bi_mut()
.send(Err(Status::invalid_argument(
"Missing bootstrap encrypted key",
)))
.await;
return None;
}; };
Ok(Some(wrap_unseal_response(UnsealResponsePayload::Result( return Some(vault_gate::Inbound::HandleBootstrapEncryptedKey(
result.into(), HandleBootstrapEncryptedKey {
))))
}
async fn dispatch_bootstrap(
gate: &ActorRef<VaultGate>,
req: proto_bootstrap::Request,
) -> Result<Option<UserAgentResponsePayload>, Status> {
let encrypted_key = req
.encrypted_key
.ok_or_else(|| Status::invalid_argument("Missing bootstrap encrypted key"))?;
let result = match gate
.ask(HandleBootstrapEncryptedKey {
nonce: encrypted_key.nonce, nonce: encrypted_key.nonce,
ciphertext: encrypted_key.ciphertext, ciphertext: encrypted_key.ciphertext,
associated_data: encrypted_key.associated_data, associated_data: encrypted_key.associated_data,
}) },
.await ));
{ }
}
}
}
}
#[async_trait]
impl Sender<Result<vault_gate::Outbound, vault_gate::Error>> for AuthTransportAdapter<'_> {
async fn send(
&mut self,
item: Result<vault_gate::Outbound, vault_gate::Error>,
) -> Result<(), TransportError> {
let outbound = match item {
Ok(outbound) => outbound,
Err(err) => {
warn!(?err, "vault gate produced transport-level error");
return self
.bi_mut()
.send(Err(Status::internal(err.to_string())))
.await;
}
};
let payload = match outbound {
vault_gate::Outbound::HandleHandshake(Ok(response)) => {
wrap_unseal_response(UnsealResponsePayload::Start(
proto_unseal::UnsealStartResponse {
server_pubkey: response.server_pubkey.as_bytes().to_vec(),
},
))
}
vault_gate::Outbound::HandleHandshake(Err(err)) => {
warn!(?err, "handshake failed");
return self
.bi_mut()
.send(Err(Status::internal("Failed to start unseal flow")))
.await;
}
vault_gate::Outbound::HandleUnsealEncryptedKey(result) => {
let proto_result = match result {
Ok(()) => ProtoUnsealResult::Success,
Err(vault_gate::Error::InvalidKey) => ProtoUnsealResult::InvalidKey,
Err(err) => {
warn!(?err, "unseal failed");
return self
.bi_mut()
.send(Err(Status::internal("Failed to unseal vault")))
.await;
}
};
wrap_unseal_response(UnsealResponsePayload::Result(proto_result.into()))
}
vault_gate::Outbound::HandleBootstrapEncryptedKey(result) => {
let proto_result = match result {
Ok(()) => ProtoBootstrapResult::Success, Ok(()) => ProtoBootstrapResult::Success,
Err(SendError::HandlerError(vault_gate::Error::InvalidKey)) => ProtoBootstrapResult::InvalidKey, Err(vault_gate::Error::InvalidKey) => ProtoBootstrapResult::InvalidKey,
Err(SendError::HandlerError(vault_gate::Error::AlreadyBootstrapped)) => { Err(vault_gate::Error::AlreadyBootstrapped) => {
ProtoBootstrapResult::AlreadyBootstrapped ProtoBootstrapResult::AlreadyBootstrapped
} }
Err(err) => { Err(err) => {
warn!(error = ?err, "Failed to handle bootstrap request"); warn!(?err, "bootstrap failed");
return Err(Status::internal("Failed to bootstrap vault")); return self
.bi_mut()
.send(Err(Status::internal("Failed to bootstrap vault")))
.await;
} }
}; };
Ok(Some(wrap_bootstrap_response(result))) wrap_bootstrap_response(proto_result)
}
};
self.send_response_payload(payload).await
}
}
impl Bi<vault_gate::Inbound, Result<vault_gate::Outbound, vault_gate::Error>>
for AuthTransportAdapter<'_>
{
} }

View File

@@ -68,10 +68,10 @@ fn parse_auth_event(payload: Inbound) -> AuthEvents {
pub async fn authenticate<T>( pub async fn authenticate<T>(
props: &mut UserAgentConnection, props: &mut UserAgentConnection,
transport: T, transport: &mut T,
) -> Result<AuthCredentials, Error> ) -> Result<AuthCredentials, Error>
where where
T: Bi<Inbound, Result<Outbound, Error>> + Send, T: Bi<Inbound, Result<Outbound, Error>> + Send + ?Sized,
{ {
let mut state = AuthStateMachine::new(AuthContext::new(props, transport)); let mut state = AuthStateMachine::new(AuthContext::new(props, transport));

View File

@@ -174,20 +174,20 @@ async fn register_key(db: &DatabasePool, pubkey: &authn::PublicKey) -> Result<i3
Ok(id) Ok(id)
} }
pub struct AuthContext<'a, T> { pub struct AuthContext<'a, T: ?Sized> {
pub(super) conn: &'a mut UserAgentConnection, pub(super) conn: &'a mut UserAgentConnection,
pub(super) transport: T, pub(super) transport: &'a mut T,
} }
impl<'a, T> AuthContext<'a, T> { impl<'a, T: ?Sized> AuthContext<'a, T> {
pub fn new(conn: &'a mut UserAgentConnection, transport: T) -> Self { pub fn new(conn: &'a mut UserAgentConnection, transport: &'a mut T) -> Self {
Self { conn, transport } Self { conn, transport }
} }
} }
impl<T> AuthStateMachineContext for AuthContext<'_, T> impl<T> AuthStateMachineContext for AuthContext<'_, T>
where where
T: Bi<super::Inbound, Result<super::Outbound, Error>> + Send, T: Bi<super::Inbound, Result<super::Outbound, Error>> + Send + ?Sized,
{ {
type Error = Error; type Error = Error;

View File

@@ -1,13 +1,25 @@
use crate::{ use crate::{
actors::GlobalActors, crypto::integrity::Integrable, db, peers::client::ClientProfile, actors::GlobalActors,
crypto::integrity::{self, Integrable},
db,
peers::client::ClientProfile,
}; };
use arbiter_crypto::authn; use arbiter_crypto::authn;
use arbiter_proto::transport::{Bi, Sender};
pub use auth::authenticate;
use kameo::actor::{ActorRef, Spawn as _};
pub use session::UserAgentSession;
use tokio::sync::oneshot;
use tracing::warn;
use vault_gate::VaultGate;
use crate::crypto::integrity::hashing::Hashable;
pub mod auth; pub mod auth;
pub mod session; pub mod session;
pub mod vault_gate; pub mod vault_gate;
#[derive(Debug, Clone, Hash)] #[derive(Debug, Clone, Hash)]
pub struct Credentials { pub struct Credentials {
pub id: i32, pub id: i32,
@@ -40,7 +52,6 @@ impl Hashable for AuthCredentials {
} }
} }
impl Integrable for AuthCredentials { impl Integrable for AuthCredentials {
const KIND: &'static str = "useragent_credentials"; const KIND: &'static str = "useragent_credentials";
} }
@@ -52,6 +63,7 @@ pub enum OutOfBand {
ClientConnectionCancel { pubkey: authn::PublicKey }, ClientConnectionCancel { pubkey: authn::PublicKey },
} }
#[derive(Clone)]
pub struct UserAgentConnection { pub struct UserAgentConnection {
pub(crate) db: db::DatabasePool, pub(crate) db: db::DatabasePool,
pub(crate) actors: GlobalActors, pub(crate) actors: GlobalActors,
@@ -63,9 +75,101 @@ impl UserAgentConnection {
} }
} }
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("authentication failed: {0:?}")]
Auth(auth::Error),
#[error("vault gate failed: {0}")]
VaultGate(#[from] vault_gate::Error),
#[error("transport closed unexpectedly")]
Transport,
#[error("internal: {0}")]
Internal(String),
}
impl From<auth::Error> for Error {
fn from(err: auth::Error) -> Self {
Self::Auth(err)
}
}
pub use auth::authenticate; pub async fn start<T>(
pub use session::UserAgentSession; props: &mut UserAgentConnection,
mut transport: T,
oob_sender: Box<dyn Sender<OutOfBand>>,
) -> Result<ActorRef<UserAgentSession>, Error>
where
T: Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> + Send,
T: Bi<vault_gate::Inbound, Result<vault_gate::Outbound, vault_gate::Error>> + Send,
{
let auth_creds = authenticate(props, &mut transport).await?;
use crate::crypto::integrity::hashing::Hashable; let creds = if integrity::is_signing_available(&props.actors.vault)
.await
.unwrap_or(false)
{
auth_creds.creds
} else {
run_vault_gate(props, &mut transport, auth_creds).await?
};
Ok(UserAgentSession::spawn(UserAgentSession::new(
props.clone(),
creds,
oob_sender,
)))
}
async fn run_vault_gate<T>(
props: &UserAgentConnection,
transport: &mut T,
auth_creds: AuthCredentials,
) -> Result<Credentials, Error>
where
T: Bi<vault_gate::Inbound, Result<vault_gate::Outbound, vault_gate::Error>> + Send + ?Sized,
{
let (promotion_tx, mut promotion_rx) = oneshot::channel();
let gate = VaultGate::spawn(VaultGate::new(
auth_creds,
props.actors.clone(),
props.db.clone(),
promotion_tx,
));
let result = loop {
tokio::select! {
promotion = &mut promotion_rx => {
break match promotion {
Ok(Ok(creds)) => Ok(creds),
Ok(Err(err)) => Err(Error::VaultGate(err)),
Err(_) => Err(Error::Internal(
"vault gate promotion channel closed".into(),
)),
};
}
inbound = transport.recv() => {
let Some(inbound) = inbound else {
break Err(Error::Transport);
};
match gate.ask(inbound).await {
Ok(outbound) => {
if transport.send(Ok(outbound)).await.is_err() {
break Err(Error::Transport);
}
}
Err(err) => {
warn!(?err, "VaultGate failed to handle message");
break Err(Error::Internal(format!(
"vault gate ask failed: {err:?}"
)));
}
}
}
}
};
gate.kill();
result
}

View File

@@ -119,7 +119,7 @@ impl VaultGate {
} }
} }
#[messages] #[messages(messages = Inbound, replies = Outbound)]
impl VaultGate { impl VaultGate {
#[message] #[message]
pub async fn handle_handshake( pub async fn handle_handshake(
@@ -185,7 +185,7 @@ impl VaultGate {
} }
#[message] #[message]
pub(crate) async fn handle_bootstrap_encrypted_key( pub async fn handle_bootstrap_encrypted_key(
&mut self, &mut self,
nonce: Vec<u8>, nonce: Vec<u8>,
ciphertext: Vec<u8>, ciphertext: Vec<u8>,

View File

@@ -2,17 +2,16 @@ use std::sync::Mutex;
use x25519_dalek::{EphemeralSecret, PublicKey, SharedSecret}; use x25519_dalek::{EphemeralSecret, PublicKey, SharedSecret};
pub struct Handshake { pub struct Handshake {
client_pubkey: PublicKey, client_pubkey: PublicKey,
} }
#[derive(Default)] #[derive(Default)]
pub enum State { pub enum State {
#[default] #[default]
Idle, Idle,
ReadyForExchange { server_key: PublicKey, secret: SharedSecret }, ReadyForExchange {
server_key: PublicKey,
secret: SharedSecret,
},
} }

View File

@@ -42,11 +42,11 @@ pub async fn test_bootstrap_token_auth() {
.unwrap(); .unwrap();
let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap(); let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap();
let (server_transport, mut test_transport) = ChannelTransport::new(); let (mut server_transport, mut test_transport) = ChannelTransport::new();
let db_for_task = db.clone(); let db_for_task = db.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut props = UserAgentConnection::new(db_for_task, actors); let mut props = UserAgentConnection::new(db_for_task, actors);
auth::authenticate(&mut props, server_transport).await auth::authenticate(&mut props, &mut server_transport).await
}); });
let new_key = MlDsa87::key_gen(&mut rand::rng()); let new_key = MlDsa87::key_gen(&mut rand::rng());
@@ -84,11 +84,11 @@ pub async fn test_bootstrap_invalid_token_auth() {
let db = db::create_test_pool().await; let db = db::create_test_pool().await;
let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let actors = GlobalActors::spawn(db.clone()).await.unwrap();
let (server_transport, mut test_transport) = ChannelTransport::new(); let (mut server_transport, mut test_transport) = ChannelTransport::new();
let db_for_task = db.clone(); let db_for_task = db.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut props = UserAgentConnection::new(db_for_task, actors); let mut props = UserAgentConnection::new(db_for_task, actors);
auth::authenticate(&mut props, server_transport).await auth::authenticate(&mut props, &mut server_transport).await
}); });
let new_key = MlDsa87::key_gen(&mut rand::rng()); let new_key = MlDsa87::key_gen(&mut rand::rng());
@@ -157,11 +157,11 @@ pub async fn test_challenge_auth() {
.unwrap(); .unwrap();
} }
let (server_transport, mut test_transport) = ChannelTransport::new(); let (mut server_transport, mut test_transport) = ChannelTransport::new();
let db_for_task = db.clone(); let db_for_task = db.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut props = UserAgentConnection::new(db_for_task, actors); let mut props = UserAgentConnection::new(db_for_task, actors);
auth::authenticate(&mut props, server_transport).await auth::authenticate(&mut props, &mut server_transport).await
}); });
test_transport test_transport
@@ -234,11 +234,11 @@ pub async fn test_challenge_auth_rejects_integrity_tag_mismatch_when_unsealed()
.unwrap(); .unwrap();
} }
let (server_transport, mut test_transport) = ChannelTransport::new(); let (mut server_transport, mut test_transport) = ChannelTransport::new();
let db_for_task = db.clone(); let db_for_task = db.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut props = UserAgentConnection::new(db_for_task, actors); let mut props = UserAgentConnection::new(db_for_task, actors);
auth::authenticate(&mut props, server_transport).await auth::authenticate(&mut props, &mut server_transport).await
}); });
test_transport test_transport
@@ -298,11 +298,11 @@ pub async fn test_challenge_auth_rejects_invalid_signature() {
.unwrap(); .unwrap();
} }
let (server_transport, mut test_transport) = ChannelTransport::new(); let (mut server_transport, mut test_transport) = ChannelTransport::new();
let db_for_task = db.clone(); let db_for_task = db.clone();
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut props = UserAgentConnection::new(db_for_task, actors); let mut props = UserAgentConnection::new(db_for_task, actors);
auth::authenticate(&mut props, server_transport).await auth::authenticate(&mut props, &mut server_transport).await
}); });
test_transport test_transport