feat: actors experiment

This commit is contained in:
hdbg
2026-02-13 12:11:56 +01:00
parent bbbb4feaa0
commit 208bbbd540
13 changed files with 245 additions and 87 deletions

View File

@@ -1,23 +1,30 @@
#![allow(unused)]
use tracing::error;
use arbiter_proto::{
proto::{ClientRequest, ClientResponse, UserAgentRequest, UserAgentResponse},
transport::BiStream,
};
use async_trait::async_trait;
use futures::StreamExt;
use kameo::actor::Spawn;
use tokio_stream::wrappers::ReceiverStream;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use crate::{
handlers::{client::handle_client, user_agent::handle_user_agent},
actors::{
client::handle_client,
user_agent::{self, UserAgentActor},
},
context::ServerContext,
};
mod db;
pub mod handlers;
pub mod actors;
mod context;
mod db;
const DEFAULT_CHANNEL_SIZE: usize = 1000;
@@ -51,16 +58,20 @@ impl arbiter_proto::proto::arbiter_service_server::ArbiterService for Server {
&self,
request: Request<tonic::Streaming<UserAgentRequest>>,
) -> Result<Response<Self::UserAgentStream>, Status> {
let req_stream = request.into_inner();
let mut req_stream = request.into_inner();
let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
tokio::spawn(handle_user_agent(
self.context.clone(),
BiStream {
request_stream: req_stream,
response_sender: tx,
},
));
let actor = UserAgentActor::spawn(UserAgentActor::new(self.context.clone(), tx));
tokio::task::spawn(async move {
while let Some(Ok(req)) = req_stream.next().await && actor.is_alive() {
if actor.tell(user_agent::Grpc {msg: req}).await.is_err() {
error!("Failed to send message to UserAgentActor");
break;
}
}
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}