From c45e9305e500ba8c6483554c791547732e8b0e33 Mon Sep 17 00:00:00 2001 From: hdbg Date: Thu, 4 Dec 2025 22:37:25 +0100 Subject: [PATCH] refactor: splitted controller and reconciler --- Cargo.lock | 2 + Cargo.toml | 2 + pgd.toml | 4 +- src/cli.rs | 10 +- src/consts.rs | 2 + src/controller.rs | 258 +++++------------------------------ src/controller/docker.rs | 40 +++++- src/controller/reconciler.rs | 242 ++++++++++++++++++++++++++++++++ src/main.rs | 30 ++-- src/state.rs | 77 +++++------ 10 files changed, 381 insertions(+), 286 deletions(-) create mode 100644 src/consts.rs create mode 100644 src/controller/reconciler.rs diff --git a/Cargo.lock b/Cargo.lock index fd51258..79b82a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1069,10 +1069,12 @@ dependencies = [ "futures", "indicatif", "miette", + "parking_lot", "rand", "serde", "serde_json", "serde_with", + "thiserror", "tokio", "toml", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 202e538..5d9578c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,12 @@ comfy-table = "7.2.1" futures = "0.3.31" indicatif = { version = "0.18.3", features = ["improved_unicode"] } miette = { version = "7.6.0", features = ["fancy"] } +parking_lot = "0.12.5" rand = "0.9.2" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.145" serde_with = "3.16.1" +thiserror = "2.0.17" tokio = { version = "1.48.0", features = ["full"] } toml = "0.9.8" tracing = "0.1.43" diff --git a/pgd.toml b/pgd.toml index 4f99abe..98cc0f1 100644 --- a/pgd.toml +++ b/pgd.toml @@ -1,3 +1,3 @@ version = "18.1" -password = "a7BASi7P3gCgc0Xx" -port = 5433 +password = "3pngIsq4aOy2z0ia" +port = 5432 diff --git a/src/cli.rs b/src/cli.rs index 989280f..7fde3e7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -21,12 +21,10 @@ pub struct Cli { #[derive(Clone, clap::ValueEnum)] pub enum ConnectionFormat { - /// Human-readable text format - Text, - /// JSON format - Json, - /// Environment variable format - Env, + /// DSN Url + DSN, + // Human readable format + Human, } #[derive(Subcommand)] diff --git a/src/consts.rs b/src/consts.rs new file mode 100644 index 0000000..32dd783 --- /dev/null +++ b/src/consts.rs @@ -0,0 +1,2 @@ +pub const USERNAME: &str = "postgres"; +pub const DATABASE: &str = "postgres"; diff --git a/src/controller.rs b/src/controller.rs index a2d89e9..b4d7ea8 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,47 +1,70 @@ use std::time::Duration; -use miette::{bail, miette}; +use miette::{Diagnostic, bail, miette}; use colored::Colorize; use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, presets::UTF8_FULL}; use miette::Result; +use thiserror::Error; use crate::{ config::{PGDConfig, PostgresVersion, Project}, - controller::docker::DockerController, + controller::{docker::DockerController, reconciler::Reconciler}, state::{InstanceState, StateManager}, }; mod docker; mod utils; -const MAX_RETRIES: u32 = 10; -const VERIFY_DURATION_SECS: u64 = 5; +pub mod reconciler; -pub struct Controller { +pub struct Context { docker: DockerController, project: Option, - #[allow(unused)] + instance: Option, state: StateManager, } -impl Controller { - pub async fn new() -> Result { +impl Context { + pub async fn new(instance_override: Option) -> Result { + let project = Project::load()?; + let state = StateManager::new()?; + + let instance = match (project.as_ref(), instance_override) { + (None, None) => None, + // prioritizing provided instance name + (_, Some(instance)) => state.get(&instance), + (Some(project), None) => state.get(&project.name), + }; + Ok(Self { docker: DockerController::new().await?, - project: Project::load()?, - state: StateManager::load()?, + project, + instance, + state, }) } +} + +/// Main CLI command dispatcher +pub struct Controller { + ctx: Context, +} +impl Controller { + pub fn new(ctx: Context) -> Self { + Self { ctx } + } pub async fn init_project(&self) -> Result<()> { - if let Some(project) = &self.project { - return self.reconcile(project).await; + let reconciler = Reconciler { ctx: &self.ctx }; + + if let Some(project) = &self.ctx.project { + return reconciler.reconcile(project).await; } println!("{}", "Initializing new pgd project...".cyan()); - let mut versions = self.docker.available_versions().await?; + let mut versions = self.ctx.docker.available_versions().await?; versions.sort(); let latest_version = versions .last() @@ -93,217 +116,10 @@ impl Controller { println!("{table}"); - self.reconcile(&project).await?; + reconciler.reconcile(&project).await?; println!("\n{}", "✓ Project initialized successfully!".green().bold()); Ok(()) } - - pub async fn reconcile(&self, project: &Project) -> Result<()> { - self.docker - .ensure_version_downloaded(&project.config.version) - .await?; - - self.ensure_container_running(project).await?; - - Ok(()) - } - - async fn ensure_container_running(&self, project: &Project) -> Result<()> { - let mut state = StateManager::load()?; - let instance_state = state.get_mut(&project.name); - - let container_id = match instance_state { - Some(instance) => match self.ensure_container_exists(instance).await? { - Some(id) => id, - None => self.update_project_container(project, &mut state).await?, - }, - None => self.update_project_container(project, &mut state).await?, - }; - - let container_version = self - .docker - .get_container_postgres_version(&container_id) - .await?; - - self.ensure_matches_project_version(project, &mut state, &container_id, container_version) - .await?; - - if self - .docker - .is_container_running_by_id(&container_id) - .await? - { - println!("{}", "Container is already running".white()); - return Ok(()); - } - - use indicatif::{ProgressBar, ProgressStyle}; - - let spinner = ProgressBar::new_spinner(); - spinner.enable_steady_tick(Duration::from_millis(100)); - spinner.set_style( - ProgressStyle::default_spinner() - .template("{spinner:.cyan} {msg}") - .unwrap(), - ); - spinner.set_message("Starting container..."); - - for attempt in 1..=MAX_RETRIES { - spinner.set_message(format!( - "Starting container (attempt {}/{})", - attempt, MAX_RETRIES - )); - - let result = self.try_starting_container(&container_id, &spinner).await; - - match result { - Ok(_) => { - spinner.finish_with_message(format!( - "{}", - "Container started successfully".green().bold() - )); - return Ok(()); - } - Err(err) => { - spinner.set_message(format!( - "{} {}/{} failed: {}", - "Attempt".yellow(), - attempt, - MAX_RETRIES, - err - )); - } - } - - if attempt < MAX_RETRIES { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } - - spinner.finish_with_message(format!("{}", "Failed to start container".red())); - miette::bail!("Failed to start container after {} attempts", MAX_RETRIES) - } - - async fn try_starting_container( - &self, - container_id: &String, - spinner: &indicatif::ProgressBar, - ) -> Result<(), miette::Error> { - match self.docker.start_container_by_id(container_id).await { - Ok(_) => { - spinner.set_message(format!( - "{} ({}s)...", - "Verifying container is running".cyan(), - VERIFY_DURATION_SECS - )); - - for i in 0..VERIFY_DURATION_SECS { - spinner.set_message(format!( - "{} ({}/{}s)", - "Verifying container stability".cyan(), - i + 1, - VERIFY_DURATION_SECS - )); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - - if self.docker.is_container_running_by_id(container_id).await? { - return Ok(()); - } else { - miette::bail!("Container stopped unexpectedly after start"); - } - } - Err(e) => { - miette::bail!("Failed to start: {}", e); - } - } - } - - async fn update_project_container( - &self, - project: &Project, - state: &mut StateManager, - ) -> Result { - println!( - "{} {}", - "Creating container".cyan(), - project.container_name().yellow() - ); - let id = self - .docker - .create_postgres_container( - &project.container_name(), - &project.config.version, - &project.config.password, - project.config.port, - ) - .await?; - println!("{}", "Container created successfully".green()); - state.set( - project.name.clone(), - crate::state::InstanceState::new( - id.clone(), - project.config.version, - project.config.port, - ), - ); - state.save()?; - Ok(id) - } - - async fn ensure_container_exists( - &self, - instance: &InstanceState, - ) -> Result, miette::Error> { - let mut container_id = None; - let id = &instance.container_id; - if self.docker.container_exists_by_id(id).await? { - container_id = Some(id.clone()); - } - Ok(container_id) - } - - async fn ensure_matches_project_version( - &self, - project: &Project, - _state: &mut StateManager, - _container_id: &String, - container_version: PostgresVersion, - ) -> Result<(), miette::Error> { - let _: () = if container_version != project.config.version { - let needs_upgrade = container_version < project.config.version; - - if needs_upgrade { - bail!("Upgrades are currently unsupported! :("); - // println!( - // "Upgrading PostgreSQL from {} to {}...", - // container_version, project.config.version - // ); - // self.docker.stop_container(container_id, 10).await?; - // self.docker - // .upgrade_container_image( - // container_id, - // container_name, - // &project.config.version, - // &project.config.password, - // project.config.port, - // ) - // .await?; - - // if let Some(instance_state) = state.get_mut(&project.name) { - // instance_state.postgres_version = project.config.version.to_string(); - // state.save()?; - // } - } else { - miette::bail!( - "Cannot downgrade PostgreSQL from {} to {}. Downgrades are not supported.", - container_version, - project.config.version - ); - } - }; - Ok(()) - } } diff --git a/src/controller/docker.rs b/src/controller/docker.rs index 3462ace..d9b57f4 100644 --- a/src/controller/docker.rs +++ b/src/controller/docker.rs @@ -1,15 +1,17 @@ -use miette::miette; -use std::str::FromStr; +use miette::{Diagnostic, miette}; +use std::{io::Write, str::FromStr}; +use thiserror::Error; use bollard::{ Docker, query_parameters::{ CreateContainerOptions, CreateImageOptions, InspectContainerOptions, ListImagesOptions, - StartContainerOptions, StopContainerOptions, + LogsOptions, StartContainerOptions, StopContainerOptions, }, secret::ContainerCreateBody, }; use colored::Colorize; +use futures::StreamExt; use indicatif::MultiProgress; use miette::{Context, IntoDiagnostic, Result}; use tracing::info; @@ -26,6 +28,11 @@ fn format_image(ver: &PostgresVersion) -> String { format!("{DOCKERHUB_POSTGRES}:{}", ver) } +#[derive(Error, Debug, Diagnostic)] +#[error("Docker operation failed")] +#[diagnostic(code(pgd::docker))] +pub enum Error {} + pub struct DockerController { daemon: Docker, } @@ -262,4 +269,31 @@ impl DockerController { PostgresVersion::from_str(version_str) .map_err(|_| miette!("Invalid version in label: {}", version_str)) } + + pub async fn stream_logs(&self, container_id: &str, follow: bool) -> Result<()> { + let options = Some(LogsOptions { + follow, + stdout: true, + stderr: true, + ..Default::default() + }); + + let mut logs = self.daemon.logs(container_id, options); + + while let Some(entry) = logs.next().await { + match entry { + Ok(output) => { + print!("{output}"); + std::io::stdout().flush().ok(); + } + Err(err) => { + return Err(err) + .into_diagnostic() + .wrap_err("Failed to stream container logs"); + } + } + } + + Ok(()) + } } diff --git a/src/controller/reconciler.rs b/src/controller/reconciler.rs new file mode 100644 index 0000000..05a5a95 --- /dev/null +++ b/src/controller/reconciler.rs @@ -0,0 +1,242 @@ +use std::time::Duration; + +use miette::{Diagnostic, bail, miette}; + +use colored::Colorize; +use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, presets::UTF8_FULL}; +use miette::Result; +use thiserror::Error; + +use crate::{ + config::{PGDConfig, PostgresVersion, Project}, + controller::{ + Context, + docker::{self, DockerController}, + }, + state::{InstanceState, StateManager}, +}; + +const MAX_RETRIES: usize = 10; +const VERIFY_DURATION_SECS: u64 = 5; + +#[derive(Error, Debug, Diagnostic)] +#[error("Failed to sync container state")] +#[diagnostic(code(pgd::reconcile))] +pub enum ReconcileError { + AlreadyRunning, + ImageDownload(#[source] docker::Error), +} + +pub struct Reconciler<'a> { + pub ctx: &'a Context, +} + +impl<'a> Reconciler<'a> { + pub async fn reconcile(&self, project: &Project) -> Result<()> { + self.ctx + .docker + .ensure_version_downloaded(&project.config.version) + .await?; + + self.ensure_container_running(project).await?; + + Ok(()) + } + + async fn ensure_container_running(&self, project: &Project) -> Result<()> { + let container_id = match &self.ctx.instance { + Some(instance) => match self.ensure_container_exists(instance).await? { + Some(id) => id, + None => self.update_project_container(project).await?, + }, + None => self.update_project_container(project).await?, + }; + + let container_version = self + .ctx + .docker + .get_container_postgres_version(&container_id) + .await?; + + self.ensure_matches_project_version(project, &container_id, container_version) + .await?; + + if self + .ctx + .docker + .is_container_running_by_id(&container_id) + .await? + { + println!("{}", "Container is already running".white()); + return Ok(()); + } + + use indicatif::{ProgressBar, ProgressStyle}; + + let spinner = ProgressBar::new_spinner(); + spinner.enable_steady_tick(Duration::from_millis(100)); + spinner.set_style( + ProgressStyle::default_spinner() + .template("{spinner:.cyan} {msg}") + .unwrap(), + ); + spinner.set_message("Starting container..."); + + for attempt in 1..=MAX_RETRIES { + spinner.set_message(format!( + "Starting container (attempt {}/{})", + attempt, MAX_RETRIES + )); + + let result = self.try_starting_container(&container_id, &spinner).await; + + match result { + Ok(_) => { + spinner.finish_with_message(format!( + "{}", + "Container started successfully".green().bold() + )); + return Ok(()); + } + Err(err) => { + spinner.set_message(format!( + "{} {}/{} failed: {}", + "Attempt".yellow(), + attempt, + MAX_RETRIES, + err + )); + } + } + + if attempt < MAX_RETRIES { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + + spinner.finish_with_message(format!("{}", "Failed to start container".red())); + miette::bail!("Failed to start container after {} attempts", MAX_RETRIES) + } + + async fn try_starting_container( + &self, + container_id: &String, + spinner: &indicatif::ProgressBar, + ) -> Result<(), miette::Error> { + match self.ctx.docker.start_container_by_id(container_id).await { + Ok(_) => { + spinner.set_message(format!( + "{} ({}s)...", + "Verifying container is running".cyan(), + VERIFY_DURATION_SECS + )); + + for i in 0..VERIFY_DURATION_SECS { + spinner.set_message(format!( + "{} ({}/{}s)", + "Verifying container stability".cyan(), + i + 1, + VERIFY_DURATION_SECS + )); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + + if self + .ctx + .docker + .is_container_running_by_id(container_id) + .await? + { + return Ok(()); + } else { + miette::bail!("Container stopped unexpectedly after start"); + } + } + Err(e) => { + miette::bail!("Failed to start: {}", e); + } + } + } + + async fn update_project_container(&self, project: &Project) -> Result { + println!( + "{} {}", + "Creating container".cyan(), + project.container_name().yellow() + ); + let id = self + .ctx + .docker + .create_postgres_container( + &project.container_name(), + &project.config.version, + &project.config.password, + project.config.port, + ) + .await?; + println!("{}", "Container created successfully".green()); + self.ctx.state.set( + project.name.clone(), + crate::state::InstanceState::new( + id.clone(), + project.config.version, + project.config.port, + ), + ); + self.ctx.state.save()?; + Ok(id) + } + + async fn ensure_container_exists( + &self, + instance: &InstanceState, + ) -> Result, miette::Error> { + let mut container_id = None; + let id = &instance.container_id; + if self.ctx.docker.container_exists_by_id(id).await? { + container_id = Some(id.clone()); + } + Ok(container_id) + } + + async fn ensure_matches_project_version( + &self, + project: &Project, + _container_id: &String, + container_version: PostgresVersion, + ) -> Result<(), miette::Error> { + let _: () = if container_version != project.config.version { + let needs_upgrade = container_version < project.config.version; + + if needs_upgrade { + bail!("Upgrades are currently unsupported! :("); + // println!( + // "Upgrading PostgreSQL from {} to {}...", + // container_version, project.config.version + // ); + // self.docker.stop_container(container_id, 10).await?; + // self.docker + // .upgrade_container_image( + // container_id, + // container_name, + // &project.config.version, + // &project.config.password, + // project.config.port, + // ) + // .await?; + + // if let Some(instance_state) = state.get_mut(&project.name) { + // instance_state.postgres_version = project.config.version.to_string(); + // state.save()?; + // } + } else { + miette::bail!( + "Cannot downgrade PostgreSQL from {} to {}. Downgrades are not supported.", + container_version, + project.config.version + ); + } + }; + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index 0fdd095..76f5adb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,10 +2,7 @@ mod cli; mod config; mod state; -mod consts { - pub const USERNAME: &str = "postgres"; - pub const DATABASE: &str = "postgres"; -} +mod consts; mod controller; @@ -14,7 +11,10 @@ use cli::Cli; use miette::Result; use tracing::info; -use crate::controller::Controller; +use crate::{ + cli::ControlCommands, + controller::{Context, Controller}, +}; #[tokio::main] async fn main() -> Result<()> { @@ -24,18 +24,26 @@ async fn main() -> Result<()> { init_tracing(cli.verbose); info!("pgd.start"); - let controller = Controller::new().await?; match cli.command { - cli::Commands::Init => controller.init_project().await?, - cli::Commands::Instance { name, cmd } => todo!(), + cli::Commands::Init => { + let ctx = Context::new(None).await?; + Controller::new(ctx).init_project().await?; + } + cli::Commands::Instance { name, cmd } => match cmd { + ControlCommands::Start => {} + ControlCommands::Stop => {} + ControlCommands::Restart => {} + ControlCommands::Destroy => {} + ControlCommands::Logs { follow } => todo!(), + ControlCommands::Status => {} + ControlCommands::Connection { format: _ } => {} + }, } Ok(()) } -fn init_tracing(verbose: bool) { - - +fn init_tracing(_verbose: bool) { tracing_subscriber::fmt::init(); } diff --git a/src/state.rs b/src/state.rs index 12ec8fb..1f095b4 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,80 +1,55 @@ use miette::{Context, IntoDiagnostic, Result}; use serde::{Deserialize, Serialize}; +use std::cell::{Ref, RefCell}; use std::collections::HashMap; use std::path::PathBuf; use crate::config::PostgresVersion; -/// State information for a single PostgreSQL instance #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InstanceState { - /// Docker container ID pub container_id: String, - /// PostgreSQL version running in the container pub postgres_version: PostgresVersion, - /// Port the container is bound to pub port: u16, - /// Timestamp when the instance was created (Unix timestamp) pub created_at: u64, } -/// Manages the global state file at ~/.pgd/state.json -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StateManager { - /// Map of project name to instance state +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct State { #[serde(default)] instances: HashMap, } - -/// Get the path to the state file (~/.pgd/state.json) - -fn state_file_path() -> Result { - let home = std::env::var("HOME") - .into_diagnostic() - .wrap_err("Failed to get HOME environment variable")?; - - Ok(PathBuf::from(home).join(".pgd").join("state.json")) -} - - -impl StateManager { - /// Load the state manager from disk, or create a new one if it doesn't exist - pub fn load() -> Result { +impl State { + fn new() -> Result { let state_path = state_file_path()?; if !state_path.exists() { - // Create the directory if it doesn't exist if let Some(parent) = state_path.parent() { std::fs::create_dir_all(parent) .into_diagnostic() .wrap_err("Failed to create .pgd directory")?; } - // Return empty state - return Ok(StateManager { - instances: HashMap::new(), - }); + return Ok(Self::default()); } let content = std::fs::read_to_string(&state_path) .into_diagnostic() .wrap_err_with(|| format!("Failed to read state file: {}", state_path.display()))?; - let state: StateManager = serde_json::from_str(&content) + let state: Self = serde_json::from_str(&content) .into_diagnostic() .wrap_err("Failed to parse state.json")?; Ok(state) } - /// Save the state manager to disk - pub fn save(&self) -> Result<()> { + fn save(&self) -> Result<()> { let state_path = state_file_path()?; - // Ensure directory exists if let Some(parent) = state_path.parent() { std::fs::create_dir_all(parent) .into_diagnostic() @@ -91,20 +66,30 @@ impl StateManager { Ok(()) } +} - /// Get mutable state for a specific project - pub fn get_mut(&mut self, project_name: &str) -> Option<&mut InstanceState> { - self.instances.get_mut(project_name) +pub struct StateManager(RefCell); + +impl StateManager { + pub fn new() -> Result { + Ok(Self(RefCell::new(State::new()?))) } - /// Set the state for a specific project - pub fn set(&mut self, project_name: String, state: InstanceState) { - self.instances.insert(project_name, state); + pub fn save(&self) -> Result<()> { + self.0.borrow().save()?; + Ok(()) } - /// Remove the state for a specific project - pub fn remove(&mut self, project_name: &str) -> Option { - self.instances.remove(project_name) + pub fn get(&self, project_name: &str) -> Option { + self.0.borrow().instances.get(project_name).cloned() + } + + pub fn set(&self, project_name: String, state: InstanceState) { + self.0.borrow_mut().instances.insert(project_name, state); + } + + pub fn remove(&self, project_name: &str) -> Option { + self.0.borrow_mut().instances.remove(project_name) } } @@ -122,4 +107,10 @@ impl InstanceState { created_at: now, } } -} \ No newline at end of file +} + +fn state_file_path() -> Result { + let home = std::env::home_dir().wrap_err("Failed to get HOME environment variable")?; + + Ok(home.join(".pgd").join("state.json")) +}