diff --git a/protobufs/auth.proto b/protobufs/auth.proto index 3da439e..c290a66 100644 --- a/protobufs/auth.proto +++ b/protobufs/auth.proto @@ -5,7 +5,10 @@ package arbiter.auth; import "google/protobuf/timestamp.proto"; message AuthChallengeRequest { - bytes pubkey = 1; + oneof payload { + bytes pubkey = 1; + string bootstrap_token = 2; + } } message AuthChallenge { diff --git a/server/Cargo.lock b/server/Cargo.lock index 1a05936..b550575 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -61,6 +61,7 @@ version = "0.1.0" dependencies = [ "bytes", "futures", + "kameo", "prost", "prost-build", "prost-derive", @@ -89,6 +90,7 @@ dependencies = [ "ed25519", "ed25519-dalek", "futures", + "kameo", "memsafe", "miette", "rand", @@ -104,6 +106,7 @@ dependencies = [ "tokio-stream", "tonic", "tracing", + "zeroize", ] [[package]] @@ -714,6 +717,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "ed25519" version = "3.0.0-rc.4" @@ -1216,6 +1225,33 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kameo" +version = "0.19.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c4af7638c67029fd6821d02813c3913c803784648725d4df4082c9b91d7cbb1" +dependencies = [ + "downcast-rs", + "dyn-clone", + "futures", + "kameo_macros", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "kameo_macros" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a13c324e2d8c8e126e63e66087448b4267e263e6cb8770c56d10a9d0d279d9e2" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2419,6 +2455,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index 6e195b1..3d439d2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -25,3 +25,4 @@ thiserror = "2.0.18" async-trait = "0.1.89" futures = "0.3.31" tokio-stream = { version = "0.1.18", features = ["full"] } +kameo = "0.19.2" \ No newline at end of file diff --git a/server/crates/arbiter-proto/Cargo.toml b/server/crates/arbiter-proto/Cargo.toml index 792c24e..d3bd268 100644 --- a/server/crates/arbiter-proto/Cargo.toml +++ b/server/crates/arbiter-proto/Cargo.toml @@ -14,6 +14,7 @@ tonic-prost = "0.14.3" rkyv = "0.8.15" tokio.workspace = true futures.workspace = true +kameo.workspace = true diff --git a/server/crates/arbiter-proto/build.rs b/server/crates/arbiter-proto/build.rs index 74d9bc4..77136cb 100644 --- a/server/crates/arbiter-proto/build.rs +++ b/server/crates/arbiter-proto/build.rs @@ -4,6 +4,7 @@ static PROTOBUF_DIR: &str = "../../../protobufs"; fn main() -> Result<(), Box> { configure() + .message_attribute(".", "#[derive(::kameo::Reply)]") .compile_protos( &[ format!("{}/arbiter.proto", PROTOBUF_DIR), @@ -11,6 +12,7 @@ fn main() -> Result<(), Box> { ], &[PROTOBUF_DIR.to_string()], ) + .unwrap(); Ok(()) } diff --git a/server/crates/arbiter-server/Cargo.toml b/server/crates/arbiter-server/Cargo.toml index 9f8bcda..6317470 100644 --- a/server/crates/arbiter-server/Cargo.toml +++ b/server/crates/arbiter-server/Cargo.toml @@ -32,3 +32,5 @@ chrono.workspace = true bytes = "1.11.1" memsafe = "0.4.0" chacha20poly1305 = { version = "0.10.1", features = ["std"] } +zeroize = { version = "1.8.2", features = ["std", "simd"] } +kameo.workspace = true diff --git a/server/crates/arbiter-server/src/handlers.rs b/server/crates/arbiter-server/src/actors.rs similarity index 100% rename from server/crates/arbiter-server/src/handlers.rs rename to server/crates/arbiter-server/src/actors.rs diff --git a/server/crates/arbiter-server/src/handlers/client.rs b/server/crates/arbiter-server/src/actors/client.rs similarity index 100% rename from server/crates/arbiter-server/src/handlers/client.rs rename to server/crates/arbiter-server/src/actors/client.rs diff --git a/server/crates/arbiter-server/src/actors/user_agent.rs b/server/crates/arbiter-server/src/actors/user_agent.rs new file mode 100644 index 0000000..57b23b7 --- /dev/null +++ b/server/crates/arbiter-server/src/actors/user_agent.rs @@ -0,0 +1,136 @@ +use arbiter_proto::{ + proto::{ + UserAgentRequest, UserAgentResponse, + auth::{ + self, AuthChallengeRequest, ClientMessage, client_message::Payload as ClientAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + }, + transport::Bi, +}; +use futures::StreamExt; +use kameo::{Actor, message::StreamMessage, messages, prelude::Context}; +use secrecy::{ExposeSecret, SecretBox}; +use tokio::sync::mpsc; +use tonic::{Status, transport::Server}; +use tracing::error; + +use crate::ServerContext; + +#[derive(Debug)] +pub struct ChallengeContext { + challenge: auth::AuthChallenge, + key: ed25519_dalek::SigningKey, +} + +smlang::statemachine!( + name: UserAgent, + derive_states: [Debug], + transitions: { + *Init + ReceivedRequest(ed25519_dalek::VerifyingKey) [async check_key_existence] / provide_challenge = WaitingForChallengeSolution(ChallengeContext), + Init + ReceivedBootstrapToken(String) = Authenticated, + + WaitingForChallengeSolution(ChallengeContext) + ReceivedGoodSolution = Authenticated, + WaitingForChallengeSolution(ChallengeContext) + ReceivedBadSolution = Error, + } +); + +impl UserAgentStateMachineContext for ServerContext { + #[allow(missing_docs)] + #[allow(clippy::unused_unit)] + fn provide_challenge( + &mut self, + event_data: ed25519_dalek::VerifyingKey, + ) -> Result { + todo!() + } + + #[allow(missing_docs)] + #[allow(clippy::result_unit_err)] + async fn check_key_existence( + &self, + event_data: &ed25519_dalek::VerifyingKey, + ) -> Result { + todo!() + } +} + +#[derive(Actor)] +pub struct UserAgentActor { + context: ServerContext, + state: UserAgentStateMachine, + tx: mpsc::Sender>, +} + +impl UserAgentActor { + pub(crate) fn new( + context: ServerContext, + tx: mpsc::Sender>, + ) -> Self { + Self { + context: context.clone(), + state: UserAgentStateMachine::new(context), + tx, + } + } + + async fn handle_grpc( + &mut self, + msg: UserAgentRequest, + ctx: &mut Context, + ) -> Result { + let Some(msg) = msg.payload else { + error!(actor = "useragent", "Received message with no payload"); + ctx.stop(); + return Err(tonic::Status::invalid_argument( + "Message payload is required", + )); + }; + + let UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(client_message), + }) = msg + else { + error!( + actor = "useragent", + "Received unexpected message type during authentication" + ); + ctx.stop(); + return Err(tonic::Status::invalid_argument( + "Unexpected message type during authentication", + )); + }; + + match client_message { + ClientAuthPayload::AuthChallengeRequest(AuthChallengeRequest { + payload: Some(payload), + }) => match payload { + auth::auth_challenge_request::Payload::Pubkey(items) => todo!(), + auth::auth_challenge_request::Payload::BootstrapToken(_) => todo!(), + }, + ClientAuthPayload::AuthChallengeSolution(_auth_challenge_solution) => todo!(), + _ => { + error!( + actor = "useragent", + "Received unexpected message type during authentication" + ); + ctx.stop(); + return Err(tonic::Status::invalid_argument( + "Unexpected message type during authentication", + )); + } + } + } +} + +#[messages] +impl UserAgentActor { + #[message(ctx)] + pub async fn grpc(&mut self, msg: UserAgentRequest, ctx: &mut Context) { + let result = self.handle_grpc(msg, ctx).await; + self.tx.send(result).await.unwrap_or_else(|e| { + error!(handler = "useragent", "Failed to send response: {}", e); + ctx.stop(); + }); + } +} diff --git a/server/crates/arbiter-server/src/context.rs b/server/crates/arbiter-server/src/context.rs index bd6ebd6..b2d3051 100644 --- a/server/crates/arbiter-server/src/context.rs +++ b/server/crates/arbiter-server/src/context.rs @@ -11,6 +11,7 @@ use tokio::sync::RwLock; use crate::{ context::{ + bootstrap::generate_token, lease::LeaseHandler, tls::{TlsDataRaw, TlsManager}, }, @@ -21,11 +22,9 @@ use crate::{ }, }; +pub(crate) mod bootstrap; pub(crate) mod lease; pub(crate) mod tls; -pub(crate) mod bootstrap { - -} #[derive(Error, Debug, Diagnostic)] pub enum InitError { @@ -44,6 +43,10 @@ pub enum InitError { #[error("TLS initialization failed: {0}")] #[diagnostic(code(arbiter_server::init::tls_init))] Tls(#[from] tls::TlsInitError), + + #[error("I/O Error: {0}")] + #[diagnostic(code(arbiter_server::init::io))] + Io(#[from] std::io::Error), } // TODO: Placeholder for secure root key cell implementation @@ -52,7 +55,7 @@ pub struct KeyStorage; statemachine! { name: Server, transitions: { - *NotBootstrapped + Bootstrapped = Sealed, + *NotBootstrapped(String) + Bootstrapped = Sealed, Sealed + Unsealed(KeyStorage) / move_key = Ready(KeyStorage), Ready(KeyStorage) + Sealed / dispose_key = Sealed, } @@ -135,7 +138,9 @@ impl ServerContext { drop(conn); - let mut state = ServerStateMachine::new(_Context); + let bootstrap_token = generate_token().await?; + + let mut state = ServerStateMachine::new(_Context, bootstrap_token); if let Some(settings) = &settings && settings.root_key_id.is_some() @@ -144,7 +149,6 @@ impl ServerContext { let _ = state.process_event(ServerEvents::Bootstrapped); } - Ok(Self(Arc::new(_ServerContextInner { db, rng, diff --git a/server/crates/arbiter-server/src/context/bootstrap.rs b/server/crates/arbiter-server/src/context/bootstrap.rs new file mode 100644 index 0000000..2a00ed5 --- /dev/null +++ b/server/crates/arbiter-server/src/context/bootstrap.rs @@ -0,0 +1,30 @@ +use arbiter_proto::{BOOTSTRAP_TOKEN_PATH, home_path}; +use diesel::{QueryDsl, dsl::exists, select}; +use diesel_async::RunQueryDsl; +use memsafe::MemSafe; +use miette::Diagnostic; +use rand::{RngExt, distr::StandardUniform, make_rng, rngs::StdRng}; +use secrecy::SecretString; +use thiserror::Error; +use tracing::info; +use zeroize::{Zeroize, Zeroizing}; + +use crate::db::{self, schema}; + +const TOKEN_LENGTH: usize = 64; + +pub async fn generate_token() -> Result { + let rng: StdRng = make_rng(); + + let token: String = rng + .sample_iter::(StandardUniform) + .take(TOKEN_LENGTH) + .fold(Default::default(), |mut accum, char| { + accum += char.to_string().as_str(); + accum + }); + + tokio::fs::write(home_path()?.join(BOOTSTRAP_TOKEN_PATH), token.as_str()).await?; + + Ok(token) +} diff --git a/server/crates/arbiter-server/src/handlers/user_agent.rs b/server/crates/arbiter-server/src/handlers/user_agent.rs deleted file mode 100644 index 3c2cdb3..0000000 --- a/server/crates/arbiter-server/src/handlers/user_agent.rs +++ /dev/null @@ -1,69 +0,0 @@ -use arbiter_proto::{ - proto::{ - UserAgentRequest, UserAgentResponse, - auth::{ - self, AuthChallengeRequest, ClientMessage, client_message::Payload as ClientAuthPayload - }, - user_agent_request::Payload as UserAgentRequestPayload, - }, - transport::Bi, -}; -use futures::StreamExt; -use tracing::error; - -use crate::ServerContext; - -smlang::statemachine!( - name: UserAgentAuth, - derive_states: [Debug], - derive_events: [Clone, Debug], - transitions: { - *Init + ReceivedRequest(ed25519_dalek::VerifyingKey) / provide_challenge = WaitingForChallengeSolution(auth::AuthChallenge), - WaitingForChallengeSolution(auth::AuthChallenge) + ReceivedGoodSolution = Authenticated, - WaitingForChallengeSolution(auth::AuthChallenge) + ReceivedBadSolution = Error, - } -); - - - -impl UserAgentAuthStateMachineContext for ServerContext { - #[allow(missing_docs)] - #[allow(clippy::unused_unit)] - fn provide_challenge< >(&mut self,_event_data:ed25519_dalek::VerifyingKey) -> Result { - todo!() - } -} - -pub(crate) async fn handle_user_agent( - context: ServerContext, - mut bistream: impl Bi + Unpin, -) { - let auth_sm = UserAgentAuthStateMachine::new(context); - - while let Some(Ok(msg)) = bistream.next().await - && auth_sm.state() != &UserAgentAuthStates::Authenticated - { - let Some(msg) = msg.payload else { - error!(handler = "useragent", "Received message with no payload"); - return; - }; - - let UserAgentRequestPayload::AuthMessage(ClientMessage { - payload: Some(client_message), - }) = msg - else { - error!( - handler = "useragent", - "Received unexpected message type during authentication" - ); - return; - }; - - match client_message { - ClientAuthPayload::AuthChallengeRequest(auth_challenge_request) => { - let AuthChallengeRequest { pubkey } = auth_challenge_request; - }, - ClientAuthPayload::AuthChallengeSolution(_auth_challenge_solution) => todo!(), - } - } -} diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index b2ad941..e9d58b6 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -1,23 +1,30 @@ #![allow(unused)] +use tracing::error; + use arbiter_proto::{ proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse}, transport::BiStream, }; use async_trait::async_trait; +use futures::StreamExt; +use kameo::actor::Spawn; use tokio_stream::wrappers::ReceiverStream; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use crate::{ - handlers::{client::handle_client, user_agent::handle_user_agent}, + actors::{ + client::handle_client, + user_agent::{self, UserAgentActor}, + }, context::ServerContext, }; -mod db; -pub mod handlers; +pub mod actors; mod context; +mod db; const DEFAULT_CHANNEL_SIZE: usize = 1000; @@ -51,16 +58,20 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server { &self, request: Request>, ) -> Result, Status> { - let req_stream = request.into_inner(); + let mut req_stream = request.into_inner(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - tokio::spawn(handle_user_agent( - self.context.clone(), - BiStream { - request_stream: req_stream, - response_sender: tx, - }, - )); + let actor = UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), tx)); + + tokio::task::spawn(async move { + while let Some(Ok(req)) = req_stream.next().await && actor.is_alive() { + if actor.tell(user_agent::Grpc {msg: req}).await.is_err() { + error!("Failed to send message to UserAgentActor"); + break; + } + } + }); + Ok(Response::new(ReceiverStream::new(rx))) } }