feat(server::grpc): wire governance RPCs through operator session

This commit is contained in:
CleverWild
2026-06-13 15:06:51 +02:00
parent 8517b981f2
commit f981ddeb79
5 changed files with 189 additions and 2 deletions

View File

@@ -19,6 +19,7 @@ use tracing::{error, info, warn};
mod auth; mod auth;
mod evm; mod evm;
mod governance;
mod inbound; mod inbound;
mod outbound; mod outbound;
mod sdk_client; mod sdk_client;
@@ -115,6 +116,7 @@ async fn dispatch_inner(
warn!("Unsupported post-auth operator auth request"); warn!("Unsupported post-auth operator auth request");
Err(Status::invalid_argument("Unsupported operator request")) Err(Status::invalid_argument("Unsupported operator request"))
} }
OperatorRequestPayload::Governance(req) => governance::dispatch(actor, req).await,
} }
} }

View File

@@ -0,0 +1,126 @@
use crate::{
actors::proposal_manager::{Error as ProposalError, ProposalKind, VoteOutcome},
peers::operator::{
OperatorSession,
session::handlers::{HandleCastVote, HandleCreateProposal, HandleQueryPending},
},
};
use arbiter_proto::proto::operator::{
governance::{
self as proto_gov, CreateProposalRequest, QueryPendingRequest, QueryPendingResponse,
VoteOutcome as ProtoVoteOutcome, create_proposal_request::Kind as ProtoKind,
request::Payload as GovRequestPayload, response::Payload as GovResponsePayload,
},
operator_response::Payload as OperatorResponsePayload,
};
use kameo::actor::ActorRef;
use tonic::Status;
use tracing::warn;
const fn wrap(payload: GovResponsePayload) -> OperatorResponsePayload {
OperatorResponsePayload::Governance(proto_gov::Response {
payload: Some(payload),
})
}
pub(super) async fn dispatch(
actor: &ActorRef<OperatorSession>,
req: proto_gov::Request,
) -> Result<Option<OperatorResponsePayload>, Status> {
let Some(payload) = req.payload else {
return Err(Status::invalid_argument(
"Missing governance request payload",
));
};
match payload {
GovRequestPayload::Create(req) => handle_create(actor, req).await,
GovRequestPayload::Vote(req) => handle_vote(actor, req).await,
GovRequestPayload::Query(QueryPendingRequest {}) => handle_query(actor).await,
}
}
async fn handle_create(
actor: &ActorRef<OperatorSession>,
req: CreateProposalRequest,
) -> Result<Option<OperatorResponsePayload>, Status> {
let kind = match req.kind {
Some(ProtoKind::ApproveSdkClient(p)) => ProposalKind::ApproveSdkClient {
client_id: p.client_id,
},
None => return Err(Status::invalid_argument("Missing proposal kind")),
};
let ttl_secs = req.ttl_secs.map(i64::from);
let proposal_id = actor
.ask(HandleCreateProposal { kind, ttl_secs })
.await
.map_err(|e| {
warn!(?e, "create_proposal failed");
Status::internal("Failed to create proposal")
})?;
Ok(Some(wrap(GovResponsePayload::Created(
proto_gov::CreateProposalResponse { proposal_id },
))))
}
async fn handle_vote(
actor: &ActorRef<OperatorSession>,
req: proto_gov::CastVoteRequest,
) -> Result<Option<OperatorResponsePayload>, Status> {
let result = actor
.ask(HandleCastVote {
proposal_id: req.proposal_id,
approve: req.approve,
signature: req.signature,
})
.await;
let outcome = match result {
Ok(VoteOutcome::Pending) => ProtoVoteOutcome::Pending,
Ok(VoteOutcome::QuorumApproved) => ProtoVoteOutcome::Approved,
Ok(VoteOutcome::QuorumRejected) => ProtoVoteOutcome::Rejected,
Err(kameo::error::SendError::HandlerError(ProposalError::AlreadyVoted)) => {
return Err(Status::invalid_argument("Already voted on this proposal"));
}
Err(kameo::error::SendError::HandlerError(ProposalError::InvalidSignature)) => {
return Err(Status::invalid_argument("Invalid vote signature"));
}
Err(kameo::error::SendError::HandlerError(ProposalError::ProposalNotFound)) => {
return Err(Status::not_found("Proposal not found"));
}
Err(e) => {
warn!(?e, "cast_vote failed");
return Err(Status::internal("Failed to cast vote"));
}
};
Ok(Some(wrap(GovResponsePayload::Voted(
proto_gov::VoteResponse {
outcome: outcome.into(),
},
))))
}
async fn handle_query(
actor: &ActorRef<OperatorSession>,
) -> Result<Option<OperatorResponsePayload>, Status> {
let summaries = actor.ask(HandleQueryPending {}).await.unwrap_or_default();
let proposals = summaries
.into_iter()
.map(|s| proto_gov::ProposalSummary {
id: s.id,
kind: s.kind,
initiator_id: s.initiator_id,
expires_at: s.expires_at.0.timestamp(),
approve_count: s.approve_count,
reject_count: s.reject_count,
})
.collect();
Ok(Some(wrap(GovResponsePayload::Pending(
QueryPendingResponse { proposals },
))))
}

