refactor: splitted controller and reconciler

This commit is contained in:
hdbg
2025-12-04 22:37:25 +01:00
parent bc37b58d80
commit c45e9305e5
10 changed files with 381 additions and 286 deletions

2
Cargo.lock generated
View File

@@ -1069,10 +1069,12 @@ dependencies = [
"futures", "futures",
"indicatif", "indicatif",
"miette", "miette",
"parking_lot",
"rand", "rand",
"serde", "serde",
"serde_json", "serde_json",
"serde_with", "serde_with",
"thiserror",
"tokio", "tokio",
"toml", "toml",
"tracing", "tracing",

View File

@@ -14,10 +14,12 @@ comfy-table = "7.2.1"
futures = "0.3.31" futures = "0.3.31"
indicatif = { version = "0.18.3", features = ["improved_unicode"] } indicatif = { version = "0.18.3", features = ["improved_unicode"] }
miette = { version = "7.6.0", features = ["fancy"] } miette = { version = "7.6.0", features = ["fancy"] }
parking_lot = "0.12.5"
rand = "0.9.2" rand = "0.9.2"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145" serde_json = "1.0.145"
serde_with = "3.16.1" serde_with = "3.16.1"
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["full"] } tokio = { version = "1.48.0", features = ["full"] }
toml = "0.9.8" toml = "0.9.8"
tracing = "0.1.43" tracing = "0.1.43"

View File

@@ -1,3 +1,3 @@
version = "18.1" version = "18.1"
password = "a7BASi7P3gCgc0Xx" password = "3pngIsq4aOy2z0ia"
port = 5433 port = 5432

View File

