refactor: consolidate auth messages into client and user_agent packages
Some checks failed
ci/woodpecker/pr/server-lint Pipeline failed
ci/woodpecker/pr/server-audit Pipeline was successful
ci/woodpecker/pr/server-vet Pipeline failed
ci/woodpecker/pr/server-test Pipeline was successful
ci/woodpecker/push/server-lint Pipeline failed
ci/woodpecker/push/server-audit Pipeline was successful
ci/woodpecker/push/server-vet Pipeline failed
ci/woodpecker/push/server-test Pipeline was successful

This commit was merged in pull request #23.
This commit is contained in:
hdbg
2026-03-01 11:44:34 +01:00
parent 3cc63474a8
commit 4169b2ba42
19 changed files with 686 additions and 264 deletions

View File

@@ -1,6 +1,9 @@
#![forbid(unsafe_code)]
use arbiter_proto::{
proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse},
proto::{
client::{ClientRequest, ClientResponse},
user_agent::{UserAgentRequest, UserAgentResponse},
},
transport::{IdentityRecvConverter, SendConverter, grpc},
};
use async_trait::async_trait;
@@ -12,7 +15,10 @@ use tonic::{Request, Response, Status};
use tracing::info;
use crate::{
actors::user_agent::{UserAgentActor, UserAgentError},
actors::{
client::{ClientActor, ClientError},
user_agent::{UserAgentActor, UserAgentError},
},
context::ServerContext,
};
@@ -41,6 +47,56 @@ impl SendConverter for UserAgentGrpcSender {
}
}
/// Converts Client domain outbounds into the tonic stream item emitted by the
/// server.
///
/// The conversion is defined at the server boundary so the actor module remains
/// focused on domain semantics and does not depend on tonic status encoding.
struct ClientGrpcSender;
impl SendConverter for ClientGrpcSender {
type Input = Result<ClientResponse, ClientError>;
type Output = Result<ClientResponse, Status>;
fn convert(&self, item: Self::Input) -> Self::Output {
match item {
Ok(message) => Ok(message),
Err(err) => Err(client_error_status(err)),
}
}
}
/// Maps Client domain errors to public gRPC transport errors for the `client`
/// streaming endpoint.
fn client_error_status(value: ClientError) -> Status {
match value {
ClientError::MissingRequestPayload | ClientError::UnexpectedRequestPayload => {
Status::invalid_argument("Expected message with payload")
}
ClientError::InvalidStateForChallengeSolution => {
Status::invalid_argument("Invalid state for challenge solution")
}
ClientError::InvalidAuthPubkeyLength => {
Status::invalid_argument("Expected pubkey to have specific length")
}
ClientError::InvalidAuthPubkeyEncoding => {
Status::invalid_argument("Failed to convert pubkey to VerifyingKey")
}
ClientError::InvalidSignatureLength => {
Status::invalid_argument("Invalid signature length")
}
ClientError::PublicKeyNotRegistered => {
Status::unauthenticated("Public key not registered")
}
ClientError::InvalidChallengeSolution => {
Status::unauthenticated("Invalid challenge solution")
}
ClientError::StateTransitionFailed => Status::internal("State machine error"),
ClientError::DatabasePoolUnavailable => Status::internal("Database pool error"),
ClientError::DatabaseOperationFailed => Status::internal("Database error"),
}
}
/// Maps User Agent domain errors to public gRPC transport errors for the
/// `user_agent` streaming endpoint.
fn user_agent_error_status(value: UserAgentError) -> Status {
@@ -100,11 +156,25 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server {
type UserAgentStream = ReceiverStream<Result<UserAgentResponse, Status>>;
type ClientStream = ReceiverStream<Result<ClientResponse, Status>>;
#[tracing::instrument(level = "debug", skip(self))]
async fn client(
&self,
_request: Request<tonic::Streaming<ClientRequest>>,
request: Request<tonic::Streaming<ClientRequest>>,
) -> Result<Response<Self::ClientStream>, Status> {
todo!()
let req_stream = request.into_inner();
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let transport = grpc::GrpcAdapter::new(
tx,
req_stream,
IdentityRecvConverter::<ClientRequest>::new(),
ClientGrpcSender,
);
ClientActor::spawn(ClientActor::new(self.context.clone(), transport));
info!(event = "connection established", "grpc.client");
Ok(Response::new(ReceiverStream::new(rx)))
}
#[tracing::instrument(level = "debug", skip(self))]