View File

@@ -180,6 +180,7 @@ where
Ok(OperatorSession::spawn(OperatorSession::new( Ok(OperatorSession::spawn(OperatorSession::new(
props.clone(), props.clone(),
creds.clone(),
oob_sender, oob_sender,
))) )))
} }

View File

@@ -279,3 +279,59 @@ impl OperatorSession {
Ok(clients) Ok(clients)
} }
} }
#[messages]
impl OperatorSession {
#[message]
pub(crate) async fn handle_create_proposal(
&mut self,
kind: crate::actors::proposal_manager::ProposalKind,
ttl_secs: Option<i64>,
) -> Result<i32, Error> {
use crate::actors::proposal_manager::CreateProposal;
let initiator_id = self.credentials.id;
self.props
.actors
.proposal_manager
.ask(CreateProposal { kind, initiator_id, ttl_secs })
.await
.map_err(|e| {
error!(?e, "create_proposal failed");
Error::internal("Failed to create proposal")
})
}
#[message]
pub(crate) async fn handle_cast_vote(
&mut self,
proposal_id: i32,
approve: bool,
signature: Vec<u8>,
) -> Result<crate::actors::proposal_manager::VoteOutcome, crate::actors::proposal_manager::Error> {
use crate::actors::proposal_manager::CastVote;
let operator_id = self.credentials.id;
self.props
.actors
.proposal_manager
.ask(CastVote { proposal_id, operator_id, approve, signature })
.await
.map_err(|err| match err {
SendError::HandlerError(e) => e,
_ => crate::actors::proposal_manager::Error::ExecutionFailed("actor unavailable".to_owned()),
})
}
#[message]
pub(crate) async fn handle_query_pending(
&mut self,
) -> Vec<crate::actors::proposal_manager::ProposalSummary> {
use crate::actors::proposal_manager::QueryPending;
let operator_id = self.credentials.id;
self.props
.actors
.proposal_manager
.ask(QueryPending { operator_id })
.await
.unwrap_or_default()
}
}

View File

@@ -1,4 +1,4 @@
use super::{OutOfBand, OperatorConnection}; use super::{Credentials, OutOfBand, OperatorConnection};
use crate::{ use crate::{
actors::{ actors::{
flow_coordinator::client_connect_approval::ClientApprovalController, flow_coordinator::client_connect_approval::ClientApprovalController,
@@ -51,6 +51,7 @@ pub struct PendingClientApproval {
pub struct OperatorSession { pub struct OperatorSession {
props: OperatorConnection, props: OperatorConnection,
credentials: Credentials,
sender: Box<dyn Sender<OutOfBand>>, sender: Box<dyn Sender<OutOfBand>>,
pending_client_approvals: HashMap<Vec<u8>, PendingClientApproval>, pending_client_approvals: HashMap<Vec<u8>, PendingClientApproval>,
@@ -59,9 +60,10 @@ pub struct OperatorSession {
pub mod handlers; pub mod handlers;
impl OperatorSession { impl OperatorSession {
pub(crate) fn new(props: OperatorConnection, sender: Box<dyn Sender<OutOfBand>>) -> Self { pub(crate) fn new(props: OperatorConnection, credentials: Credentials, sender: Box<dyn Sender<OutOfBand>>) -> Self {
Self { Self {
props, props,
credentials,
sender, sender,
pending_client_approvals: HashMap::default(), pending_client_approvals: HashMap::default(),
} }