@@ -21,12 +21,10 @@ pub struct Cli {
#[derive(Clone, clap::ValueEnum)] #[derive(Clone, clap::ValueEnum)]
pub enum ConnectionFormat { pub enum ConnectionFormat {
/// Human-readable text format /// DSN Url
Text, DSN,
/// JSON format // Human readable format
Json, Human,
/// Environment variable format
Env,
} }
#[derive(Subcommand)] #[derive(Subcommand)]

2
src/consts.rs Normal file
View File

@@ -0,0 +1,2 @@
pub const USERNAME: &str = "postgres";
pub const DATABASE: &str = "postgres";

View File

@@ -1,47 +1,70 @@
use std::time::Duration; use std::time::Duration;
use miette::{bail, miette}; use miette::{Diagnostic, bail, miette};
use colored::Colorize; use colored::Colorize;
use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, presets::UTF8_FULL}; use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, presets::UTF8_FULL};
use miette::Result; use miette::Result;
use thiserror::Error;
use crate::{ use crate::{
config::{PGDConfig, PostgresVersion, Project}, config::{PGDConfig, PostgresVersion, Project},
controller::docker::DockerController, controller::{docker::DockerController, reconciler::Reconciler},
state::{InstanceState, StateManager}, state::{InstanceState, StateManager},
}; };
mod docker; mod docker;
mod utils; mod utils;
const MAX_RETRIES: u32 = 10; pub mod reconciler;
const VERIFY_DURATION_SECS: u64 = 5;
pub struct Controller { pub struct Context {
docker: DockerController, docker: DockerController,
project: Option<Project>, project: Option<Project>,
#[allow(unused)] instance: Option<InstanceState>,
state: StateManager, state: StateManager,
} }
impl Controller { impl Context {
pub async fn new() -> Result<Self> { pub async fn new(instance_override: Option<String>) -> Result<Self> {
let project = Project::load()?;
let state = StateManager::new()?;
let instance = match (project.as_ref(), instance_override) {
(None, None) => None,
// prioritizing provided instance name
(_, Some(instance)) => state.get(&instance),
(Some(project), None) => state.get(&project.name),
};
Ok(Self { Ok(Self {
docker: DockerController::new().await?, docker: DockerController::new().await?,
project: Project::load()?, project,
state: StateManager::load()?, instance,
state,
}) })
} }
}
/// Main CLI command dispatcher
pub struct Controller {
ctx: Context,
}
impl Controller {
pub fn new(ctx: Context) -> Self {
Self { ctx }
}
pub async fn init_project(&self) -> Result<()> { pub async fn init_project(&self) -> Result<()> {
if let Some(project) = &self.project { let reconciler = Reconciler { ctx: &self.ctx };
return self.reconcile(project).await;
if let Some(project) = &self.ctx.project {
return reconciler.reconcile(project).await;
} }
println!("{}", "Initializing new pgd project...".cyan()); println!("{}", "Initializing new pgd project...".cyan());
let mut versions = self.docker.available_versions().await?; let mut versions = self.ctx.docker.available_versions().await?;
versions.sort(); versions.sort();
let latest_version = versions let latest_version = versions
.last() .last()
@@ -93,217 +116,10 @@ impl Controller {
println!("{table}"); println!("{table}");
self.reconcile(&project).await?; reconciler.reconcile(&project).await?;
println!("\n{}", "✓ Project initialized successfully!".green().bold()); println!("\n{}", "✓ Project initialized successfully!".green().bold());
Ok(()) Ok(())
} }
pub async fn reconcile(&self, project: &Project) -> Result<()> {
self.docker
.ensure_version_downloaded(&project.config.version)
.await?;
self.ensure_container_running(project).await?;
Ok(())
}
async fn ensure_container_running(&self, project: &Project) -> Result<()> {
let mut state = StateManager::load()?;
let instance_state = state.get_mut(&project.name);
let container_id = match instance_state {
Some(instance) => match self.ensure_container_exists(instance).await? {
Some(id) => id,
None => self.update_project_container(project, &mut state).await?,
},
None => self.update_project_container(project, &mut state).await?,
};
let container_version = self
.docker
.get_container_postgres_version(&container_id)
.await?;
self.ensure_matches_project_version(project, &mut state, &container_id, container_version)
.await?;
if self
.docker
.is_container_running_by_id(&container_id)
.await?
{
println!("{}", "Container is already running".white());
return Ok(());
}
use indicatif::{ProgressBar, ProgressStyle};
let spinner = ProgressBar::new_spinner();
spinner.enable_steady_tick(Duration::from_millis(100));
spinner.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.cyan} {msg}")
.unwrap(),
);
spinner.set_message("Starting container...");
for attempt in 1..=MAX_RETRIES {
spinner.set_message(format!(
"Starting container (attempt {}/{})",
attempt, MAX_RETRIES
));
let result = self.try_starting_container(&container_id, &spinner).await;
match result {
Ok(_) => {
spinner.finish_with_message(format!(
"{}",
"Container started successfully".green().bold()
));
return Ok(());
}
Err(err) => {
spinner.set_message(format!(
"{} {}/{} failed: {}",
"Attempt".yellow(),
attempt,
MAX_RETRIES,
err
));
}
}
if attempt < MAX_RETRIES {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
spinner.finish_with_message(format!("{}", "Failed to start container".red()));
miette::bail!("Failed to start container after {} attempts", MAX_RETRIES)
}
async fn try_starting_container(
&self,
container_id: &String,
spinner: &indicatif::ProgressBar,
) -> Result<(), miette::Error> {
match self.docker.start_container_by_id(container_id).await {
Ok(_) => {
spinner.set_message(format!(
"{} ({}s)...",
"Verifying container is running".cyan(),
VERIFY_DURATION_SECS
));
for i in 0..VERIFY_DURATION_SECS {
spinner.set_message(format!(
"{} ({}/{}s)",
"Verifying container stability".cyan(),
i + 1,
VERIFY_DURATION_SECS
));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
if self.docker.is_container_running_by_id(container_id).await? {
return Ok(());
} else {
miette::bail!("Container stopped unexpectedly after start");
}
}
Err(e) => {
miette::bail!("Failed to start: {}", e);
}
}
}
async fn update_project_container(
&self,
project: &Project,
state: &mut StateManager,
) -> Result<String, miette::Error> {
println!(
"{} {}",
"Creating container".cyan(),
project.container_name().yellow()
);
let id = self
.docker
.create_postgres_container(
&project.container_name(),
&project.config.version,
&project.config.password,
project.config.port,
)
.await?;
println!("{}", "Container created successfully".green());
state.set(
project.name.clone(),
crate::state::InstanceState::new(
id.clone(),
project.config.version,
project.config.port,
),
);
state.save()?;
Ok(id)
}
async fn ensure_container_exists(
&self,
instance: &InstanceState,
) -> Result<Option<String>, miette::Error> {
let mut container_id = None;
let id = &instance.container_id;
if self.docker.container_exists_by_id(id).await? {
container_id = Some(id.clone());
}
Ok(container_id)
}
async fn ensure_matches_project_version(
&self,
project: &Project,
_state: &mut StateManager,
_container_id: &String,
container_version: PostgresVersion,
) -> Result<(), miette::Error> {
let _: () = if container_version != project.config.version {
let needs_upgrade = container_version < project.config.version;
if needs_upgrade {
bail!("Upgrades are currently unsupported! :(");
// println!(
// "Upgrading PostgreSQL from {} to {}...",
// container_version, project.config.version
// );
// self.docker.stop_container(container_id, 10).await?;
// self.docker
// .upgrade_container_image(
// container_id,
// container_name,
// &project.config.version,
// &project.config.password,
// project.config.port,
// )
// .await?;
// if let Some(instance_state) = state.get_mut(&project.name) {
// instance_state.postgres_version = project.config.version.to_string();
// state.save()?;
// }
} else {
miette::bail!(
"Cannot downgrade PostgreSQL from {} to {}. Downgrades are not supported.",
container_version,
project.config.version
);
}
};
Ok(())
}
} }

View File

@@ -1,15 +1,17 @@
use miette::miette; use miette::{Diagnostic, miette};
use std::str::FromStr; use std::{io::Write, str::FromStr};
use thiserror::Error;
use bollard::{ use bollard::{
Docker, Docker,
query_parameters::{ query_parameters::{
CreateContainerOptions, CreateImageOptions, InspectContainerOptions, ListImagesOptions, CreateContainerOptions, CreateImageOptions, InspectContainerOptions, ListImagesOptions,
StartContainerOptions, StopContainerOptions, LogsOptions, StartContainerOptions, StopContainerOptions,
}, },
secret::ContainerCreateBody, secret::ContainerCreateBody,
}; };
use colored::Colorize; use colored::Colorize;
use futures::StreamExt;
use indicatif::MultiProgress; use indicatif::MultiProgress;
use miette::{Context, IntoDiagnostic, Result}; use miette::{Context, IntoDiagnostic, Result};
use tracing::info; use tracing::info;
@@ -26,6 +28,11 @@ fn format_image(ver: &PostgresVersion) -> String {
format!("{DOCKERHUB_POSTGRES}:{}", ver) format!("{DOCKERHUB_POSTGRES}:{}", ver)
} }
#[derive(Error, Debug, Diagnostic)]
#[error("Docker operation failed")]
#[diagnostic(code(pgd::docker))]
pub enum Error {}
pub struct DockerController { pub struct DockerController {
daemon: Docker, daemon: Docker,
} }
@@ -262,4 +269,31 @@ impl DockerController {
PostgresVersion::from_str(version_str) PostgresVersion::from_str(version_str)
.map_err(|_| miette!("Invalid version in label: {}", version_str)) .map_err(|_| miette!("Invalid version in label: {}", version_str))
} }
pub async fn stream_logs(&self, container_id: &str, follow: bool) -> Result<()> {
let options = Some(LogsOptions {
follow,
stdout: true,
stderr: true,
..Default::default()
});
let mut logs = self.daemon.logs(container_id, options);
while let Some(entry) = logs.next().await {
match entry {
Ok(output) => {
print!("{output}");
std::io::stdout().flush().ok();
}
Err(err) => {
return Err(err)
.into_diagnostic()
.wrap_err("Failed to stream container logs");
}
}
}
Ok(())
}
} }

