Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade compute_tools and compute_api to edition 2024 #10983

Merged
merged 3 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "compute_tools"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true

[features]
Expand Down
29 changes: 14 additions & 15 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,33 @@ use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::{thread, time::Duration};
use std::sync::{Arc, Condvar, Mutex, RwLock, mpsc};
use std::thread;
use std::time::Duration;

use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
use url::Url;

use compute_api::responses::{ComputeCtlConfig, ComputeStatus};
use compute_api::spec::ComputeSpec;

use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
ComputeNode, ComputeState, PG_PID, ParsedSpec, forward_termination_signal,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::extension_server::get_pg_version_string;
use compute_tools::http::server::Server;
use compute_tools::logger::*;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use rlimit::{Resource, setrlimit};
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook::iterator::Signals;
use tracing::{error, info, warn};
use url::Url;
use utils::failpoint_support;

// this is an arbitrary build tag. Fine as a default / for testing purposes
Expand Down Expand Up @@ -149,6 +148,8 @@ struct Cli {
fn main() -> Result<()> {
let cli = Cli::parse();

let scenario = failpoint_support::init();

// For historical reasons, the main thread that processes the spec and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
// that you can use tokio::spawn() and tokio::runtime::Handle::current().block_on(...)
Expand All @@ -160,8 +161,6 @@ fn main() -> Result<()> {

let build_tag = runtime.block_on(init())?;

let scenario = failpoint_support::init();

// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;

Expand Down
10 changes: 6 additions & 4 deletions compute_tools/src/bin/fast_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```

use anyhow::{bail, Context};
use anyhow::{Context, bail};
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
use compute_tools::extension_server::{PostgresMajorVersion, get_pg_version};
use nix::unistd::Pid;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{Instrument, error, info, info_span, warn};
use utils::fs_ext::is_directory_empty;

#[path = "fast_import/aws_s3_sync.rs"]
Expand Down Expand Up @@ -558,7 +558,9 @@ async fn cmd_dumprestore(
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await?
} else {
bail!("destination connection string must be provided in spec for dump_restore command");
bail!(
"destination connection string must be provided in spec for dump_restore command"
);
};

(source, dest)
Expand Down
3 changes: 1 addition & 2 deletions compute_tools/src/bin/fast_import/aws_s3_sync.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use tracing::{info, warn};
use walkdir::WalkDir;

use super::s3_uri::S3Uri;

use tracing::{info, warn};

const MAX_PARALLEL_UPLOADS: usize = 10;

/// Upload all files from 'local' to 'remote'
Expand Down
3 changes: 2 additions & 1 deletion compute_tools/src/bin/fast_import/s3_uri.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use std::str::FromStr;

use anyhow::Result;

/// Struct to hold parsed S3 components
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct S3Uri {
Expand Down
18 changes: 10 additions & 8 deletions compute_tools/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::path::Path;
use std::process::Stdio;
use std::result::Result;
use std::sync::Arc;

use compute_api::responses::CatalogObjects;
use futures::Stream;
use postgres::NoTls;
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
spawn,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::spawn;
use tokio_stream::{self as stream, StreamExt};
use tokio_util::codec::{BytesCodec, FramedRead};
use tracing::warn;

use crate::compute::ComputeNode;
use crate::pg_helpers::{get_existing_dbs_async, get_existing_roles_async, postgres_conf_for_db};
use compute_api::responses::CatalogObjects;

pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<CatalogObjects> {
let conf = compute.get_tokio_conn_conf(Some("compute_ctl:get_dbs_and_roles"));
Expand Down Expand Up @@ -55,7 +57,7 @@ pub enum SchemaDumpError {
pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>> + use<>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
Expand Down
2 changes: 1 addition & 1 deletion compute_tools/src/checker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{anyhow, Ok, Result};
use anyhow::{Ok, Result, anyhow};
use tokio_postgres::NoTls;
use tracing::{error, instrument, warn};

Expand Down
48 changes: 23 additions & 25 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,37 @@
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::iter::once;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;
use std::time::Instant;
use std::time::{Duration, Instant};
use std::{env, fs};

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::spec::{Database, PgIdent, Role};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{
ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use postgres;
use postgres::error::SqlState;
use postgres::NoTls;
use postgres::error::SqlState;
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;

use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion};
use utils::measured_stream::MeasuredReader;

use nix::sys::signal::{kill, Signal};
use remote_storage::{DownloadError, RemotePath};
use tokio::spawn;

use crate::installed_extensions::get_installed_extensions;
use crate::local_proxy;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::spec_apply::ApplySpecPhase::{
Expand All @@ -45,13 +40,12 @@ use crate::spec_apply::ApplySpecPhase::{
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase;
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{apply_operations, MutableApplyContext, DB};
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server};
use crate::{config, extension_server, local_proxy};

pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
Expand Down Expand Up @@ -1318,7 +1312,7 @@ impl ComputeNode {
// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?;

if let Some(ref local_proxy) = &spec.clone().local_proxy_config {
if let Some(local_proxy) = &spec.clone().local_proxy_config {
info!("configuring local_proxy");
local_proxy::configure(local_proxy).context("apply_config local_proxy")?;
}
Expand Down Expand Up @@ -1538,7 +1532,9 @@ impl ComputeNode {
&postgresql_conf_path,
"neon.disable_logical_replication_subscribers=false",
)? {
info!("updated postgresql.conf to set neon.disable_logical_replication_subscribers=false");
info!(
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
);
}
self.pg_reload_conf()?;
}
Expand Down Expand Up @@ -1765,7 +1761,9 @@ LIMIT 100",
info!("extension already downloaded, skipping re-download");
return Ok(0);
} else if start_time_delta < HANG_TIMEOUT && !first_try {
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
info!(
"download {ext_archive_name} already started by another process, hanging untill completion or timeout"
);
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
loop {
info!("waiting for download");
Expand Down
5 changes: 2 additions & 3 deletions compute_tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use std::io::prelude::*;
use std::path::Path;

use anyhow::Result;

use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize};
use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption};

use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize, escape_conf_value};

/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
Expand Down
3 changes: 1 addition & 2 deletions compute_tools/src/configurator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::sync::Arc;
use std::thread;

use tracing::{error, info, instrument};

use compute_api::responses::ComputeStatus;
use tracing::{error, info, instrument};

use crate::compute::ComputeNode;

Expand Down
13 changes: 8 additions & 5 deletions compute_tools/src/extension_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ More specifically, here is an example ext_index.json
}
}
*/
use anyhow::Result;
use anyhow::{bail, Context};
use std::path::Path;
use std::str;

use anyhow::{Context, Result, bail};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use std::path::Path;
use std::str;
use tar::Archive;
use tracing::info;
use tracing::log::warn;
Expand Down Expand Up @@ -244,7 +244,10 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
info!("writing file {:?}{:?}", control_path, control_content);
std::fs::write(control_path, control_content).unwrap();
} else {
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
warn!(
"control file {:?} exists both locally and remotely. ignoring the remote version.",
control_path
);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion compute_tools/src/http/extract/json.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::ops::{Deref, DerefMut};

use axum::extract::{rejection::JsonRejection, FromRequest, Request};
use axum::extract::rejection::JsonRejection;
use axum::extract::{FromRequest, Request};
use compute_api::responses::GenericAPIError;
use http::StatusCode;

Expand Down
6 changes: 4 additions & 2 deletions compute_tools/src/http/extract/path.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::ops::{Deref, DerefMut};

use axum::extract::{rejection::PathRejection, FromRequestParts};
use axum::extract::FromRequestParts;
use axum::extract::rejection::PathRejection;
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
use http::StatusCode;
use http::request::Parts;

/// Custom `Path` extractor, so that we can format errors into
/// `JsonResponse<GenericAPIError>`.
Expand Down
6 changes: 4 additions & 2 deletions compute_tools/src/http/extract/query.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::ops::{Deref, DerefMut};

use axum::extract::{rejection::QueryRejection, FromRequestParts};
use axum::extract::FromRequestParts;
use axum::extract::rejection::QueryRejection;
use compute_api::responses::GenericAPIError;
use http::{request::Parts, StatusCode};
use http::StatusCode;
use http::request::Parts;

/// Custom `Query` extractor, so that we can format errors into
/// `JsonResponse<GenericAPIError>`.
Expand Down
6 changes: 4 additions & 2 deletions compute_tools/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use axum::{body::Body, response::Response};
use axum::body::Body;
use axum::response::Response;
use compute_api::responses::{ComputeStatus, GenericAPIError};
use http::{header::CONTENT_TYPE, StatusCode};
use http::StatusCode;
use http::header::CONTENT_TYPE;
use serde::Serialize;
use tracing::error;

Expand Down
7 changes: 5 additions & 2 deletions compute_tools/src/http/routes/check_writability.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::sync::Arc;

use axum::{extract::State, response::Response};
use axum::extract::State;
use axum::response::Response;
use compute_api::responses::ComputeStatus;
use http::StatusCode;

use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse};
use crate::checker::check_writability;
use crate::compute::ComputeNode;
use crate::http::JsonResponse;

/// Check that the compute is currently running.
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
Expand Down
Loading
Loading