diff --git a/Cargo.lock b/Cargo.lock index 519012a..6887647 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,6 +45,12 @@ dependencies = [ "libc", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "async-trait" version = "0.1.83" @@ -62,6 +68,15 @@ version = "1.4.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "backon" +version = "1.3.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +dependencies = [ + "fastrand", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -201,9 +216,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.38" +version = "0.4.39" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", @@ -231,16 +246,6 @@ version = "0.4.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -362,6 +367,12 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "either" +version = "1.13.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + [[package]] name = "enum-as-inner" version = "0.4.0" @@ -380,6 +391,12 @@ version = "1.0.1" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fnv" version = "1.0.7" @@ -401,6 +418,21 @@ version = "2.0.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -408,6 +440,7 @@ source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -462,6 +495,7 @@ version = "0.3.31" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -812,6 +846,15 @@ version = "2.10.1" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" @@ -836,9 +879,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.167" +version = "0.2.168" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" +checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" [[package]] name = "linked-hash-map" @@ -950,8 +993,8 @@ dependencies = [ "percent-encoding", "rand", "rustc_version_runtime", - "rustls 0.21.12", - "rustls-pemfile 1.0.4", + "rustls", + "rustls-pemfile", "serde", "serde_bytes", "serde_with", @@ -963,13 +1006,23 @@ dependencies = [ "take_mut", "thiserror", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "trust-dns-proto", "trust-dns-resolver", "typed-builder", "uuid", - "webpki-roots 0.25.4", + "webpki-roots", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", ] [[package]] @@ -978,6 +1031,15 @@ version = "0.1.0" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+/~https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1002,12 +1064,6 @@ version = "1.20.2" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" -[[package]] -name = "openssl-probe" -version = "0.1.5" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" - [[package]] name = "parking_lot" version = "0.12.3" @@ -1149,28 +1205,27 @@ dependencies = [ [[package]] name = "redis" -version = "0.25.4" +version = "0.27.6" source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +checksum = "09d8f99a4090c89cc489a94833c901ead69bfbf3877b4867d5482e321ee875bc" dependencies = [ + "arc-swap", "async-trait", + "backon", "bytes", "combine", + "futures", "futures-util", + "itertools", "itoa", + "num-bigint", "percent-encoding", "pin-project-lite", - "rustls 0.22.4", - "rustls-native-certs", - "rustls-pemfile 2.2.0", - "rustls-pki-types", "ryu", "sha1_smol", "tokio", - "tokio-rustls 0.25.0", "tokio-util", "url", - "webpki-roots 0.26.7", ] [[package]] @@ -1249,37 +1304,10 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki 0.101.7", + "rustls-webpki", "sct", ] -[[package]] -name = "rustls" -version = "0.22.4" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" -dependencies = [ - "log", - "ring", - "rustls-pki-types", - "rustls-webpki 0.102.8", - "subtle", - "zeroize", -] - -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe", - "rustls-pemfile 2.2.0", - "rustls-pki-types", - "schannel", - "security-framework", -] - [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -1289,21 +1317,6 @@ dependencies = [ "base64 0.21.7", ] -[[package]] -name = "rustls-pemfile" -version = "2.2.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" -dependencies = [ - "rustls-pki-types", -] - -[[package]] -name = "rustls-pki-types" -version = "1.10.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" - [[package]] name = "rustls-webpki" version = "0.101.7" @@ -1314,32 +1327,12 @@ dependencies = [ "untrusted", ] -[[package]] -name = "rustls-webpki" -version = "0.102.8" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" -dependencies = [ - "ring", - "rustls-pki-types", - "untrusted", -] - [[package]] name = "ryu" version = "1.0.18" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "schannel" -version = "0.1.27" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "scopeguard" version = "1.2.0" @@ -1356,29 +1349,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags 2.6.0", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.12.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "0.9.0" @@ -1732,18 +1702,7 @@ version = "0.24.1" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.12", - "tokio", -] - -[[package]] -name = "tokio-rustls" -version = "0.25.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" -dependencies = [ - "rustls 0.22.4", - "rustls-pki-types", + "rustls", "tokio", ] @@ -1961,15 +1920,6 @@ version = "0.25.4" source = "registry+/~https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" -[[package]] -name = "webpki-roots" -version = "0.26.7" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "widestring" version = "1.1.0" @@ -2025,15 +1975,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.59.0" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-targets" version = "0.48.5" @@ -2252,12 +2193,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "zeroize" -version = "1.8.1" -source = "registry+/~https://github.com/rust-lang/crates.io-index" -checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" - [[package]] name = "zerovec" version = "0.10.4" diff --git a/Cargo.toml b/Cargo.toml index cdc6b56..59877cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ hyper = { version = "1.3.1", default-features = false, features = ["http1", "ser hyper-util = { version = "0.1.5", default-features = false, features = ["tokio"] } mongodb = { version = "2.8.2", default-features = false, features = ["tokio-runtime"] } prometheus = { version = "0.13.4", default-features = false } -redis = { version = "0.25.4", default-features = false, features = ["script", "tokio-rustls-comp", "tls-rustls-webpki-roots"] } +redis = { version = "0.27.5", default-features = false, features = ["script", "tokio-comp", "connection-manager"] } serde = { version = "1.0.203", default-features = false, features = ["derive"] } serde_json = { version = "1.0.117", default-features = false } tokio = { version = "1.38.0", default-features = false } diff --git a/README.md b/README.md index 8f32d27..c258dd0 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,23 @@ This program listens to a [MongoDB Change Stream](https://www.mongodb.com/docs/m * If set, `changestream-to-redis` will expose Prometheus metrics at this address. * (optional) `REDIS_BATCH_SIZE`, default `1`. * If set, it overrides the default Redis batch size, leading to an increased throughput at a cost of increased latency (larger batches result in fewer but larger requests sent to Redis). + * (optional) `REDIS_CONNECTION_RETRY_COUNT`. + * [See docs](https://docs.rs/redis/0.27.5/redis/aio/struct.ConnectionManagerConfig.html#method.set_number_of_retries). + * (optional) `REDIS_CONNECTION_TIMEOUT_SECS`. + * [See docs](https://docs.rs/redis/0.27.5/redis/aio/struct.ConnectionManagerConfig.html#method.set_connection_timeout). + * (optional) `REDIS_MAX_DELAY_SECS`. + * [See docs](https://docs.rs/redis/0.27.5/redis/aio/struct.ConnectionManagerConfig.html#method.set_max_delay). * (optional) `REDIS_QUEUE_SIZE`, default `1024`. * If set, it overrides the default Redis queue size, accepting the MongoDB events earlier and temporarily storing them in memory. + * (optional) `REDIS_PUBLISH_RETRY_COUNT`, default `0`. + * The amount of times a publication to Redis can be retried. + * (optional) `REDIS_RESPONSE_TIMEOUT_SECS`. + * [See docs](https://docs.rs/redis/0.27.5/redis/aio/struct.ConnectionManagerConfig.html#method.set_response_timeout). ## Limitations * **No change stream resumption.** It is planned, but at the moment the program is entirely stateless. -* **No error handling.** As soon as the change stream or Redis communication fails, the program exits. It is planned, though `changestream-to-redis` is meant to restart as soon as it exits. +* **No MongoDB error handling.** As soon as the change stream fails, the program exits. It is planned, though `changestream-to-redis` is meant to restart as soon as it exits. ## Performance diff --git a/src/config.rs b/src/config.rs index 1a8395e..351cc2b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,13 @@ use mongodb::options::FullDocumentType; +use redis::aio::ConnectionManagerConfig; use serde_json::from_str; -use std::{env::var, vec::Vec}; +use std::{env::var, time::Duration, vec::Vec}; + +macro_rules! var_parse { + ($name:expr) => { + var($name).ok().map(|value| value.parse().unwrap()) + }; +} pub struct Config { /// If true, all events are logged before being sent to Redis. @@ -34,6 +41,9 @@ pub struct Config { pub metrics_address: Option, pub mongo_url: String, pub redis_batch_size: usize, + #[expect(clippy::struct_field_names)] + pub redis_connection_manager_config: ConnectionManagerConfig, + pub redis_publish_retry_count: usize, pub redis_queue_size: usize, pub redis_url: String, } @@ -42,9 +52,7 @@ impl Config { pub fn from_env() -> Self { Self { debug: var("DEBUG").is_ok(), - deduplication: var("DEDUPLICATION") - .ok() - .map(|value| value.parse().unwrap()), + deduplication: var_parse!("DEDUPLICATION"), excluded_collections: var("EXCLUDED_COLLECTIONS") .ok() .map(|value| value.split(',').map(ToString::to_string).collect()), @@ -56,13 +64,33 @@ impl Config { .map(|value| value.split(',').map(ToString::to_string).collect()), metrics_address: var("METRICS_ADDRESS").ok(), mongo_url: var("MONGO_URL").expect("MONGO_URL is required"), - redis_batch_size: var("REDIS_BATCH_SIZE") - .ok() - .map_or(1, |value| value.parse().unwrap()), - redis_queue_size: var("REDIS_QUEUE_SIZE") - .ok() - .map_or(1024, |value| value.parse().unwrap()), + redis_batch_size: var_parse!("REDIS_BATCH_SIZE").unwrap_or(1), + redis_connection_manager_config: Self::redis_connection_manager_config_from_env(), + redis_publish_retry_count: var_parse!("REDIS_PUBLISH_RETRY_COUNT").unwrap_or(0), + redis_queue_size: var_parse!("REDIS_QUEUE_SIZE").unwrap_or(1024), redis_url: var("REDIS_URL").expect("REDIS_URL is required"), } } + + fn redis_connection_manager_config_from_env() -> ConnectionManagerConfig { + let mut config = ConnectionManagerConfig::new(); + if let Some(x) = var_parse!("REDIS_CONNECTION_RETRY_COUNT") { + config = config.set_number_of_retries(x); + } + + if let Some(x) = var_parse!("REDIS_CONNECTION_TIMEOUT_SECS").map(Duration::from_secs) { + config = config.set_connection_timeout(x); + } + + #[expect(clippy::cast_possible_truncation)] + if let Some(x) = var_parse!("REDIS_MAX_DELAY_SECS").map(Duration::from_secs) { + config = config.set_max_delay(x.as_millis() as u64); + } + + if let Some(x) = var_parse!("REDIS_RESPONSE_TIMEOUT_SECS").map(Duration::from_secs) { + config = config.set_response_timeout(x); + } + + config + } } diff --git a/src/ejson.rs b/src/ejson.rs index c222d30..ac8928c 100644 --- a/src/ejson.rs +++ b/src/ejson.rs @@ -32,7 +32,7 @@ impl Ejson for Bson { // Replace everything else with `null`s. v => { - println!("Unrecognized BSON value found: {v}"); + eprintln!("Unrecognized BSON value found: {v}"); Value::Null } } diff --git a/src/redis.rs b/src/redis.rs index 65903bb..e0904e1 100644 --- a/src/redis.rs +++ b/src/redis.rs @@ -1,5 +1,5 @@ use crate::{ejson::Ejson, event::Event, Config}; -use redis::{aio::MultiplexedConnection, Client, RedisError, Script}; +use redis::{aio::ConnectionManager, Client, RedisError, Script}; const SCRIPT_WITH_DEDUPLICATION: &str = r#" for index = 1, tonumber(ARGV[1]) do @@ -21,14 +21,14 @@ const SCRIPT_WITHOUT_DEDUPLICATION: &str = r#" "#; pub struct Redis { - connection: MultiplexedConnection, + connection_manager: ConnectionManager, script: Script, } impl Redis { pub async fn new(config: &Config) -> Result { - let connection = Client::open(config.redis_url.as_str())? - .get_multiplexed_async_connection() + let connection_manager = Client::open(config.redis_url.as_str())? + .get_connection_manager_with_config(config.redis_connection_manager_config.clone()) .await?; println!("Redis connection initialized."); @@ -37,20 +37,23 @@ impl Redis { Some(_) => SCRIPT_WITH_DEDUPLICATION, }); - Ok(Self { connection, script }) + Ok(Self { + connection_manager, + script, + }) } pub async fn publish(&mut self, config: &Config, events: Vec) -> Result<(), RedisError> { if config.debug { - for Event { ns, id, op, .. } in &events { - println!("{}::{} {}", ns, id, op.clone().into_ejson()); + for Event { id, ns, op, .. } in &events { + println!("{ns}::{id} {}", op.clone().into_ejson()); } } let mut invocation = self.script.prepare_invoke(); invocation.arg(events.len()); - for Event { ev, ns, id, op, .. } in events { + for Event { ev, id, ns, op, .. } in events { invocation.arg(ns); invocation.arg(id); invocation.arg(op.into_ejson().to_string()); @@ -61,6 +64,24 @@ impl Redis { } } - invocation.invoke_async(&mut self.connection).await + let retry_limit = config.redis_publish_retry_count; + for retry in 0..=retry_limit { + match invocation.invoke_async(&mut self.connection_manager).await { + Ok(()) => { + if retry > 0 { + eprintln!("Redis publication succeeded (retry #{retry})"); + } + + return Ok(()); + } + // All I/O errors can be safely retried. + Err(error) if !error.is_io_error() || retry == retry_limit => return Err(error), + Err(error) => { + eprintln!("Redis error (retry #{retry}): {error:?}"); + } + } + } + + unreachable!() } }