View File

@@ -0,0 +1,242 @@
use std::time::Duration;
use miette::{Diagnostic, bail, miette};
use colored::Colorize;
use comfy_table::{Attribute, Cell, Color, ContentArrangement, Table, presets::UTF8_FULL};
use miette::Result;
use thiserror::Error;
use crate::{
config::{PGDConfig, PostgresVersion, Project},
controller::{
Context,
docker::{self, DockerController},
},
state::{InstanceState, StateManager},
};
const MAX_RETRIES: usize = 10;
const VERIFY_DURATION_SECS: u64 = 5;
#[derive(Error, Debug, Diagnostic)]
#[error("Failed to sync container state")]
#[diagnostic(code(pgd::reconcile))]
pub enum ReconcileError {
AlreadyRunning,
ImageDownload(#[source] docker::Error),
}
pub struct Reconciler<'a> {
pub ctx: &'a Context,
}
impl<'a> Reconciler<'a> {
pub async fn reconcile(&self, project: &Project) -> Result<()> {
self.ctx
.docker
.ensure_version_downloaded(&project.config.version)
.await?;
self.ensure_container_running(project).await?;
Ok(())
}
async fn ensure_container_running(&self, project: &Project) -> Result<()> {
let container_id = match &self.ctx.instance {
Some(instance) => match self.ensure_container_exists(instance).await? {
Some(id) => id,
None => self.update_project_container(project).await?,
},
None => self.update_project_container(project).await?,
};
let container_version = self
.ctx
.docker
.get_container_postgres_version(&container_id)
.await?;
self.ensure_matches_project_version(project, &container_id, container_version)
.await?;
if self
.ctx
.docker
.is_container_running_by_id(&container_id)
.await?
{
println!("{}", "Container is already running".white());
return Ok(());
}
use indicatif::{ProgressBar, ProgressStyle};
let spinner = ProgressBar::new_spinner();
spinner.enable_steady_tick(Duration::from_millis(100));
spinner.set_style(
ProgressStyle::default_spinner()
.template("{spinner:.cyan} {msg}")
.unwrap(),
);
spinner.set_message("Starting container...");
for attempt in 1..=MAX_RETRIES {
spinner.set_message(format!(
"Starting container (attempt {}/{})",
attempt, MAX_RETRIES
));
let result = self.try_starting_container(&container_id, &spinner).await;
match result {
Ok(_) => {
spinner.finish_with_message(format!(
"{}",
"Container started successfully".green().bold()
));
return Ok(());
}
Err(err) => {
spinner.set_message(format!(
"{} {}/{} failed: {}",
"Attempt".yellow(),
attempt,
MAX_RETRIES,
err
));
}
}
if attempt < MAX_RETRIES {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
spinner.finish_with_message(format!("{}", "Failed to start container".red()));
miette::bail!("Failed to start container after {} attempts", MAX_RETRIES)
}
async fn try_starting_container(
&self,
container_id: &String,
spinner: &indicatif::ProgressBar,
) -> Result<(), miette::Error> {
match self.ctx.docker.start_container_by_id(container_id).await {
Ok(_) => {
spinner.set_message(format!(
"{} ({}s)...",
"Verifying container is running".cyan(),
VERIFY_DURATION_SECS
));
for i in 0..VERIFY_DURATION_SECS {
spinner.set_message(format!(
"{} ({}/{}s)",
"Verifying container stability".cyan(),
i + 1,
VERIFY_DURATION_SECS
));
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
if self
.ctx
.docker
.is_container_running_by_id(container_id)
.await?
{
return Ok(());
} else {
miette::bail!("Container stopped unexpectedly after start");
}
}
Err(e) => {
miette::bail!("Failed to start: {}", e);
}
}
}
async fn update_project_container(&self, project: &Project) -> Result<String, miette::Error> {
println!(
"{} {}",
"Creating container".cyan(),
project.container_name().yellow()
);
let id = self
.ctx
.docker
.create_postgres_container(
&project.container_name(),
&project.config.version,
&project.config.password,
project.config.port,
)
.await?;
println!("{}", "Container created successfully".green());
self.ctx.state.set(
project.name.clone(),
crate::state::InstanceState::new(
id.clone(),
project.config.version,
project.config.port,
),
);
self.ctx.state.save()?;
Ok(id)
}
async fn ensure_container_exists(
&self,
instance: &InstanceState,
) -> Result<Option<String>, miette::Error> {
let mut container_id = None;
let id = &instance.container_id;
if self.ctx.docker.container_exists_by_id(id).await? {
container_id = Some(id.clone());
}
Ok(container_id)
}
async fn ensure_matches_project_version(
&self,
project: &Project,
_container_id: &String,
container_version: PostgresVersion,
) -> Result<(), miette::Error> {
let _: () = if container_version != project.config.version {
let needs_upgrade = container_version < project.config.version;
if needs_upgrade {
bail!("Upgrades are currently unsupported! :(");
// println!(
// "Upgrading PostgreSQL from {} to {}...",
// container_version, project.config.version
// );
// self.docker.stop_container(container_id, 10).await?;
// self.docker
// .upgrade_container_image(
// container_id,
// container_name,
// &project.config.version,
// &project.config.password,
// project.config.port,
// )
// .await?;
// if let Some(instance_state) = state.get_mut(&project.name) {
// instance_state.postgres_version = project.config.version.to_string();
// state.save()?;
// }
} else {
miette::bail!(
"Cannot downgrade PostgreSQL from {} to {}. Downgrades are not supported.",
container_version,
project.config.version
);
}
};
Ok(())
}
}

View File

@@ -2,10 +2,7 @@ mod cli;
mod config; mod config;
mod state; mod state;
mod consts { mod consts;
pub const USERNAME: &str = "postgres";
pub const DATABASE: &str = "postgres";
}
mod controller; mod controller;
@@ -14,7 +11,10 @@ use cli::Cli;
use miette::Result; use miette::Result;
use tracing::info; use tracing::info;
use crate::controller::Controller; use crate::{
cli::ControlCommands,
controller::{Context, Controller},
};
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@@ -24,18 +24,26 @@ async fn main() -> Result<()> {
init_tracing(cli.verbose); init_tracing(cli.verbose);
info!("pgd.start"); info!("pgd.start");
let controller = Controller::new().await?;
match cli.command { match cli.command {
cli::Commands::Init => controller.init_project().await?, cli::Commands::Init => {
cli::Commands::Instance { name, cmd } => todo!(), let ctx = Context::new(None).await?;
Controller::new(ctx).init_project().await?;
}
cli::Commands::Instance { name, cmd } => match cmd {
ControlCommands::Start => {}
ControlCommands::Stop => {}
ControlCommands::Restart => {}
ControlCommands::Destroy => {}
ControlCommands::Logs { follow } => todo!(),
ControlCommands::Status => {}
ControlCommands::Connection { format: _ } => {}
},
} }
Ok(()) Ok(())
} }
fn init_tracing(verbose: bool) { fn init_tracing(_verbose: bool) {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
} }

View File

@@ -1,80 +1,55 @@
use miette::{Context, IntoDiagnostic, Result}; use miette::{Context, IntoDiagnostic, Result};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cell::{Ref, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use crate::config::PostgresVersion; use crate::config::PostgresVersion;
/// State information for a single PostgreSQL instance
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceState { pub struct InstanceState {
/// Docker container ID
pub container_id: String, pub container_id: String,
/// PostgreSQL version running in the container
pub postgres_version: PostgresVersion, pub postgres_version: PostgresVersion,
/// Port the container is bound to
pub port: u16, pub port: u16,
/// Timestamp when the instance was created (Unix timestamp)
pub created_at: u64, pub created_at: u64,
} }
/// Manages the global state file at ~/.pgd/state.json #[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize)] struct State {
pub struct StateManager {
/// Map of project name to instance state
#[serde(default)] #[serde(default)]
instances: HashMap<String, InstanceState>, instances: HashMap<String, InstanceState>,
} }
impl State {
/// Get the path to the state file (~/.pgd/state.json) fn new() -> Result<Self> {
fn state_file_path() -> Result<PathBuf> {
let home = std::env::var("HOME")
.into_diagnostic()
.wrap_err("Failed to get HOME environment variable")?;
Ok(PathBuf::from(home).join(".pgd").join("state.json"))
}
impl StateManager {
/// Load the state manager from disk, or create a new one if it doesn't exist
pub fn load() -> Result<Self> {
let state_path = state_file_path()?; let state_path = state_file_path()?;
if !state_path.exists() { if !state_path.exists() {
// Create the directory if it doesn't exist
if let Some(parent) = state_path.parent() { if let Some(parent) = state_path.parent() {
std::fs::create_dir_all(parent) std::fs::create_dir_all(parent)
.into_diagnostic() .into_diagnostic()
.wrap_err("Failed to create .pgd directory")?; .wrap_err("Failed to create .pgd directory")?;
} }
// Return empty state return Ok(Self::default());
return Ok(StateManager {
instances: HashMap::new(),
});
} }
let content = std::fs::read_to_string(&state_path) let content = std::fs::read_to_string(&state_path)
.into_diagnostic() .into_diagnostic()
.wrap_err_with(|| format!("Failed to read state file: {}", state_path.display()))?; .wrap_err_with(|| format!("Failed to read state file: {}", state_path.display()))?;
let state: StateManager = serde_json::from_str(&content) let state: Self = serde_json::from_str(&content)
.into_diagnostic() .into_diagnostic()
.wrap_err("Failed to parse state.json")?; .wrap_err("Failed to parse state.json")?;
Ok(state) Ok(state)
} }
/// Save the state manager to disk fn save(&self) -> Result<()> {
pub fn save(&self) -> Result<()> {
let state_path = state_file_path()?; let state_path = state_file_path()?;
// Ensure directory exists
if let Some(parent) = state_path.parent() { if let Some(parent) = state_path.parent() {
std::fs::create_dir_all(parent) std::fs::create_dir_all(parent)
.into_diagnostic() .into_diagnostic()
@@ -91,20 +66,30 @@ impl StateManager {
Ok(()) Ok(())
} }
/// Get mutable state for a specific project
pub fn get_mut(&mut self, project_name: &str) -> Option<&mut InstanceState> {
self.instances.get_mut(project_name)
} }
/// Set the state for a specific project pub struct StateManager(RefCell<State>);
pub fn set(&mut self, project_name: String, state: InstanceState) {
self.instances.insert(project_name, state); impl StateManager {
pub fn new() -> Result<Self> {
Ok(Self(RefCell::new(State::new()?)))
} }
/// Remove the state for a specific project pub fn save(&self) -> Result<()> {
pub fn remove(&mut self, project_name: &str) -> Option<InstanceState> { self.0.borrow().save()?;
self.instances.remove(project_name) Ok(())
}
pub fn get(&self, project_name: &str) -> Option<InstanceState> {
self.0.borrow().instances.get(project_name).cloned()
}
pub fn set(&self, project_name: String, state: InstanceState) {
self.0.borrow_mut().instances.insert(project_name, state);
}
pub fn remove(&self, project_name: &str) -> Option<InstanceState> {
self.0.borrow_mut().instances.remove(project_name)
} }
} }
@@ -123,3 +108,9 @@ impl InstanceState {
} }
} }
} }
fn state_file_path() -> Result<PathBuf> {
let home = std::env::home_dir().wrap_err("Failed to get HOME environment variable")?;
Ok(home.join(".pgd").join("state.json"))
}