WIP: kameo::messages wiring for transport generalization
This commit is contained in:
@@ -68,10 +68,10 @@ fn parse_auth_event(payload: Inbound) -> AuthEvents {
|
||||
|
||||
pub async fn authenticate<T>(
|
||||
props: &mut UserAgentConnection,
|
||||
transport: T,
|
||||
transport: &mut T,
|
||||
) -> Result<AuthCredentials, Error>
|
||||
where
|
||||
T: Bi<Inbound, Result<Outbound, Error>> + Send,
|
||||
T: Bi<Inbound, Result<Outbound, Error>> + Send + ?Sized,
|
||||
{
|
||||
let mut state = AuthStateMachine::new(AuthContext::new(props, transport));
|
||||
|
||||
|
||||
@@ -174,20 +174,20 @@ async fn register_key(db: &DatabasePool, pubkey: &authn::PublicKey) -> Result<i3
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub struct AuthContext<'a, T> {
|
||||
pub struct AuthContext<'a, T: ?Sized> {
|
||||
pub(super) conn: &'a mut UserAgentConnection,
|
||||
pub(super) transport: T,
|
||||
pub(super) transport: &'a mut T,
|
||||
}
|
||||
|
||||
impl<'a, T> AuthContext<'a, T> {
|
||||
pub fn new(conn: &'a mut UserAgentConnection, transport: T) -> Self {
|
||||
impl<'a, T: ?Sized> AuthContext<'a, T> {
|
||||
pub fn new(conn: &'a mut UserAgentConnection, transport: &'a mut T) -> Self {
|
||||
Self { conn, transport }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AuthStateMachineContext for AuthContext<'_, T>
|
||||
where
|
||||
T: Bi<super::Inbound, Result<super::Outbound, Error>> + Send,
|
||||
T: Bi<super::Inbound, Result<super::Outbound, Error>> + Send + ?Sized,
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
|
||||
@@ -1,13 +1,25 @@
|
||||
use crate::{
|
||||
actors::GlobalActors, crypto::integrity::Integrable, db, peers::client::ClientProfile,
|
||||
actors::GlobalActors,
|
||||
crypto::integrity::{self, Integrable},
|
||||
db::{self, DatabaseError},
|
||||
peers::client::ClientProfile,
|
||||
};
|
||||
use arbiter_crypto::authn;
|
||||
|
||||
use arbiter_proto::transport::{Bi, Sender};
|
||||
pub use auth::authenticate;
|
||||
use kameo::actor::{ActorRef, Spawn as _};
|
||||
pub use session::UserAgentSession;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::warn;
|
||||
use vault_gate::VaultGate;
|
||||
|
||||
use crate::crypto::integrity::hashing::Hashable;
|
||||
|
||||
pub mod auth;
|
||||
pub mod session;
|
||||
pub mod vault_gate;
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Hash)]
|
||||
pub struct Credentials {
|
||||
pub id: i32,
|
||||
@@ -40,7 +52,6 @@ impl Hashable for AuthCredentials {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl Integrable for AuthCredentials {
|
||||
const KIND: &'static str = "useragent_credentials";
|
||||
}
|
||||
@@ -52,6 +63,7 @@ pub enum OutOfBand {
|
||||
ClientConnectionCancel { pubkey: authn::PublicKey },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UserAgentConnection {
|
||||
pub(crate) db: db::DatabasePool,
|
||||
pub(crate) actors: GlobalActors,
|
||||
@@ -63,9 +75,103 @@ impl UserAgentConnection {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("authentication failed: {0:?}")]
|
||||
Auth(auth::Error),
|
||||
#[error("vault gate failed: {0}")]
|
||||
VaultGate(#[from] vault_gate::Error),
|
||||
#[error("transport closed unexpectedly")]
|
||||
Transport,
|
||||
#[error("database error: {0}")]
|
||||
Database(DatabaseError),
|
||||
#[error("internal: {0}")]
|
||||
Internal(String),
|
||||
}
|
||||
|
||||
impl From<auth::Error> for Error {
|
||||
fn from(err: auth::Error) -> Self {
|
||||
Self::Auth(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub use auth::authenticate;
|
||||
pub use session::UserAgentSession;
|
||||
pub async fn start<T>(
|
||||
props: &mut UserAgentConnection,
|
||||
mut transport: T,
|
||||
oob_sender: Box<dyn Sender<OutOfBand>>,
|
||||
) -> Result<ActorRef<UserAgentSession>, Error>
|
||||
where
|
||||
T: Bi<auth::Inbound, Result<auth::Outbound, auth::Error>> + Send,
|
||||
T: Bi<vault_gate::Inbound, Result<vault_gate::Outbound, vault_gate::Error>> + Send,
|
||||
{
|
||||
let auth_creds = authenticate(props, &mut transport).await?;
|
||||
|
||||
use crate::crypto::integrity::hashing::Hashable;
|
||||
let creds = match integrity::is_signing_available(&props.actors.vault)
|
||||
.await
|
||||
.map_err(|_| Error::Internal("Integrity verification failed".into()))?
|
||||
{
|
||||
// credentials were checked by `auth` stage
|
||||
true => auth_creds.creds,
|
||||
false => run_vault_gate(props, &mut transport, auth_creds).await?,
|
||||
};
|
||||
|
||||
Ok(UserAgentSession::spawn(UserAgentSession::new(
|
||||
props.clone(),
|
||||
creds,
|
||||
oob_sender,
|
||||
)))
|
||||
}
|
||||
|
||||
async fn run_vault_gate<T>(
|
||||
props: &UserAgentConnection,
|
||||
transport: &mut T,
|
||||
auth_creds: AuthCredentials,
|
||||
) -> Result<Credentials, Error>
|
||||
where
|
||||
T: Bi<vault_gate::Inbound, Result<vault_gate::Outbound, vault_gate::Error>> + Send + ?Sized,
|
||||
{
|
||||
let (promotion_tx, mut promotion_rx) = oneshot::channel();
|
||||
let gate = VaultGate::spawn(VaultGate::new(
|
||||
auth_creds,
|
||||
props.actors.clone(),
|
||||
props.db.clone(),
|
||||
promotion_tx,
|
||||
));
|
||||
|
||||
let result = loop {
|
||||
tokio::select! {
|
||||
promotion = &mut promotion_rx => {
|
||||
break match promotion {
|
||||
Ok(Ok(creds)) => Ok(creds),
|
||||
Ok(Err(err)) => Err(Error::VaultGate(err)),
|
||||
Err(_) => Err(Error::Internal(
|
||||
"vault gate promotion channel closed".into(),
|
||||
)),
|
||||
};
|
||||
}
|
||||
|
||||
inbound = transport.recv() => {
|
||||
let Some(inbound) = inbound else {
|
||||
break Err(Error::Transport);
|
||||
};
|
||||
|
||||
match gate.ask(inbound).await {
|
||||
Ok(outbound) => {
|
||||
if transport.send(Ok(outbound)).await.is_err() {
|
||||
break Err(Error::Transport);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(?err, "VaultGate failed to handle message");
|
||||
break Err(Error::Internal(format!(
|
||||
"vault gate ask failed: {err:?}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
gate.kill();
|
||||
result
|
||||
}
|
||||
|
||||
@@ -119,7 +119,7 @@ impl VaultGate {
|
||||
}
|
||||
}
|
||||
|
||||
#[messages]
|
||||
#[messages(messages = Inbound, replies = Outbound)]
|
||||
impl VaultGate {
|
||||
#[message]
|
||||
pub async fn handle_handshake(
|
||||
@@ -185,7 +185,7 @@ impl VaultGate {
|
||||
}
|
||||
|
||||
#[message]
|
||||
pub(crate) async fn handle_bootstrap_encrypted_key(
|
||||
pub async fn handle_bootstrap_encrypted_key(
|
||||
&mut self,
|
||||
nonce: Vec<u8>,
|
||||
ciphertext: Vec<u8>,
|
||||
|
||||
@@ -2,17 +2,16 @@ use std::sync::Mutex;
|
||||
|
||||
use x25519_dalek::{EphemeralSecret, PublicKey, SharedSecret};
|
||||
|
||||
|
||||
|
||||
pub struct Handshake {
|
||||
client_pubkey: PublicKey,
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[derive(Default)]
|
||||
pub enum State {
|
||||
#[default]
|
||||
Idle,
|
||||
ReadyForExchange { server_key: PublicKey, secret: SharedSecret },
|
||||
}
|
||||
Idle,
|
||||
ReadyForExchange {
|
||||
server_key: PublicKey,
|
||||
secret: SharedSecret,
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user