diff --git a/app/.dart_tool/extension_discovery/README.md b/app/.dart_tool/extension_discovery/README.md new file mode 100644 index 0000000..9dc6757 --- /dev/null +++ b/app/.dart_tool/extension_discovery/README.md @@ -0,0 +1,31 @@ +Extension Discovery Cache +========================= + +This folder is used by `package:extension_discovery` to cache lists of +packages that contains extensions for other packages. + +DO NOT USE THIS FOLDER +---------------------- + + * Do not read (or rely) the contents of this folder. + * Do write to this folder. + +If you're interested in the lists of extensions stored in this folder use the +API offered by package `extension_discovery` to get this information. + +If this package doesn't work for your use-case, then don't try to read the +contents of this folder. It may change, and will not remain stable. + +Use package `extension_discovery` +--------------------------------- + +If you want to access information from this folder. + +Feel free to delete this folder +------------------------------- + +Files in this folder act as a cache, and the cache is discarded if the files +are older than the modification time of `.dart_tool/package_config.json`. + +Hence, it should never be necessary to clear this cache manually, if you find a +need to do please file a bug. diff --git a/app/.dart_tool/extension_discovery/vs_code.json b/app/.dart_tool/extension_discovery/vs_code.json new file mode 100644 index 0000000..b93d172 --- /dev/null +++ b/app/.dart_tool/extension_discovery/vs_code.json @@ -0,0 +1 @@ +{"version":2,"entries":[{"package":"app","rootUri":"../","packageUri":"lib/"}]} \ No newline at end of file diff --git a/app/.dart_tool/package_config.json b/app/.dart_tool/package_config.json new file mode 100644 index 0000000..258edaf --- /dev/null +++ b/app/.dart_tool/package_config.json @@ -0,0 +1,178 @@ +{ + "configVersion": 2, + "packages": [ + { + "name": "async", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/async-2.13.0", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "boolean_selector", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/boolean_selector-2.1.2", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "characters", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/characters-1.4.0", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "clock", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/clock-1.1.2", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "collection", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/collection-1.19.1", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "cupertino_icons", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/cupertino_icons-1.0.8", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "fake_async", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/fake_async-1.3.3", + "packageUri": "lib/", + "languageVersion": "3.3" + }, + { + "name": "flutter", + "rootUri": "file:///Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable/packages/flutter", + "packageUri": "lib/", + "languageVersion": "3.8" + }, + { + "name": "flutter_lints", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/flutter_lints-6.0.0", + "packageUri": "lib/", + "languageVersion": "3.8" + }, + { + "name": "flutter_test", + "rootUri": "file:///Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable/packages/flutter_test", + "packageUri": "lib/", + "languageVersion": "3.8" + }, + { + "name": "leak_tracker", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/leak_tracker-11.0.2", + "packageUri": "lib/", + "languageVersion": "3.2" + }, + { + "name": "leak_tracker_flutter_testing", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/leak_tracker_flutter_testing-3.0.10", + "packageUri": "lib/", + "languageVersion": "3.2" + }, + { + "name": "leak_tracker_testing", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/leak_tracker_testing-3.0.2", + "packageUri": "lib/", + "languageVersion": "3.2" + }, + { + "name": "lints", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/lints-6.1.0", + "packageUri": "lib/", + "languageVersion": "3.8" + }, + { + "name": "matcher", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/matcher-0.12.17", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "material_color_utilities", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/material_color_utilities-0.11.1", + "packageUri": "lib/", + "languageVersion": "2.17" + }, + { + "name": "meta", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/meta-1.17.0", + "packageUri": "lib/", + "languageVersion": "3.5" + }, + { + "name": "path", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/path-1.9.1", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "sky_engine", + "rootUri": "file:///Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable/bin/cache/pkg/sky_engine", + "packageUri": "lib/", + "languageVersion": "3.8" + }, + { + "name": "source_span", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/source_span-1.10.2", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "stack_trace", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/stack_trace-1.12.1", + "packageUri": "lib/", + "languageVersion": "3.4" + }, + { + "name": "stream_channel", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/stream_channel-2.1.4", + "packageUri": "lib/", + "languageVersion": "3.3" + }, + { + "name": "string_scanner", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/string_scanner-1.4.1", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "term_glyph", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/term_glyph-1.2.2", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "test_api", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/test_api-0.7.7", + "packageUri": "lib/", + "languageVersion": "3.5" + }, + { + "name": "vector_math", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/vector_math-2.2.0", + "packageUri": "lib/", + "languageVersion": "3.1" + }, + { + "name": "vm_service", + "rootUri": "file:///Users/kaska/.pub-cache/hosted/pub.dev/vm_service-15.0.2", + "packageUri": "lib/", + "languageVersion": "3.5" + }, + { + "name": "app", + "rootUri": "../", + "packageUri": "lib/", + "languageVersion": "3.10" + } + ], + "generator": "pub", + "generatorVersion": "3.10.8", + "flutterRoot": "file:///Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable", + "flutterVersion": "3.38.9", + "pubCache": "file:///Users/kaska/.pub-cache" +} diff --git a/app/.dart_tool/package_graph.json b/app/.dart_tool/package_graph.json new file mode 100644 index 0000000..2affe54 --- /dev/null +++ b/app/.dart_tool/package_graph.json @@ -0,0 +1,230 @@ +{ + "roots": [ + "app" + ], + "packages": [ + { + "name": "app", + "version": "1.0.0+1", + "dependencies": [ + "cupertino_icons", + "flutter" + ], + "devDependencies": [ + "flutter_lints", + "flutter_test" + ] + }, + { + "name": "flutter_lints", + "version": "6.0.0", + "dependencies": [ + "lints" + ] + }, + { + "name": "flutter_test", + "version": "0.0.0", + "dependencies": [ + "clock", + "collection", + "fake_async", + "flutter", + "leak_tracker_flutter_testing", + "matcher", + "meta", + "path", + "stack_trace", + "stream_channel", + "test_api", + "vector_math" + ] + }, + { + "name": "cupertino_icons", + "version": "1.0.8", + "dependencies": [] + }, + { + "name": "flutter", + "version": "0.0.0", + "dependencies": [ + "characters", + "collection", + "material_color_utilities", + "meta", + "sky_engine", + "vector_math" + ] + }, + { + "name": "lints", + "version": "6.1.0", + "dependencies": [] + }, + { + "name": "stream_channel", + "version": "2.1.4", + "dependencies": [ + "async" + ] + }, + { + "name": "meta", + "version": "1.17.0", + "dependencies": [] + }, + { + "name": "collection", + "version": "1.19.1", + "dependencies": [] + }, + { + "name": "leak_tracker_flutter_testing", + "version": "3.0.10", + "dependencies": [ + "flutter", + "leak_tracker", + "leak_tracker_testing", + "matcher", + "meta" + ] + }, + { + "name": "vector_math", + "version": "2.2.0", + "dependencies": [] + }, + { + "name": "stack_trace", + "version": "1.12.1", + "dependencies": [ + "path" + ] + }, + { + "name": "clock", + "version": "1.1.2", + "dependencies": [] + }, + { + "name": "fake_async", + "version": "1.3.3", + "dependencies": [ + "clock", + "collection" + ] + }, + { + "name": "path", + "version": "1.9.1", + "dependencies": [] + }, + { + "name": "matcher", + "version": "0.12.17", + "dependencies": [ + "async", + "meta", + "stack_trace", + "term_glyph", + "test_api" + ] + }, + { + "name": "test_api", + "version": "0.7.7", + "dependencies": [ + "async", + "boolean_selector", + "collection", + "meta", + "source_span", + "stack_trace", + "stream_channel", + "string_scanner", + "term_glyph" + ] + }, + { + "name": "sky_engine", + "version": "0.0.0", + "dependencies": [] + }, + { + "name": "material_color_utilities", + "version": "0.11.1", + "dependencies": [ + "collection" + ] + }, + { + "name": "characters", + "version": "1.4.0", + "dependencies": [] + }, + { + "name": "async", + "version": "2.13.0", + "dependencies": [ + "collection", + "meta" + ] + }, + { + "name": "leak_tracker_testing", + "version": "3.0.2", + "dependencies": [ + "leak_tracker", + "matcher", + "meta" + ] + }, + { + "name": "leak_tracker", + "version": "11.0.2", + "dependencies": [ + "clock", + "collection", + "meta", + "path", + "vm_service" + ] + }, + { + "name": "term_glyph", + "version": "1.2.2", + "dependencies": [] + }, + { + "name": "string_scanner", + "version": "1.4.1", + "dependencies": [ + "source_span" + ] + }, + { + "name": "source_span", + "version": "1.10.2", + "dependencies": [ + "collection", + "path", + "term_glyph" + ] + }, + { + "name": "boolean_selector", + "version": "2.1.2", + "dependencies": [ + "source_span", + "string_scanner" + ] + }, + { + "name": "vm_service", + "version": "15.0.2", + "dependencies": [] + } + ], + "configVersion": 1 +} \ No newline at end of file diff --git a/app/.dart_tool/version b/app/.dart_tool/version new file mode 100644 index 0000000..75fffa6 --- /dev/null +++ b/app/.dart_tool/version @@ -0,0 +1 @@ +3.38.9 \ No newline at end of file diff --git a/app/macos/Flutter/ephemeral/Flutter-Generated.xcconfig b/app/macos/Flutter/ephemeral/Flutter-Generated.xcconfig new file mode 100644 index 0000000..6da0bad --- /dev/null +++ b/app/macos/Flutter/ephemeral/Flutter-Generated.xcconfig @@ -0,0 +1,11 @@ +// This is a generated file; do not edit or check into version control. +FLUTTER_ROOT=/Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable +FLUTTER_APPLICATION_PATH=/Users/kaska/Documents/Projects/Major/arbiter/app +COCOAPODS_PARALLEL_CODE_SIGN=true +FLUTTER_BUILD_DIR=build +FLUTTER_BUILD_NAME=1.0.0 +FLUTTER_BUILD_NUMBER=1 +DART_OBFUSCATION=false +TRACK_WIDGET_CREATION=true +TREE_SHAKE_ICONS=false +PACKAGE_CONFIG=.dart_tool/package_config.json diff --git a/app/macos/Flutter/ephemeral/flutter_export_environment.sh b/app/macos/Flutter/ephemeral/flutter_export_environment.sh new file mode 100755 index 0000000..1062508 --- /dev/null +++ b/app/macos/Flutter/ephemeral/flutter_export_environment.sh @@ -0,0 +1,12 @@ +#!/bin/sh +# This is a generated file; do not edit or check into version control. +export "FLUTTER_ROOT=/Users/kaska/.local/share/mise/installs/flutter/3.38.9-stable" +export "FLUTTER_APPLICATION_PATH=/Users/kaska/Documents/Projects/Major/arbiter/app" +export "COCOAPODS_PARALLEL_CODE_SIGN=true" +export "FLUTTER_BUILD_DIR=build" +export "FLUTTER_BUILD_NAME=1.0.0" +export "FLUTTER_BUILD_NUMBER=1" +export "DART_OBFUSCATION=false" +export "TRACK_WIDGET_CREATION=true" +export "TREE_SHAKE_ICONS=false" +export "PACKAGE_CONFIG=.dart_tool/package_config.json" diff --git a/server/Cargo.lock b/server/Cargo.lock index 42828d3..fc8e5b0 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -75,6 +75,7 @@ dependencies = [ "tonic", "tonic-prost", "tonic-prost-build", + "tracing", "url", ] diff --git a/server/crates/arbiter-proto/Cargo.toml b/server/crates/arbiter-proto/Cargo.toml index ade18fe..006e406 100644 --- a/server/crates/arbiter-proto/Cargo.toml +++ b/server/crates/arbiter-proto/Cargo.toml @@ -19,6 +19,7 @@ thiserror.workspace = true rustls-pki-types.workspace = true base64 = "0.22.1" prost-types.workspace = true +tracing.workspace = true [build-dependencies] tonic-prost-build = "0.14.3" diff --git a/server/crates/arbiter-proto/src/transport.rs b/server/crates/arbiter-proto/src/transport.rs index 691ef9a..c360162 100644 --- a/server/crates/arbiter-proto/src/transport.rs +++ b/server/crates/arbiter-proto/src/transport.rs @@ -1,46 +1,283 @@ -use futures::{Stream, StreamExt}; -use tokio::sync::mpsc::{self, error::SendError}; -use tonic::{Status, Streaming}; +//! Transport-facing abstractions for protocol/session code. +//! +//! This module separates three concerns: +//! +//! - protocol/session logic wants a small duplex interface ([`Bi`]) +//! - transport adapters need to push concrete stream items to an underlying IO layer +//! - server/client boundaries may need to translate domain outbounds into transport +//! framing (for example, a tonic stream item) +//! +//! [`Bi`] is intentionally minimal and transport-agnostic: +//! - [`Bi::recv`] yields inbound protocol messages +//! - [`Bi::send`] accepts outbound protocol/domain items +//! +//! # Generic Ordering Rule +//! +//! This module uses a single convention consistently: when a type or trait is +//! parameterized by protocol message directions, the generic parameters are +//! declared as `Inbound` first, then `Outbound`. +//! +//! For [`Bi`], that means `Bi`: +//! - `recv() -> Option` +//! - `send(Outbound)` +//! +//! For adapter types that are parameterized by direction-specific converters, +//! inbound-related converter parameters are declared before outbound-related +//! converter parameters. +//! +//! [`ProtocolConverter`] is the boundary object that converts a protocol/domain +//! outbound item into the concrete outbound item expected by a transport sender. +//! The conversion is infallible, so domain-level recoverable failures should be +//! represented inside the domain outbound type itself (for example, +//! `Result`). +//! +//! [`GrpcAdapter`] combines: +//! - a tonic inbound stream +//! - a Tokio sender for outbound transport items +//! - a [`ProtocolConverter`] for the receive path +//! - a [`ProtocolConverter`] for the send path +//! +//! [`DummyTransport`] is a no-op implementation useful for tests and local actor +//! execution where no real network stream exists. +//! +//! # Component Interaction +//! +//! The typical layering looks like this: +//! +//! ```text +//! inbound (network -> protocol) +//! ============================ +//! +//! tonic::Streaming -> GrpcAdapter::recv() -> Bi::recv() -> protocol/session actor +//! | +//! +--> recv ProtocolConverter::convert(transport) +//! +//! outbound (protocol -> network) +//! ============================== +//! +//! protocol/session actor -> Bi::send(domain outbound item, e.g. Result) +//! -> GrpcAdapter::send() +//! | +//! +--> send ProtocolConverter::convert(domain) +//! -> Tokio mpsc::Sender -> tonic response stream +//! ``` +//! +//! # Design Notes +//! +//! - `recv()` collapses adapter-specific receive failures into `None`, which +//! lets protocol code treat stream termination and transport receive failure as +//! "no more inbound items" when no finer distinction is required. +//! - `send()` returns [`Error`] only for transport delivery failures (for example, +//! when the outbound channel is closed). +//! - Conversion policy lives outside protocol/session logic and can be defined at +//! the transport boundary (such as a server endpoint module). When domain and +//! transport types are identical, [`IdentityConverter`] can be used. +use std::marker::PhantomData; -// Abstraction for stream for sans-io capabilities -pub trait Bi: Stream> + Send + Sync + 'static { - type Error; +use futures::StreamExt; +use tokio::sync::mpsc; +use tonic::Streaming; + +/// Errors returned by transport adapters implementing [`Bi`]. +pub enum Error { + /// The outbound side of the transport is no longer accepting messages. + ChannelClosed, +} + +/// Minimal bidirectional transport abstraction used by protocol code. +/// +/// `Bi` models a duplex channel with: +/// - inbound items of type `Inbound` read via [`Bi::recv`] +/// - outbound items of type `Outbound` written via [`Bi::send`] +/// +/// The trait intentionally exposes only the operations the protocol layer needs, +/// allowing it to work with gRPC streams and other transport implementations. +/// +/// # Stream termination and errors +/// +/// [`Bi::recv`] returns: +/// - `Some(item)` when a new inbound message is available +/// - `None` when the inbound stream ends or the underlying transport reports an error +/// +/// Implementations may collapse transport-specific receive errors into `None` +/// when the protocol does not need to distinguish them from normal stream +/// termination. +pub trait Bi: Send + Sync + 'static { + /// Sends one outbound item to the peer. fn send( &mut self, - item: Result, - ) -> impl std::future::Future> + Send; + item: Outbound, + ) -> impl std::future::Future> + Send; + + /// Receives the next inbound item. + /// + /// Returns `None` when the inbound stream is finished or can no longer + /// produce items. + fn recv(&mut self) -> impl std::future::Future> + Send; } -// Bi-directional stream abstraction for handling gRPC streaming requests and responses -pub struct BiStream { - pub request_stream: Streaming, - pub response_sender: mpsc::Sender>, +/// Converts protocol/domain outbound items into transport-layer outbound items. +/// +/// This trait is used by transport adapters that need to emit a concrete stream +/// item type (for example, tonic server streams) while protocol code prefers to +/// work with domain-oriented outbound values. +/// +/// `convert` is infallible by design. Any recoverable protocol failure should be +/// represented in [`Self::Domain`] and mapped into the transport item in the +/// converter implementation. +pub trait ProtocolConverter: Send + Sync + 'static { + /// Outbound item produced by protocol/domain code. + type Domain; + + /// Outbound item required by the transport sender. + type Transport; + + /// Maps a protocol/domain outbound item into the transport sender item. + fn convert(&self, item: Self::Domain) -> Self::Transport; } -impl Stream for BiStream -where - T: Send + 'static, - U: Send + 'static, -{ - type Item = Result; +/// A [`ProtocolConverter`] that forwards values unchanged. +/// +/// Useful when the protocol-facing and transport-facing item types are +/// identical, but a converter is still required by an adapter API. +pub struct IdentityConverter { + _marker: PhantomData, +} - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.request_stream.poll_next_unpin(cx) +impl IdentityConverter { + pub fn new() -> Self { + Self { + _marker: PhantomData, + } } } -impl Bi for BiStream -where - T: Send + 'static, - U: Send + 'static, -{ - type Error = SendError>; - - async fn send(&mut self, item: Result) -> Result<(), Self::Error> { - self.response_sender.send(item).await +impl Default for IdentityConverter { + fn default() -> Self { + Self::new() + } +} + +impl ProtocolConverter for IdentityConverter +where + T: Send + Sync + 'static, +{ + type Domain = T; + type Transport = T; + + fn convert(&self, item: Self::Domain) -> Self::Transport { + item + } +} + +/// [`Bi`] adapter backed by a tonic gRPC bidirectional stream. +/// +/// The adapter owns converter instances for both directions: +/// - receive converter: transport inbound -> protocol inbound +/// - send converter: protocol outbound -> transport outbound +/// +/// This keeps protocol actors decoupled from transport framing conventions in +/// both directions. +pub struct GrpcAdapter { + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, +} + +impl GrpcAdapter +where + InboundConverter: ProtocolConverter, + OutboundConverter: ProtocolConverter, +{ + /// Creates a new gRPC-backed [`Bi`] adapter. + /// + /// The provided converters define: + /// - the protocol outbound item and corresponding transport outbound item + /// - the transport inbound item and corresponding protocol inbound item + pub fn new( + sender: mpsc::Sender, + receiver: Streaming, + inbound_converter: InboundConverter, + outbound_converter: OutboundConverter, + ) -> Self { + Self { + sender, + receiver, + inbound_converter, + outbound_converter, + } + } +} + +impl + Bi + for GrpcAdapter +where + InboundConverter: ProtocolConverter, + OutboundConverter: ProtocolConverter, + OutboundConverter::Domain: Send + 'static, + OutboundConverter::Transport: Send + 'static, + InboundConverter::Transport: Send + 'static, + InboundConverter::Domain: Send + 'static, +{ + #[tracing::instrument(level = "trace", skip(self, item))] + async fn send(&mut self, item: OutboundConverter::Domain) -> Result<(), Error> { + let outbound: OutboundConverter::Transport = self.outbound_converter.convert(item); + self.sender + .send(outbound) + .await + .map_err(|_| Error::ChannelClosed) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn recv(&mut self) -> Option { + self.receiver + .next() + .await + .transpose() + .ok() + .flatten() + .map(|item| self.inbound_converter.convert(item)) + } +} + +/// No-op [`Bi`] transport for tests and manual actor usage. +/// +/// `send` drops all items and succeeds. [`Bi::recv`] never resolves and therefore +/// does not busy-wait or spuriously close the stream. +pub struct DummyTransport { + _marker: PhantomData<(Inbound, Outbound)>, +} + +impl DummyTransport { + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +impl Default for DummyTransport { + fn default() -> Self { + Self::new() + } +} + +impl Bi for DummyTransport +where + Inbound: Send + Sync + 'static, + Outbound: Send + Sync + 'static, +{ + async fn send(&mut self, _item: Outbound) -> Result<(), Error> { + Ok(()) + } + + fn recv(&mut self) -> impl std::future::Future> + Send { + async { + std::future::pending::<()>().await; + None + } } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/mod.rs b/server/crates/arbiter-server/src/actors/user_agent/mod.rs index 700dd40..50cc9dc 100644 --- a/server/crates/arbiter-server/src/actors/user_agent/mod.rs +++ b/server/crates/arbiter-server/src/actors/user_agent/mod.rs @@ -1,21 +1,26 @@ use std::{ops::DerefMut, sync::Mutex}; -use arbiter_proto::proto::{ - UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentResponse, - auth::{ - self, AuthChallengeRequest, AuthOk, ServerMessage as AuthServerMessage, - server_message::Payload as ServerAuthPayload, +use arbiter_proto::{ + proto::{ + UnsealEncryptedKey, UnsealResult, UnsealStart, UnsealStartResponse, UserAgentRequest, + UserAgentResponse, + auth::{ + self, AuthChallengeRequest, AuthOk, ClientMessage as ClientAuthMessage, + ServerMessage as AuthServerMessage, client_message::Payload as ClientAuthPayload, + server_message::Payload as ServerAuthPayload, + }, + user_agent_request::Payload as UserAgentRequestPayload, + user_agent_response::Payload as UserAgentResponsePayload, }, - user_agent_response::Payload as UserAgentResponsePayload, + transport::{Bi, DummyTransport}, }; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; use diesel::{ExpressionMethods as _, OptionalExtension as _, QueryDsl, dsl::update}; use diesel_async::RunQueryDsl; use ed25519_dalek::VerifyingKey; -use kameo::{Actor, error::SendError, messages}; +use kameo::{Actor, error::SendError}; use memsafe::MemSafe; -use tokio::sync::mpsc::Sender; -use tonic::Status; +use tokio::select; use tracing::{error, info}; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -31,62 +36,105 @@ use crate::{ }, }, db::{self, schema}, - errors::GrpcStatusExt, }; mod state; -mod transport; -pub(crate) use transport::handle_user_agent; +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum UserAgentError { + #[error("Expected message with payload")] + MissingRequestPayload, + #[error("Expected message with payload")] + UnexpectedRequestPayload, + #[error("Invalid state for challenge solution")] + InvalidStateForChallengeSolution, + #[error("Invalid state for unseal encrypted key")] + InvalidStateForUnsealEncryptedKey, + #[error("client_pubkey must be 32 bytes")] + InvalidClientPubkeyLength, + #[error("Expected pubkey to have specific length")] + InvalidAuthPubkeyLength, + #[error("Failed to convert pubkey to VerifyingKey")] + InvalidAuthPubkeyEncoding, + #[error("Invalid signature length")] + InvalidSignatureLength, + #[error("Invalid bootstrap token")] + InvalidBootstrapToken, + #[error("Public key not registered")] + PublicKeyNotRegistered, + #[error("Invalid challenge solution")] + InvalidChallengeSolution, + #[error("State machine error")] + StateTransitionFailed, + #[error("Bootstrap token consumption failed")] + BootstrapperActorUnreachable, + #[error("Vault is not available")] + KeyHolderActorUnreachable, + #[error("Database pool error")] + DatabasePoolUnavailable, + #[error("Database error")] + DatabaseOperationFailed, +} -#[derive(Actor)] -pub struct UserAgentActor { +pub struct UserAgentActor +where + Transport: Bi>, +{ db: db::DatabasePool, actors: GlobalActors, state: UserAgentStateMachine, - // will be used in future - _tx: Sender>, + transport: Transport, } -impl UserAgentActor { - pub(crate) fn new( - context: ServerContext, - tx: Sender>, - ) -> Self { +impl UserAgentActor +where + Transport: Bi>, +{ + pub(crate) fn new(context: ServerContext, transport: Transport) -> Self { Self { db: context.db.clone(), actors: context.actors.clone(), state: UserAgentStateMachine::new(DummyContext), - _tx: tx, + transport, } } - pub fn new_manual( - db: db::DatabasePool, - actors: GlobalActors, - tx: Sender>, - ) -> Self { - Self { - db, - actors, - state: UserAgentStateMachine::new(DummyContext), - _tx: tx, - } - } - - fn transition(&mut self, event: UserAgentEvents) -> Result<(), Status> { + fn transition(&mut self, event: UserAgentEvents) -> Result<(), UserAgentError> { self.state.process_event(event).map_err(|e| { error!(?e, "State transition failed"); - Status::internal("State machine error") + UserAgentError::StateTransitionFailed })?; Ok(()) } + pub async fn process_transport_inbound(&mut self, req: UserAgentRequest) -> Output { + let msg = req.payload.ok_or_else(|| { + error!(actor = "useragent", "Received message with no payload"); + UserAgentError::MissingRequestPayload + })?; + + match msg { + UserAgentRequestPayload::AuthMessage(ClientAuthMessage { + payload: Some(ClientAuthPayload::AuthChallengeRequest(req)), + }) => self.handle_auth_challenge_request(req).await, + UserAgentRequestPayload::AuthMessage(ClientAuthMessage { + payload: Some(ClientAuthPayload::AuthChallengeSolution(solution)), + }) => self.handle_auth_challenge_solution(solution).await, + UserAgentRequestPayload::UnsealStart(unseal_start) => { + self.handle_unseal_request(unseal_start).await + } + UserAgentRequestPayload::UnsealEncryptedKey(unseal_encrypted_key) => { + self.handle_unseal_encrypted_key(unseal_encrypted_key).await + } + _ => Err(UserAgentError::UnexpectedRequestPayload), + } + } + async fn auth_with_bootstrap_token( &mut self, pubkey: ed25519_dalek::VerifyingKey, token: String, - ) -> Result { + ) -> Result { let token_ok: bool = self .actors .bootstrapper @@ -94,16 +142,19 @@ impl UserAgentActor { .await .map_err(|e| { error!(?pubkey, "Failed to consume bootstrap token: {e}"); - Status::internal("Bootstrap token consumption failed") + UserAgentError::BootstrapperActorUnreachable })?; if !token_ok { error!(?pubkey, "Invalid bootstrap token provided"); - return Err(Status::invalid_argument("Invalid bootstrap token")); + return Err(UserAgentError::InvalidBootstrapToken); } { - let mut conn = self.db.get().await.to_status()?; + let mut conn = self.db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + UserAgentError::DatabasePoolUnavailable + })?; diesel::insert_into(schema::useragent_client::table) .values(( @@ -112,7 +163,10 @@ impl UserAgentActor { )) .execute(&mut conn) .await - .to_status()?; + .map_err(|e| { + error!(error = ?e, "Database error"); + UserAgentError::DatabaseOperationFailed + })?; } self.transition(UserAgentEvents::ReceivedBootstrapToken)?; @@ -122,7 +176,10 @@ impl UserAgentActor { async fn auth_with_challenge(&mut self, pubkey: VerifyingKey, pubkey_bytes: Vec) -> Output { let nonce: Option = { - let mut db_conn = self.db.get().await.to_status()?; + let mut db_conn = self.db.get().await.map_err(|e| { + error!(error = ?e, "Database pool error"); + UserAgentError::DatabasePoolUnavailable + })?; db_conn .exclusive_transaction(|conn| { Box::pin(async move { @@ -147,12 +204,15 @@ impl UserAgentActor { }) .await .optional() - .to_status()? + .map_err(|e| { + error!(error = ?e, "Database error"); + UserAgentError::DatabaseOperationFailed + })? }; let Some(nonce) = nonce else { error!(?pubkey, "Public key not found in database"); - return Err(Status::unauthenticated("Public key not registered")); + return Err(UserAgentError::PublicKeyNotRegistered); }; let challenge = auth::AuthChallenge { @@ -177,19 +237,17 @@ impl UserAgentActor { fn verify_challenge_solution( &self, solution: &auth::AuthChallengeSolution, - ) -> Result<(bool, &ChallengeContext), Status> { + ) -> Result<(bool, &ChallengeContext), UserAgentError> { let UserAgentStates::WaitingForChallengeSolution(challenge_context) = self.state.state() else { error!("Received challenge solution in invalid state"); - return Err(Status::invalid_argument( - "Invalid state for challenge solution", - )); + return Err(UserAgentError::InvalidStateForChallengeSolution); }; let formatted_challenge = arbiter_proto::format_challenge(&challenge_context.challenge); let signature = solution.signature.as_slice().try_into().map_err(|_| { error!(?solution, "Invalid signature length"); - Status::invalid_argument("Invalid signature length") + UserAgentError::InvalidSignatureLength })?; let valid = challenge_context @@ -201,7 +259,7 @@ impl UserAgentActor { } } -type Output = Result; +type Output = Result; fn auth_response(payload: ServerAuthPayload) -> UserAgentResponse { UserAgentResponse { @@ -217,17 +275,18 @@ fn unseal_response(payload: UserAgentResponsePayload) -> UserAgentResponse { } } -#[messages] -impl UserAgentActor { - #[message] - pub async fn handle_unseal_request(&mut self, req: UnsealStart) -> Output { +impl UserAgentActor +where + Transport: Bi>, +{ + async fn handle_unseal_request(&mut self, req: UnsealStart) -> Output { let secret = EphemeralSecret::random(); let public_key = PublicKey::from(&secret); let client_pubkey_bytes: [u8; 32] = req .client_pubkey .try_into() - .map_err(|_| Status::invalid_argument("client_pubkey must be 32 bytes"))?; + .map_err(|_| UserAgentError::InvalidClientPubkeyLength)?; let client_public_key = PublicKey::from(client_pubkey_bytes); @@ -243,13 +302,10 @@ impl UserAgentActor { )) } - #[message] - pub async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { + async fn handle_unseal_encrypted_key(&mut self, req: UnsealEncryptedKey) -> Output { let UserAgentStates::WaitingForUnsealKey(unseal_context) = self.state.state() else { error!("Received unseal encrypted key in invalid state"); - return Err(Status::failed_precondition( - "Invalid state for unseal encrypted key", - )); + return Err(UserAgentError::InvalidStateForUnsealEncryptedKey); }; let ephemeral_secret = { let mut secret_lock = unseal_context.secret.lock().unwrap(); @@ -313,7 +369,7 @@ impl UserAgentActor { Err(err) => { error!(?err, "Failed to send unseal request to keyholder"); self.transition(UserAgentEvents::ReceivedInvalidKey)?; - Err(Status::internal("Vault is not available")) + Err(UserAgentError::KeyHolderActorUnreachable) } } } @@ -327,14 +383,14 @@ impl UserAgentActor { } } - #[message] - pub async fn handle_auth_challenge_request(&mut self, req: AuthChallengeRequest) -> Output { - let pubkey = req.pubkey.as_array().ok_or(Status::invalid_argument( - "Expected pubkey to have specific length", - ))?; + async fn handle_auth_challenge_request(&mut self, req: AuthChallengeRequest) -> Output { + let pubkey = req + .pubkey + .as_array() + .ok_or(UserAgentError::InvalidAuthPubkeyLength)?; let pubkey = VerifyingKey::from_bytes(pubkey).map_err(|_err| { error!(?pubkey, "Failed to convert to VerifyingKey"); - Status::invalid_argument("Failed to convert pubkey to VerifyingKey") + UserAgentError::InvalidAuthPubkeyEncoding })?; self.transition(UserAgentEvents::AuthRequest)?; @@ -345,8 +401,7 @@ impl UserAgentActor { } } - #[message] - pub async fn handle_auth_challenge_solution( + async fn handle_auth_challenge_solution( &mut self, solution: auth::AuthChallengeSolution, ) -> Output { @@ -362,7 +417,72 @@ impl UserAgentActor { } else { error!("Client provided invalid solution to authentication challenge"); self.transition(UserAgentEvents::ReceivedBadSolution)?; - Err(Status::unauthenticated("Invalid challenge solution")) + Err(UserAgentError::InvalidChallengeSolution) + } + } +} + + +impl Actor for UserAgentActor +where + Transport: Bi>, +{ + type Args = Self; + + type Error = (); + + async fn on_start( + args: Self::Args, + _: kameo::prelude::ActorRef, + ) -> Result { + Ok(args) + } + + async fn next( + &mut self, + _actor_ref: kameo::prelude::WeakActorRef, + mailbox_rx: &mut kameo::prelude::MailboxReceiver, + ) -> Option> { + loop { + select! { + signal = mailbox_rx.recv() => { + return signal; + } + msg = self.transport.recv() => { + match msg { + Some(request) => { + match self.process_transport_inbound(request).await { + Ok(response) => { + if self.transport.send(Ok(response)).await.is_err() { + error!(actor = "useragent", reason = "channel closed", "send.failed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + Err(err) => { + let _ = self.transport.send(Err(err)).await; + return Some(kameo::mailbox::Signal::Stop); + } + } + } + None => { + info!(actor = "useragent", "transport.closed"); + return Some(kameo::mailbox::Signal::Stop); + } + } + } + } + } + } +} + + +impl UserAgentActor>> { + pub fn new_manual(db: db::DatabasePool, actors: GlobalActors) -> Self { + Self { + db, + actors, + state: UserAgentStateMachine::new(DummyContext), + transport: DummyTransport::new(), } } } diff --git a/server/crates/arbiter-server/src/actors/user_agent/transport.rs b/server/crates/arbiter-server/src/actors/user_agent/transport.rs deleted file mode 100644 index c1ac84c..0000000 --- a/server/crates/arbiter-server/src/actors/user_agent/transport.rs +++ /dev/null @@ -1,95 +0,0 @@ -use super::UserAgentActor; -use arbiter_proto::proto::{ - UserAgentRequest, UserAgentResponse, - auth::{ClientMessage as ClientAuthMessage, client_message::Payload as ClientAuthPayload}, - user_agent_request::Payload as UserAgentRequestPayload, -}; -use futures::StreamExt; -use kameo::{ - actor::{ActorRef, Spawn as _}, - error::SendError, -}; -use tokio::sync::mpsc; -use tonic::Status; -use tracing::error; - -use crate::{ - actors::user_agent::{ - HandleAuthChallengeRequest, HandleAuthChallengeSolution, HandleUnsealEncryptedKey, - HandleUnsealRequest, - }, - context::ServerContext, -}; - -pub(crate) async fn handle_user_agent( - context: ServerContext, - mut req_stream: tonic::Streaming, - tx: mpsc::Sender>, -) { - let actor = UserAgentActor::spawn(UserAgentActor::new(context, tx.clone())); - - while let Some(Ok(req)) = req_stream.next().await - && actor.is_alive() - { - match process_message(&actor, req).await { - Ok(resp) => { - if tx.send(Ok(resp)).await.is_err() { - error!(actor = "useragent", "Failed to send response to client"); - break; - } - } - Err(status) => { - let _ = tx.send(Err(status)).await; - break; - } - } - } - - actor.kill(); -} - -async fn process_message( - actor: &ActorRef, - req: UserAgentRequest, -) -> Result { - let msg = req.payload.ok_or_else(|| { - error!(actor = "useragent", "Received message with no payload"); - Status::invalid_argument("Expected message with payload") - })?; - - match msg { - UserAgentRequestPayload::AuthMessage(ClientAuthMessage { - payload: Some(ClientAuthPayload::AuthChallengeRequest(req)), - }) => actor - .ask(HandleAuthChallengeRequest { req }) - .await - .map_err(into_status), - UserAgentRequestPayload::AuthMessage(ClientAuthMessage { - payload: Some(ClientAuthPayload::AuthChallengeSolution(solution)), - }) => actor - .ask(HandleAuthChallengeSolution { solution }) - .await - .map_err(into_status), - UserAgentRequestPayload::UnsealStart(unseal_start) => actor - .ask(HandleUnsealRequest { req: unseal_start }) - .await - .map_err(into_status), - UserAgentRequestPayload::UnsealEncryptedKey(unseal_encrypted_key) => actor - .ask(HandleUnsealEncryptedKey { - req: unseal_encrypted_key, - }) - .await - .map_err(into_status), - _ => Err(Status::invalid_argument("Expected message with payload")), - } -} - -fn into_status(e: SendError) -> Status { - match e { - SendError::HandlerError(status) => status, - _ => { - error!(actor = "useragent", "Failed to send message to actor"); - Status::internal("session failure") - } - } -} diff --git a/server/crates/arbiter-server/src/errors.rs b/server/crates/arbiter-server/src/errors.rs deleted file mode 100644 index 98dae76..0000000 --- a/server/crates/arbiter-server/src/errors.rs +++ /dev/null @@ -1,24 +0,0 @@ -use tonic::Status; -use tracing::error; - -pub trait GrpcStatusExt { - fn to_status(self) -> Result; -} - -impl GrpcStatusExt for Result { - fn to_status(self) -> Result { - self.map_err(|e| { - error!(error = ?e, "Database error"); - Status::internal("Database error") - }) - } -} - -impl GrpcStatusExt for Result { - fn to_status(self) -> Result { - self.map_err(|e| { - error!(error = ?e, "Database pool error"); - Status::internal("Database pool error") - }) - } -} diff --git a/server/crates/arbiter-server/src/lib.rs b/server/crates/arbiter-server/src/lib.rs index 9d86e27..77e03f2 100644 --- a/server/crates/arbiter-server/src/lib.rs +++ b/server/crates/arbiter-server/src/lib.rs @@ -1,26 +1,91 @@ #![forbid(unsafe_code)] use arbiter_proto::{ proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse}, - transport::BiStream, + transport::{GrpcAdapter, IdentityConverter, ProtocolConverter}, }; use async_trait::async_trait; +use kameo::actor::Spawn; use tokio_stream::wrappers::ReceiverStream; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; +use tracing::info; use crate::{ - actors::{client::handle_client, user_agent::handle_user_agent}, + actors::user_agent::{UserAgentActor, UserAgentError}, context::ServerContext, }; pub mod actors; pub mod context; pub mod db; -mod errors; const DEFAULT_CHANNEL_SIZE: usize = 1000; + +/// Converts User Agent domain outbounds into the tonic stream item emitted by +/// the server. +/// +/// The conversion is defined at the server boundary so the actor module remains +/// focused on domain semantics and does not depend on tonic status encoding. +struct UserAgentGrpcConverter; + +impl ProtocolConverter for UserAgentGrpcConverter { + type Domain = Result; + type Transport = Result; + + fn convert(&self, item: Self::Domain) -> Self::Transport { + match item { + Ok(message) => Ok(message), + Err(err) => Err(user_agent_error_status(err)), + } + } +} + +/// Maps User Agent domain errors to public gRPC transport errors for the +/// `user_agent` streaming endpoint. +fn user_agent_error_status(value: UserAgentError) -> Status { + match value { + UserAgentError::MissingRequestPayload | UserAgentError::UnexpectedRequestPayload => { + Status::invalid_argument("Expected message with payload") + } + UserAgentError::InvalidStateForChallengeSolution => { + Status::invalid_argument("Invalid state for challenge solution") + } + UserAgentError::InvalidStateForUnsealEncryptedKey => { + Status::failed_precondition("Invalid state for unseal encrypted key") + } + UserAgentError::InvalidClientPubkeyLength => { + Status::invalid_argument("client_pubkey must be 32 bytes") + } + UserAgentError::InvalidAuthPubkeyLength => { + Status::invalid_argument("Expected pubkey to have specific length") + } + UserAgentError::InvalidAuthPubkeyEncoding => { + Status::invalid_argument("Failed to convert pubkey to VerifyingKey") + } + UserAgentError::InvalidSignatureLength => { + Status::invalid_argument("Invalid signature length") + } + UserAgentError::InvalidBootstrapToken => { + Status::invalid_argument("Invalid bootstrap token") + } + UserAgentError::PublicKeyNotRegistered => { + Status::unauthenticated("Public key not registered") + } + UserAgentError::InvalidChallengeSolution => { + Status::unauthenticated("Invalid challenge solution") + } + UserAgentError::StateTransitionFailed => Status::internal("State machine error"), + UserAgentError::BootstrapperActorUnreachable => { + Status::internal("Bootstrap token consumption failed") + } + UserAgentError::KeyHolderActorUnreachable => Status::internal("Vault is not available"), + UserAgentError::DatabasePoolUnavailable => Status::internal("Database pool error"), + UserAgentError::DatabaseOperationFailed => Status::internal("Database error"), + } +} + pub struct Server { context: ServerContext, } @@ -38,28 +103,29 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server { async fn client( &self, - request: Request>, + _request: Request>, ) -> Result, Status> { - let req_stream = request.into_inner(); - let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - tokio::spawn(handle_client( - self.context.clone(), - BiStream { - request_stream: req_stream, - response_sender: tx, - }, - )); - - Ok(Response::new(ReceiverStream::new(rx))) + todo!() } + #[tracing::instrument(level = "debug", skip(self))] async fn user_agent( &self, request: Request>, ) -> Result, Status> { let req_stream = request.into_inner(); let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); - tokio::spawn(handle_user_agent(self.context.clone(), req_stream, tx)); + + let transport = GrpcAdapter::new( + tx, + req_stream, + IdentityConverter::::new(), + UserAgentGrpcConverter, + ); + UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), transport)); + + info!(event = "connection established", "grpc.user_agent"); + Ok(Response::new(ReceiverStream::new(rx))) } } diff --git a/server/crates/arbiter-server/tests/user_agent/auth.rs b/server/crates/arbiter-server/tests/user_agent/auth.rs index c79d616..d40efb4 100644 --- a/server/crates/arbiter-server/tests/user_agent/auth.rs +++ b/server/crates/arbiter-server/tests/user_agent/auth.rs @@ -1,20 +1,29 @@ use arbiter_proto::proto::{ UserAgentResponse, - auth::{self, AuthChallengeRequest, AuthOk}, + UserAgentRequest, + auth::{self, AuthChallengeRequest, AuthOk, ClientMessage, client_message::Payload as ClientAuthPayload}, + user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }; use arbiter_server::{ actors::{ GlobalActors, bootstrap::GetToken, - user_agent::{HandleAuthChallengeRequest, HandleAuthChallengeSolution, UserAgentActor}, + user_agent::{UserAgentActor, UserAgentError}, }, db::{self, schema}, }; use diesel::{ExpressionMethods as _, QueryDsl, insert_into}; use diesel_async::RunQueryDsl; use ed25519_dalek::Signer as _; -use kameo::actor::Spawn; + +fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(payload), + })), + } +} #[tokio::test] #[test_log::test] @@ -23,22 +32,20 @@ pub async fn test_bootstrap_token_auth() { let actors = GlobalActors::spawn(db.clone()).await.unwrap(); let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: Some(token), }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); assert_eq!( result, @@ -68,35 +75,23 @@ pub async fn test_bootstrap_invalid_token_auth() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: Some("invalid_token".to_string()), }, - }) + ))) .await; match result { - Err(kameo::error::SendError::HandlerError(status)) => { - assert_eq!(status.code(), tonic::Code::InvalidArgument); - insta::assert_debug_snapshot!(status, @r#" - Status { - code: InvalidArgument, - message: "Invalid bootstrap token", - source: None, - } - "#); - } - Err(other) => { - panic!("Expected SendError::HandlerError, got {other:?}"); + Err(err) => { + assert_eq!(err, UserAgentError::InvalidBootstrapToken); } Ok(_) => { panic!("Expected error due to invalid bootstrap token, but got success"); @@ -110,9 +105,7 @@ pub async fn test_challenge_auth() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let new_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); let pubkey_bytes = new_key.verifying_key().to_bytes().to_vec(); @@ -126,15 +119,15 @@ pub async fn test_challenge_auth() { .unwrap(); } - let result = user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: pubkey_bytes, bootstrap_token: None, }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); let UserAgentResponse { payload: @@ -151,14 +144,14 @@ pub async fn test_challenge_auth() { let signature = new_key.sign(&formatted_challenge); let serialized_signature = signature.to_bytes().to_vec(); - let result = user_agent_ref - .ask(HandleAuthChallengeSolution { - solution: auth::AuthChallengeSolution { + let result = user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeSolution( + auth::AuthChallengeSolution { signature: serialized_signature, }, - }) + ))) .await - .expect("Shouldn't fail to send message"); + .expect("Shouldn't fail to process message"); assert_eq!( result, diff --git a/server/crates/arbiter-server/tests/user_agent/unseal.rs b/server/crates/arbiter-server/tests/user_agent/unseal.rs index 9a7c85f..2cb46f6 100644 --- a/server/crates/arbiter-server/tests/user_agent/unseal.rs +++ b/server/crates/arbiter-server/tests/user_agent/unseal.rs @@ -1,27 +1,52 @@ use arbiter_proto::proto::{ - UnsealEncryptedKey, UnsealResult, UnsealStart, auth::AuthChallengeRequest, + UnsealEncryptedKey, UnsealResult, UnsealStart, UserAgentRequest, UserAgentResponse, + auth::{AuthChallengeRequest, ClientMessage, client_message::Payload as ClientAuthPayload}, + user_agent_request::Payload as UserAgentRequestPayload, user_agent_response::Payload as UserAgentResponsePayload, }; +use arbiter_proto::transport::DummyTransport; use arbiter_server::{ actors::{ GlobalActors, bootstrap::GetToken, keyholder::{Bootstrap, Seal}, - user_agent::{ - HandleAuthChallengeRequest, HandleUnsealEncryptedKey, HandleUnsealRequest, - UserAgentActor, - }, + user_agent::{UserAgentActor, UserAgentError}, }, db, }; use chacha20poly1305::{AeadInPlace, XChaCha20Poly1305, XNonce, aead::KeyInit}; -use kameo::actor::{ActorRef, Spawn}; use memsafe::MemSafe; use x25519_dalek::{EphemeralSecret, PublicKey}; +type TestUserAgent = + UserAgentActor>>; + +fn auth_request(payload: ClientAuthPayload) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::AuthMessage(ClientMessage { + payload: Some(payload), + })), + } +} + +fn unseal_start_request(req: UnsealStart) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::UnsealStart(req)), + } +} + +fn unseal_key_request(req: UnsealEncryptedKey) -> UserAgentRequest { + UserAgentRequest { + payload: Some(UserAgentRequestPayload::UnsealEncryptedKey(req)), + } +} + async fn setup_authenticated_user_agent( seal_key: &[u8], -) -> (arbiter_server::db::DatabasePool, ActorRef) { +) -> ( + arbiter_server::db::DatabasePool, + TestUserAgent, +) { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); @@ -34,38 +59,34 @@ async fn setup_authenticated_user_agent( .unwrap(); actors.key_holder.ask(Seal).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors.clone(), tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors.clone()); let token = actors.bootstrapper.ask(GetToken).await.unwrap().unwrap(); let auth_key = ed25519_dalek::SigningKey::generate(&mut rand::rng()); - user_agent_ref - .ask(HandleAuthChallengeRequest { - req: AuthChallengeRequest { + user_agent + .process_transport_inbound(auth_request(ClientAuthPayload::AuthChallengeRequest( + AuthChallengeRequest { pubkey: auth_key.verifying_key().to_bytes().to_vec(), bootstrap_token: Some(token), }, - }) + ))) .await .unwrap(); - (db, user_agent_ref) + (db, user_agent) } async fn client_dh_encrypt( - user_agent_ref: &ActorRef, + user_agent: &mut TestUserAgent, key_to_send: &[u8], ) -> UnsealEncryptedKey { let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - let response = user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + let response = user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await .unwrap(); @@ -95,12 +116,12 @@ async fn client_dh_encrypt( #[test_log::test] pub async fn test_unseal_success() { let seal_key = b"test-seal-key"; - let (_db, user_agent_ref) = setup_authenticated_user_agent(seal_key).await; + let (_db, mut user_agent) = setup_authenticated_user_agent(seal_key).await; - let encrypted_key = client_dh_encrypt(&user_agent_ref, seal_key).await; + let encrypted_key = client_dh_encrypt(&mut user_agent, seal_key).await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -113,12 +134,12 @@ pub async fn test_unseal_success() { #[tokio::test] #[test_log::test] pub async fn test_unseal_wrong_seal_key() { - let (_db, user_agent_ref) = setup_authenticated_user_agent(b"correct-key").await; + let (_db, mut user_agent) = setup_authenticated_user_agent(b"correct-key").await; - let encrypted_key = client_dh_encrypt(&user_agent_ref, b"wrong-key").await; + let encrypted_key = client_dh_encrypt(&mut user_agent, b"wrong-key").await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -131,28 +152,24 @@ pub async fn test_unseal_wrong_seal_key() { #[tokio::test] #[test_log::test] pub async fn test_unseal_corrupted_ciphertext() { - let (_db, user_agent_ref) = setup_authenticated_user_agent(b"test-key").await; + let (_db, mut user_agent) = setup_authenticated_user_agent(b"test-key").await; let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await .unwrap(); - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { - req: UnsealEncryptedKey { - nonce: vec![0u8; 24], - ciphertext: vec![0u8; 32], - associated_data: vec![], - }, - }) + let response = user_agent + .process_transport_inbound(unseal_key_request(UnsealEncryptedKey { + nonce: vec![0u8; 24], + ciphertext: vec![0u8; 32], + associated_data: vec![], + })) .await .unwrap(); @@ -168,24 +185,20 @@ pub async fn test_unseal_start_without_auth_fails() { let db = db::create_test_pool().await; let actors = GlobalActors::spawn(db.clone()).await.unwrap(); - let user_agent = - UserAgentActor::new_manual(db.clone(), actors, tokio::sync::mpsc::channel(1).0); - let user_agent_ref = UserAgentActor::spawn(user_agent); + let mut user_agent = UserAgentActor::new_manual(db.clone(), actors); let client_secret = EphemeralSecret::random(); let client_public = PublicKey::from(&client_secret); - let result = user_agent_ref - .ask(HandleUnsealRequest { - req: UnsealStart { - client_pubkey: client_public.as_bytes().to_vec(), - }, - }) + let result = user_agent + .process_transport_inbound(unseal_start_request(UnsealStart { + client_pubkey: client_public.as_bytes().to_vec(), + })) .await; match result { - Err(kameo::error::SendError::HandlerError(status)) => { - assert_eq!(status.code(), tonic::Code::Internal); + Err(err) => { + assert_eq!(err, UserAgentError::StateTransitionFailed); } other => panic!("Expected state machine error, got {other:?}"), } @@ -195,13 +208,13 @@ pub async fn test_unseal_start_without_auth_fails() { #[test_log::test] pub async fn test_unseal_retry_after_invalid_key() { let seal_key = b"real-seal-key"; - let (_db, user_agent_ref) = setup_authenticated_user_agent(seal_key).await; + let (_db, mut user_agent) = setup_authenticated_user_agent(seal_key).await; { - let encrypted_key = client_dh_encrypt(&user_agent_ref, b"wrong-key").await; + let encrypted_key = client_dh_encrypt(&mut user_agent, b"wrong-key").await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap(); @@ -212,10 +225,10 @@ pub async fn test_unseal_retry_after_invalid_key() { } { - let encrypted_key = client_dh_encrypt(&user_agent_ref, seal_key).await; + let encrypted_key = client_dh_encrypt(&mut user_agent, seal_key).await; - let response = user_agent_ref - .ask(HandleUnsealEncryptedKey { req: encrypted_key }) + let response = user_agent + .process_transport_inbound(unseal_key_request(encrypted_key)) .await .unwrap();