refactor(server): extract shared runtime and implement service install/run in arbiter-server.exe
This commit is contained in:
@@ -53,7 +53,11 @@ spki.workspace = true
|
||||
alloy.workspace = true
|
||||
prost-types.workspace = true
|
||||
arbiter-tokens-registry.path = "../arbiter-tokens-registry"
|
||||
clap = { version = "4.6", features = ["derive"] }
|
||||
|
||||
[dev-dependencies]
|
||||
insta = "1.46.3"
|
||||
test-log = { version = "0.2", default-features = false, features = ["trace"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
windows-service = "0.8"
|
||||
|
||||
68
server/crates/arbiter-server/src/cli.rs
Normal file
68
server/crates/arbiter-server/src/cli.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use std::{net::SocketAddr, path::PathBuf};
|
||||
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
|
||||
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(name = "arbiter-server")]
|
||||
#[command(about = "Arbiter gRPC server")]
|
||||
pub struct Cli {
|
||||
#[command(subcommand)]
|
||||
pub command: Option<Command>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum Command {
|
||||
/// Run server in foreground mode.
|
||||
Run(RunArgs),
|
||||
/// Manage service lifecycle.
|
||||
Service {
|
||||
#[command(subcommand)]
|
||||
command: ServiceCommand,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct RunArgs {
|
||||
#[arg(long, default_value = DEFAULT_LISTEN_ADDR)]
|
||||
pub listen_addr: SocketAddr,
|
||||
#[arg(long)]
|
||||
pub data_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl Default for RunArgs {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
listen_addr: DEFAULT_LISTEN_ADDR
|
||||
.parse()
|
||||
.expect("listen address literal must be valid"),
|
||||
data_dir: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum ServiceCommand {
|
||||
/// Install Windows service in Service Control Manager.
|
||||
Install(ServiceInstallArgs),
|
||||
/// Internal service entrypoint. SCM only.
|
||||
#[command(hide = true)]
|
||||
Run(ServiceRunArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct ServiceInstallArgs {
|
||||
#[arg(long)]
|
||||
pub start: bool,
|
||||
#[arg(long)]
|
||||
pub data_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct ServiceRunArgs {
|
||||
#[arg(long, default_value = DEFAULT_LISTEN_ADDR)]
|
||||
pub listen_addr: SocketAddr,
|
||||
#[arg(long)]
|
||||
pub data_dir: Option<PathBuf>,
|
||||
}
|
||||
@@ -6,6 +6,7 @@ pub mod context;
|
||||
pub mod db;
|
||||
pub mod evm;
|
||||
pub mod grpc;
|
||||
pub mod runtime;
|
||||
pub mod safe_cell;
|
||||
pub mod utils;
|
||||
|
||||
|
||||
@@ -1,56 +1,42 @@
|
||||
use std::net::SocketAddr;
|
||||
mod cli;
|
||||
mod service;
|
||||
|
||||
use arbiter_proto::{proto::arbiter_service_server::ArbiterServiceServer, url::ArbiterUrl};
|
||||
use arbiter_server::{Server, actors::bootstrap::GetToken, context::ServerContext, db};
|
||||
use miette::miette;
|
||||
use clap::Parser;
|
||||
use cli::{Cli, Command, RunArgs, ServiceCommand};
|
||||
use rustls::crypto::aws_lc_rs;
|
||||
use tonic::transport::{Identity, ServerTlsConfig};
|
||||
use tracing::info;
|
||||
|
||||
const PORT: u16 = 50051;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> miette::Result<()> {
|
||||
aws_lc_rs::default_provider().install_default().unwrap();
|
||||
init_logging();
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
let cli = Cli::parse();
|
||||
|
||||
match cli.command {
|
||||
None => run_foreground(RunArgs::default()).await,
|
||||
Some(Command::Run(args)) => run_foreground(args).await,
|
||||
Some(Command::Service { command }) => match command {
|
||||
ServiceCommand::Install(args) => service::install_service(args),
|
||||
ServiceCommand::Run(args) => service::run_service_dispatcher(args),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_foreground(args: RunArgs) -> miette::Result<()> {
|
||||
info!(addr = %args.listen_addr, "Starting arbiter server");
|
||||
arbiter_server::runtime::run_server_until_shutdown(
|
||||
arbiter_server::runtime::RunConfig::new(args.listen_addr, args.data_dir),
|
||||
std::future::pending::<()>(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn init_logging() {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.init();
|
||||
|
||||
info!("Starting arbiter server");
|
||||
|
||||
let db = db::create_pool(None).await?;
|
||||
info!("Database ready");
|
||||
|
||||
let context = ServerContext::new(db).await?;
|
||||
|
||||
let addr: SocketAddr = format!("127.0.0.1:{PORT}").parse().expect("valid address");
|
||||
info!(%addr, "Starting gRPC server");
|
||||
|
||||
let url = ArbiterUrl {
|
||||
host: addr.ip().to_string(),
|
||||
port: addr.port(),
|
||||
ca_cert: context.tls.ca_cert().clone().into_owned(),
|
||||
bootstrap_token: context.actors.bootstrapper.ask(GetToken).await.unwrap(),
|
||||
};
|
||||
|
||||
info!(%url, "Server URL");
|
||||
|
||||
let tls = ServerTlsConfig::new().identity(Identity::from_pem(
|
||||
context.tls.cert_pem(),
|
||||
context.tls.key_pem(),
|
||||
));
|
||||
|
||||
tonic::transport::Server::builder()
|
||||
.tls_config(tls)
|
||||
.map_err(|err| miette!("Faild to setup TLS: {err}"))?
|
||||
.add_service(ArbiterServiceServer::new(Server::new(context)))
|
||||
.serve(addr)
|
||||
.await
|
||||
.map_err(|e| miette::miette!("gRPC server error: {e}"))?;
|
||||
|
||||
unreachable!("gRPC server should run indefinitely");
|
||||
.try_init();
|
||||
}
|
||||
|
||||
77
server/crates/arbiter-server/src/runtime.rs
Normal file
77
server/crates/arbiter-server/src/runtime.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use std::{future::Future, net::SocketAddr, path::PathBuf};
|
||||
|
||||
use arbiter_proto::{proto::arbiter_service_server::ArbiterServiceServer, url::ArbiterUrl};
|
||||
use kameo::actor::ActorRef;
|
||||
use miette::miette;
|
||||
use tonic::transport::{Identity, ServerTlsConfig};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{Server, actors::bootstrap::GetToken, context::ServerContext, db};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RunConfig {
|
||||
pub addr: SocketAddr,
|
||||
pub data_dir: Option<PathBuf>,
|
||||
pub log_arbiter_url: bool,
|
||||
}
|
||||
|
||||
impl RunConfig {
|
||||
pub fn new(addr: SocketAddr, data_dir: Option<PathBuf>) -> Self {
|
||||
Self {
|
||||
addr,
|
||||
data_dir,
|
||||
log_arbiter_url: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_server_until_shutdown<F>(config: RunConfig, shutdown: F) -> miette::Result<()>
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
arbiter_proto::set_home_path_override(config.data_dir.clone())
|
||||
.map_err(|err| miette!("failed to set home path override: {err}"))?;
|
||||
|
||||
let db = db::create_pool(None).await?;
|
||||
info!(addr = %config.addr, "Database ready");
|
||||
|
||||
let context = ServerContext::new(db).await?;
|
||||
info!(addr = %config.addr, "Server context ready");
|
||||
|
||||
if config.log_arbiter_url {
|
||||
let url =
|
||||
build_arbiter_url(config.addr, &context.actors.bootstrapper, &context.tls).await?;
|
||||
info!(%url, "Server URL");
|
||||
}
|
||||
|
||||
let tls = ServerTlsConfig::new().identity(Identity::from_pem(
|
||||
context.tls.cert_pem(),
|
||||
context.tls.key_pem(),
|
||||
));
|
||||
|
||||
tonic::transport::Server::builder()
|
||||
.tls_config(tls)
|
||||
.map_err(|err| miette!("Failed to setup TLS: {err}"))?
|
||||
.add_service(ArbiterServiceServer::new(Server::new(context)))
|
||||
.serve_with_shutdown(config.addr, shutdown)
|
||||
.await
|
||||
.map_err(|e| miette!("gRPC server error: {e}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_arbiter_url(
|
||||
addr: SocketAddr,
|
||||
bootstrapper: &ActorRef<crate::actors::bootstrap::Bootstrapper>,
|
||||
tls: &crate::context::tls::TlsManager,
|
||||
) -> miette::Result<ArbiterUrl> {
|
||||
Ok(ArbiterUrl {
|
||||
host: addr.ip().to_string(),
|
||||
port: addr.port(),
|
||||
ca_cert: tls.ca_cert().clone().into_owned(),
|
||||
bootstrap_token: bootstrapper
|
||||
.ask(GetToken)
|
||||
.await
|
||||
.map_err(|err| miette!("failed to get bootstrap token from actor: {err}"))?,
|
||||
})
|
||||
}
|
||||
19
server/crates/arbiter-server/src/service/mod.rs
Normal file
19
server/crates/arbiter-server/src/service/mod.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
#[cfg(windows)]
|
||||
mod windows;
|
||||
|
||||
#[cfg(windows)]
|
||||
pub use windows::{install_service, run_service_dispatcher};
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub fn install_service(_: crate::cli::ServiceInstallArgs) -> miette::Result<()> {
|
||||
Err(miette::miette!(
|
||||
"service install is currently supported only on Windows"
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
pub fn run_service_dispatcher(_: crate::cli::ServiceRunArgs) -> miette::Result<()> {
|
||||
Err(miette::miette!(
|
||||
"service run entrypoint is currently supported only on Windows"
|
||||
))
|
||||
}
|
||||
229
server/crates/arbiter-server/src/service/windows.rs
Normal file
229
server/crates/arbiter-server/src/service/windows.rs
Normal file
@@ -0,0 +1,229 @@
|
||||
use std::{
|
||||
ffi::OsString,
|
||||
path::{Path, PathBuf},
|
||||
process::Command,
|
||||
sync::mpsc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use miette::{Context as _, IntoDiagnostic as _, miette};
|
||||
use windows_service::{
|
||||
define_windows_service,
|
||||
service::{
|
||||
ServiceAccess, ServiceControl, ServiceControlAccept, ServiceErrorControl, ServiceExitCode,
|
||||
ServiceInfo, ServiceStartType, ServiceState, ServiceStatus, ServiceType,
|
||||
},
|
||||
service_control_handler::{self, ServiceControlHandlerResult},
|
||||
service_dispatcher,
|
||||
service_manager::{ServiceManager, ServiceManagerAccess},
|
||||
};
|
||||
|
||||
use crate::cli::{ServiceInstallArgs, ServiceRunArgs};
|
||||
use arbiter_server::runtime::{RunConfig, run_server_until_shutdown};
|
||||
|
||||
const SERVICE_NAME: &str = "ArbiterServer";
|
||||
const SERVICE_DISPLAY_NAME: &str = "Arbiter Server";
|
||||
|
||||
pub fn default_service_data_dir() -> PathBuf {
|
||||
let base = std::env::var_os("PROGRAMDATA")
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|| PathBuf::from(r"C:\ProgramData"));
|
||||
base.join("Arbiter")
|
||||
}
|
||||
|
||||
pub fn install_service(args: ServiceInstallArgs) -> miette::Result<()> {
|
||||
ensure_admin_rights()?;
|
||||
|
||||
let executable = std::env::current_exe().into_diagnostic()?;
|
||||
let data_dir = args.data_dir.unwrap_or_else(default_service_data_dir);
|
||||
|
||||
std::fs::create_dir_all(&data_dir)
|
||||
.into_diagnostic()
|
||||
.with_context(|| format!("failed to create service data dir: {}", data_dir.display()))?;
|
||||
ensure_token_acl_contract(&data_dir)?;
|
||||
|
||||
let manager_access = ServiceManagerAccess::CONNECT | ServiceManagerAccess::CREATE_SERVICE;
|
||||
let manager = ServiceManager::local_computer(None::<&str>, manager_access)
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to open Service Control Manager")?;
|
||||
|
||||
let launch_arguments = vec![
|
||||
OsString::from("service"),
|
||||
OsString::from("run"),
|
||||
OsString::from("--data-dir"),
|
||||
data_dir.as_os_str().to_os_string(),
|
||||
];
|
||||
|
||||
let service_info = ServiceInfo {
|
||||
name: OsString::from(SERVICE_NAME),
|
||||
display_name: OsString::from(SERVICE_DISPLAY_NAME),
|
||||
service_type: ServiceType::OWN_PROCESS,
|
||||
start_type: ServiceStartType::OnDemand,
|
||||
error_control: ServiceErrorControl::Normal,
|
||||
executable_path: executable,
|
||||
launch_arguments,
|
||||
dependencies: vec![],
|
||||
account_name: Some(OsString::from(r"NT AUTHORITY\LocalService")),
|
||||
account_password: None,
|
||||
};
|
||||
|
||||
let service = manager
|
||||
.create_service(
|
||||
&service_info,
|
||||
ServiceAccess::QUERY_STATUS | ServiceAccess::START,
|
||||
)
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to create Windows service in SCM")?;
|
||||
|
||||
if args.start {
|
||||
service
|
||||
.start::<&str>(&[])
|
||||
.into_diagnostic()
|
||||
.wrap_err("service created but failed to start")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run_service_dispatcher(args: ServiceRunArgs) -> miette::Result<()> {
|
||||
SERVICE_RUN_ARGS
|
||||
.set(args)
|
||||
.map_err(|_| miette!("service runtime args are already initialized"))?;
|
||||
|
||||
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to start service dispatcher")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
define_windows_service!(ffi_service_main, service_main);
|
||||
|
||||
static SERVICE_RUN_ARGS: std::sync::OnceLock<ServiceRunArgs> = std::sync::OnceLock::new();
|
||||
|
||||
fn service_main(_arguments: Vec<OsString>) {
|
||||
if let Err(error) = run_service_main() {
|
||||
tracing::error!(error = ?error, "Windows service main failed");
|
||||
}
|
||||
}
|
||||
|
||||
fn run_service_main() -> miette::Result<()> {
|
||||
let args = SERVICE_RUN_ARGS
|
||||
.get()
|
||||
.cloned()
|
||||
.ok_or_else(|| miette!("service run args are missing"))?;
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
|
||||
|
||||
let status_handle =
|
||||
service_control_handler::register(SERVICE_NAME, move |control| match control {
|
||||
ServiceControl::Stop => {
|
||||
let _ = shutdown_tx.send(());
|
||||
ServiceControlHandlerResult::NoError
|
||||
}
|
||||
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
|
||||
_ => ServiceControlHandlerResult::NotImplemented,
|
||||
})
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to register service control handler")?;
|
||||
|
||||
set_status(
|
||||
&status_handle,
|
||||
ServiceState::StartPending,
|
||||
ServiceControlAccept::empty(),
|
||||
)?;
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to build tokio runtime for service")?;
|
||||
|
||||
set_status(
|
||||
&status_handle,
|
||||
ServiceState::Running,
|
||||
ServiceControlAccept::STOP,
|
||||
)?;
|
||||
|
||||
let data_dir = args.data_dir.unwrap_or_else(default_service_data_dir);
|
||||
let config = RunConfig {
|
||||
addr: args.listen_addr,
|
||||
data_dir: Some(data_dir),
|
||||
log_arbiter_url: true,
|
||||
};
|
||||
|
||||
let result = runtime.block_on(run_server_until_shutdown(config, async move {
|
||||
let _ = tokio::task::spawn_blocking(move || shutdown_rx.recv()).await;
|
||||
}));
|
||||
|
||||
set_status(
|
||||
&status_handle,
|
||||
ServiceState::Stopped,
|
||||
ServiceControlAccept::empty(),
|
||||
)?;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn set_status(
|
||||
status_handle: &service_control_handler::ServiceStatusHandle,
|
||||
current_state: ServiceState,
|
||||
controls_accepted: ServiceControlAccept,
|
||||
) -> miette::Result<()> {
|
||||
status_handle
|
||||
.set_service_status(ServiceStatus {
|
||||
service_type: ServiceType::OWN_PROCESS,
|
||||
current_state,
|
||||
controls_accepted,
|
||||
exit_code: ServiceExitCode::Win32(0),
|
||||
checkpoint: 0,
|
||||
wait_hint: Duration::from_secs(10),
|
||||
process_id: None,
|
||||
})
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to update service state")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_admin_rights() -> miette::Result<()> {
|
||||
let status = Command::new("net")
|
||||
.arg("session")
|
||||
.status()
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to check administrator rights")?;
|
||||
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(miette!(
|
||||
"administrator privileges are required to install Windows service"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_token_acl_contract(data_dir: &Path) -> miette::Result<()> {
|
||||
// IMPORTANT: This ACL setup is intentionally explicit and should not be simplified away,
|
||||
// because service-account and interactive-user access requirements are different in production.
|
||||
let target = data_dir.as_os_str();
|
||||
|
||||
let status = Command::new("icacls")
|
||||
.arg(target)
|
||||
.arg("/grant")
|
||||
.arg("*S-1-5-19:(OI)(CI)M")
|
||||
.arg("/grant")
|
||||
.arg("*S-1-5-32-545:(OI)(CI)RX")
|
||||
.arg("/T")
|
||||
.arg("/C")
|
||||
.status()
|
||||
.into_diagnostic()
|
||||
.wrap_err("failed to apply ACLs for service data directory")?;
|
||||
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(miette!(
|
||||
"failed to ensure ACL contract for service data directory"
|
||||
))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user