diff --git a/server/crates/arbiter-server/src/grpc/operator.rs b/server/crates/arbiter-server/src/grpc/operator.rs index cdd4e80..aef89a6 100644 --- a/server/crates/arbiter-server/src/grpc/operator.rs +++ b/server/crates/arbiter-server/src/grpc/operator.rs @@ -19,6 +19,7 @@ use tracing::{error, info, warn}; mod auth; mod evm; +mod governance; mod inbound; mod outbound; mod sdk_client; @@ -115,6 +116,7 @@ async fn dispatch_inner( warn!("Unsupported post-auth operator auth request"); Err(Status::invalid_argument("Unsupported operator request")) } + OperatorRequestPayload::Governance(req) => governance::dispatch(actor, req).await, } } diff --git a/server/crates/arbiter-server/src/grpc/operator/governance.rs b/server/crates/arbiter-server/src/grpc/operator/governance.rs new file mode 100644 index 0000000..f993c05 --- /dev/null +++ b/server/crates/arbiter-server/src/grpc/operator/governance.rs @@ -0,0 +1,126 @@ +use crate::{ + actors::proposal_manager::{Error as ProposalError, ProposalKind, VoteOutcome}, + peers::operator::{ + OperatorSession, + session::handlers::{HandleCastVote, HandleCreateProposal, HandleQueryPending}, + }, +}; +use arbiter_proto::proto::operator::{ + governance::{ + self as proto_gov, CreateProposalRequest, QueryPendingRequest, QueryPendingResponse, + VoteOutcome as ProtoVoteOutcome, create_proposal_request::Kind as ProtoKind, + request::Payload as GovRequestPayload, response::Payload as GovResponsePayload, + }, + operator_response::Payload as OperatorResponsePayload, +}; +use kameo::actor::ActorRef; +use tonic::Status; +use tracing::warn; + +const fn wrap(payload: GovResponsePayload) -> OperatorResponsePayload { + OperatorResponsePayload::Governance(proto_gov::Response { + payload: Some(payload), + }) +} + +pub(super) async fn dispatch( + actor: &ActorRef, + req: proto_gov::Request, +) -> Result, Status> { + let Some(payload) = req.payload else { + return Err(Status::invalid_argument( + "Missing governance request payload", + )); + }; + + match payload { + GovRequestPayload::Create(req) => handle_create(actor, req).await, + GovRequestPayload::Vote(req) => handle_vote(actor, req).await, + GovRequestPayload::Query(QueryPendingRequest {}) => handle_query(actor).await, + } +} + +async fn handle_create( + actor: &ActorRef, + req: CreateProposalRequest, +) -> Result, Status> { + let kind = match req.kind { + Some(ProtoKind::ApproveSdkClient(p)) => ProposalKind::ApproveSdkClient { + client_id: p.client_id, + }, + None => return Err(Status::invalid_argument("Missing proposal kind")), + }; + let ttl_secs = req.ttl_secs.map(i64::from); + + let proposal_id = actor + .ask(HandleCreateProposal { kind, ttl_secs }) + .await + .map_err(|e| { + warn!(?e, "create_proposal failed"); + Status::internal("Failed to create proposal") + })?; + + Ok(Some(wrap(GovResponsePayload::Created( + proto_gov::CreateProposalResponse { proposal_id }, + )))) +} + +async fn handle_vote( + actor: &ActorRef, + req: proto_gov::CastVoteRequest, +) -> Result, Status> { + let result = actor + .ask(HandleCastVote { + proposal_id: req.proposal_id, + approve: req.approve, + signature: req.signature, + }) + .await; + + let outcome = match result { + Ok(VoteOutcome::Pending) => ProtoVoteOutcome::Pending, + Ok(VoteOutcome::QuorumApproved) => ProtoVoteOutcome::Approved, + Ok(VoteOutcome::QuorumRejected) => ProtoVoteOutcome::Rejected, + Err(kameo::error::SendError::HandlerError(ProposalError::AlreadyVoted)) => { + return Err(Status::invalid_argument("Already voted on this proposal")); + } + Err(kameo::error::SendError::HandlerError(ProposalError::InvalidSignature)) => { + return Err(Status::invalid_argument("Invalid vote signature")); + } + Err(kameo::error::SendError::HandlerError(ProposalError::ProposalNotFound)) => { + return Err(Status::not_found("Proposal not found")); + } + Err(e) => { + warn!(?e, "cast_vote failed"); + return Err(Status::internal("Failed to cast vote")); + } + }; + + Ok(Some(wrap(GovResponsePayload::Voted( + proto_gov::VoteResponse { + outcome: outcome.into(), + }, + )))) +} + +async fn handle_query( + actor: &ActorRef, +) -> Result, Status> { + let summaries = actor.ask(HandleQueryPending {}).await.unwrap_or_default(); + + let proposals = summaries + .into_iter() + .map(|s| proto_gov::ProposalSummary { + id: s.id, + kind: s.kind, + initiator_id: s.initiator_id, + expires_at: s.expires_at.0.timestamp(), + approve_count: s.approve_count, + reject_count: s.reject_count, + }) + .collect(); + + Ok(Some(wrap(GovResponsePayload::Pending( + QueryPendingResponse { proposals }, + )))) +} diff --git a/server/crates/arbiter-server/src/peers/operator/mod.rs b/server/crates/arbiter-server/src/peers/operator/mod.rs index 0869d51..fe532bc 100644 --- a/server/crates/arbiter-server/src/peers/operator/mod.rs +++ b/server/crates/arbiter-server/src/peers/operator/mod.rs @@ -180,6 +180,7 @@ where Ok(OperatorSession::spawn(OperatorSession::new( props.clone(), + creds.clone(), oob_sender, ))) } diff --git a/server/crates/arbiter-server/src/peers/operator/session/handlers.rs b/server/crates/arbiter-server/src/peers/operator/session/handlers.rs index f2b9b3e..426561d 100644 --- a/server/crates/arbiter-server/src/peers/operator/session/handlers.rs +++ b/server/crates/arbiter-server/src/peers/operator/session/handlers.rs @@ -279,3 +279,59 @@ impl OperatorSession { Ok(clients) } } + +#[messages] +impl OperatorSession { + #[message] + pub(crate) async fn handle_create_proposal( + &mut self, + kind: crate::actors::proposal_manager::ProposalKind, + ttl_secs: Option, + ) -> Result { + use crate::actors::proposal_manager::CreateProposal; + let initiator_id = self.credentials.id; + self.props + .actors + .proposal_manager + .ask(CreateProposal { kind, initiator_id, ttl_secs }) + .await + .map_err(|e| { + error!(?e, "create_proposal failed"); + Error::internal("Failed to create proposal") + }) + } + + #[message] + pub(crate) async fn handle_cast_vote( + &mut self, + proposal_id: i32, + approve: bool, + signature: Vec, + ) -> Result { + use crate::actors::proposal_manager::CastVote; + let operator_id = self.credentials.id; + self.props + .actors + .proposal_manager + .ask(CastVote { proposal_id, operator_id, approve, signature }) + .await + .map_err(|err| match err { + SendError::HandlerError(e) => e, + _ => crate::actors::proposal_manager::Error::ExecutionFailed("actor unavailable".to_owned()), + }) + } + + #[message] + pub(crate) async fn handle_query_pending( + &mut self, + ) -> Vec { + use crate::actors::proposal_manager::QueryPending; + let operator_id = self.credentials.id; + self.props + .actors + .proposal_manager + .ask(QueryPending { operator_id }) + .await + .unwrap_or_default() + } +} diff --git a/server/crates/arbiter-server/src/peers/operator/session/mod.rs b/server/crates/arbiter-server/src/peers/operator/session/mod.rs index 0fe2c84..083f106 100644 --- a/server/crates/arbiter-server/src/peers/operator/session/mod.rs +++ b/server/crates/arbiter-server/src/peers/operator/session/mod.rs @@ -1,4 +1,4 @@ -use super::{OutOfBand, OperatorConnection}; +use super::{Credentials, OutOfBand, OperatorConnection}; use crate::{ actors::{ flow_coordinator::client_connect_approval::ClientApprovalController, @@ -51,6 +51,7 @@ pub struct PendingClientApproval { pub struct OperatorSession { props: OperatorConnection, + credentials: Credentials, sender: Box>, pending_client_approvals: HashMap, PendingClientApproval>, @@ -59,9 +60,10 @@ pub struct OperatorSession { pub mod handlers; impl OperatorSession { - pub(crate) fn new(props: OperatorConnection, sender: Box>) -> Self { + pub(crate) fn new(props: OperatorConnection, credentials: Credentials, sender: Box>) -> Self { Self { props, + credentials, sender, pending_client_approvals: HashMap::default(), }