redis/client.rs
1use std::time::Duration;
2
3#[cfg(feature = "aio")]
4use crate::aio::{AsyncPushSender, DefaultAsyncDNSResolver};
5#[cfg(feature = "token-based-authentication")]
6use crate::auth::StreamingCredentialsProvider;
7#[cfg(feature = "aio")]
8use crate::io::AsyncDNSResolver;
9use crate::{
10 connection::{Connection, ConnectionInfo, ConnectionLike, IntoConnectionInfo, connect},
11 types::{RedisResult, Value},
12};
13#[cfg(feature = "aio")]
14use std::pin::Pin;
15
16#[cfg(feature = "tls-rustls")]
17use crate::tls::{TlsCertificates, inner_build_with_tls};
18
19#[cfg(feature = "cache-aio")]
20use crate::caching::CacheConfig;
21#[cfg(all(
22 feature = "cache-aio",
23 any(feature = "connection-manager", feature = "cluster-async")
24))]
25use crate::caching::CacheManager;
26
27/// The client type.
28#[derive(Debug, Clone)]
29pub struct Client {
30 pub(crate) connection_info: ConnectionInfo,
31}
32
33/// The client acts as connector to the redis server. By itself it does not
34/// do much other than providing a convenient way to fetch a connection from
35/// it. In the future the plan is to provide a connection pool in the client.
36///
37/// When opening a client a URL in the following format should be used:
38///
39/// ```plain
40/// redis://host:port/db
41/// ```
42///
43/// Example usage::
44///
45/// ```rust,no_run
46/// let client = redis::Client::open("redis://127.0.0.1/").unwrap();
47/// let con = client.get_connection().unwrap();
48/// ```
49impl Client {
50 /// Connects to a redis server and returns a client. This does not
51 /// actually open a connection yet but it does perform some basic
52 /// checks on the URL that might make the operation fail.
53 pub fn open<T: IntoConnectionInfo>(params: T) -> RedisResult<Client> {
54 Ok(Client {
55 connection_info: params.into_connection_info()?,
56 })
57 }
58
59 /// Instructs the client to actually connect to redis and returns a
60 /// connection object. The connection object can be used to send
61 /// commands to the server. This can fail with a variety of errors
62 /// (like unreachable host) so it's important that you handle those
63 /// errors.
64 pub fn get_connection(&self) -> RedisResult<Connection> {
65 connect(&self.connection_info, None)
66 }
67
68 /// Instructs the client to actually connect to redis with specified
69 /// timeout and returns a connection object. The connection object
70 /// can be used to send commands to the server. This can fail with
71 /// a variety of errors (like unreachable host) so it's important
72 /// that you handle those errors.
73 pub fn get_connection_with_timeout(&self, timeout: Duration) -> RedisResult<Connection> {
74 connect(&self.connection_info, Some(timeout))
75 }
76
77 /// Returns a reference of client connection info object.
78 pub fn get_connection_info(&self) -> &ConnectionInfo {
79 &self.connection_info
80 }
81
82 /// Constructs a new `Client` with parameters necessary to create a TLS connection.
83 ///
84 /// - `conn_info` - URL using the `rediss://` scheme.
85 /// - `tls_certs` - `TlsCertificates` structure containing:
86 /// - `client_tls` - Optional `ClientTlsConfig` containing byte streams for
87 /// - `client_cert` - client's byte stream containing client certificate in PEM format
88 /// - `client_key` - client's byte stream containing private key in PEM format
89 /// - `root_cert` - Optional byte stream yielding PEM formatted file for root certificates.
90 ///
91 /// If `ClientTlsConfig` ( cert+key pair ) is not provided, then client-side authentication is not enabled.
92 /// If `root_cert` is not provided, then system root certificates are used instead.
93 ///
94 /// # Examples
95 ///
96 /// ```no_run
97 /// use std::{fs::File, io::{BufReader, Read}};
98 ///
99 /// use redis::{Client, AsyncTypedCommands as _, TlsCertificates, ClientTlsConfig};
100 ///
101 /// async fn do_redis_code(
102 /// url: &str,
103 /// root_cert_file: &str,
104 /// cert_file: &str,
105 /// key_file: &str
106 /// ) -> redis::RedisResult<()> {
107 /// let root_cert_file = File::open(root_cert_file).expect("cannot open private cert file");
108 /// let mut root_cert_vec = Vec::new();
109 /// BufReader::new(root_cert_file)
110 /// .read_to_end(&mut root_cert_vec)
111 /// .expect("Unable to read ROOT cert file");
112 ///
113 /// let cert_file = File::open(cert_file).expect("cannot open private cert file");
114 /// let mut client_cert_vec = Vec::new();
115 /// BufReader::new(cert_file)
116 /// .read_to_end(&mut client_cert_vec)
117 /// .expect("Unable to read client cert file");
118 ///
119 /// let key_file = File::open(key_file).expect("cannot open private key file");
120 /// let mut client_key_vec = Vec::new();
121 /// BufReader::new(key_file)
122 /// .read_to_end(&mut client_key_vec)
123 /// .expect("Unable to read client key file");
124 ///
125 /// let client = Client::build_with_tls(
126 /// url,
127 /// TlsCertificates {
128 /// client_tls: Some(ClientTlsConfig{
129 /// client_cert: client_cert_vec,
130 /// client_key: client_key_vec,
131 /// }),
132 /// root_cert: Some(root_cert_vec),
133 /// }
134 /// )
135 /// .expect("Unable to build client");
136 ///
137 /// let connection_info = client.get_connection_info();
138 ///
139 /// println!(">>> connection info: {connection_info:?}");
140 ///
141 /// let mut con = client.get_multiplexed_async_connection().await?;
142 ///
143 /// con.set("key1", b"foo").await?;
144 ///
145 /// redis::cmd("SET")
146 /// .arg(&["key2", "bar"])
147 /// .exec_async(&mut con)
148 /// .await?;
149 ///
150 /// let result = redis::cmd("MGET")
151 /// .arg(&["key1", "key2"])
152 /// .query_async(&mut con)
153 /// .await;
154 /// assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec())));
155 /// println!("Result from MGET: {result:?}");
156 ///
157 /// Ok(())
158 /// }
159 /// ```
160 #[cfg(feature = "tls-rustls")]
161 pub fn build_with_tls<C: IntoConnectionInfo>(
162 conn_info: C,
163 tls_certs: TlsCertificates,
164 ) -> RedisResult<Client> {
165 let connection_info = conn_info.into_connection_info()?;
166
167 inner_build_with_tls(connection_info, &tls_certs)
168 }
169}
170
/// Cache setup that an [`AsyncConnectionConfig`] can carry.
#[cfg(feature = "cache-aio")]
#[derive(Clone)]
pub(crate) enum Cache {
    /// A cache described by a user-supplied [`CacheConfig`].
    Config(CacheConfig),
    /// A [`CacheManager`] handed over by crate-internal code; this variant only exists
    /// when the `connection-manager` or `cluster-async` feature is enabled.
    #[cfg(any(feature = "connection-manager", feature = "cluster-async"))]
    Manager(CacheManager),
}
178
/// Default limit on how long to wait for a server response: 500 milliseconds.
#[cfg(feature = "aio")]
pub(crate) const DEFAULT_RESPONSE_TIMEOUT: Option<Duration> = Some(Duration::from_millis(500));
/// Default limit on how long a connection attempt may take: 1 second.
#[cfg(any(feature = "aio", feature = "cluster"))]
pub(crate) const DEFAULT_CONNECTION_TIMEOUT: Option<Duration> = Some(Duration::from_secs(1));
183
/// Options used when creating an async connection.
///
/// Values are filled in through the builder-style `set_*` methods.
#[cfg(feature = "aio")]
#[derive(Clone)]
pub struct AsyncConnectionConfig {
    /// Maximum time to wait for a response from the server; `None` disables the limit.
    pub(crate) response_timeout: Option<Duration>,
    /// Maximum time to wait for a connection to be established; `None` disables the limit.
    pub(crate) connection_timeout: Option<Duration>,
    /// Sender that push values are delivered to, when one has been configured.
    pub(crate) push_sender: Option<std::sync::Arc<dyn AsyncPushSender>>,
    /// Cache setup, when one has been configured.
    #[cfg(feature = "cache-aio")]
    pub(crate) cache: Option<Cache>,
    /// Custom DNS resolver, when one has been configured.
    pub(crate) dns_resolver: Option<std::sync::Arc<dyn AsyncDNSResolver>>,
    /// Requested buffer size of the internal pipeline channel; `None` when not set explicitly.
    pub(crate) pipeline_buffer_size: Option<usize>,
    /// Optional credentials provider for dynamic authentication (e.g., token-based authentication)
    #[cfg(feature = "token-based-authentication")]
    pub(crate) credentials_provider: Option<std::sync::Arc<dyn StreamingCredentialsProvider>>,
}
201
#[cfg(feature = "aio")]
impl Default for AsyncConnectionConfig {
    /// Builds the baseline configuration: both timeouts take the crate defaults
    /// (`DEFAULT_RESPONSE_TIMEOUT`, `DEFAULT_CONNECTION_TIMEOUT`) and every other
    /// option starts out unset.
    fn default() -> Self {
        Self {
            response_timeout: DEFAULT_RESPONSE_TIMEOUT,
            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
            push_sender: None,
            #[cfg(feature = "cache-aio")]
            cache: None,
            dns_resolver: None,
            pipeline_buffer_size: None,
            #[cfg(feature = "token-based-authentication")]
            credentials_provider: None,
        }
    }
}
218
#[cfg(feature = "aio")]
impl AsyncConnectionConfig {
    /// Creates a configuration holding the values of [`Default::default`]: the default
    /// response and connection timeouts, with every other option unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Each connection attempt to the server will time out after `connection_timeout`.
    ///
    /// Pass `None` if you don't want the connection attempt to time out.
    pub fn set_connection_timeout(self, connection_timeout: Option<Duration>) -> Self {
        Self {
            connection_timeout,
            ..self
        }
    }

    /// The new connection will time out operations after `response_timeout` has passed.
    ///
    /// Pass `None` if you don't want requests to time out.
    pub fn set_response_timeout(self, response_timeout: Option<Duration>) -> Self {
        Self {
            response_timeout,
            ..self
        }
    }

    /// Sets the sender for push values.
    ///
    /// The sender can be a channel, or an arbitrary function that handles [crate::PushInfo] values.
    /// This will fail client creation if the connection isn't configured for RESP3 communications via the [crate::RedisConnectionInfo::set_protocol] function.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # use redis::AsyncConnectionConfig;
    /// let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    /// let config = AsyncConnectionConfig::new().set_push_sender(tx);
    /// ```
    ///
    /// ```rust
    /// # use std::sync::{Mutex, Arc};
    /// # use redis::AsyncConnectionConfig;
    /// let messages = Arc::new(Mutex::new(Vec::new()));
    /// let config = AsyncConnectionConfig::new().set_push_sender(move |msg|{
    ///     let Ok(mut messages) = messages.lock() else {
    ///         return Err(redis::aio::SendError);
    ///     };
    ///     messages.push(msg);
    ///     Ok(())
    /// });
    /// ```
    pub fn set_push_sender(self, sender: impl AsyncPushSender) -> Self {
        // Type-erase the sender so it can be stored and shared.
        let sender: std::sync::Arc<dyn AsyncPushSender> = std::sync::Arc::new(sender);
        self.set_push_sender_internal(sender)
    }

    /// Stores an already type-erased push sender.
    pub(crate) fn set_push_sender_internal(
        self,
        sender: std::sync::Arc<dyn AsyncPushSender>,
    ) -> Self {
        Self {
            push_sender: Some(sender),
            ..self
        }
    }

    /// Sets cache config for MultiplexedConnection, check CacheConfig for more details.
    #[cfg(feature = "cache-aio")]
    pub fn set_cache_config(self, cache_config: CacheConfig) -> Self {
        Self {
            cache: Some(Cache::Config(cache_config)),
            ..self
        }
    }

    /// Stores a cache manager as the cache setup, replacing any previously set cache option.
    #[cfg(all(
        feature = "cache-aio",
        any(feature = "connection-manager", feature = "cluster-async")
    ))]
    pub(crate) fn set_cache_manager(self, cache_manager: CacheManager) -> Self {
        Self {
            cache: Some(Cache::Manager(cache_manager)),
            ..self
        }
    }

    /// Set the DNS resolver for the underlying TCP connection.
    ///
    /// The parameter resolver must implement the [`crate::io::AsyncDNSResolver`] trait.
    pub fn set_dns_resolver(self, dns_resolver: impl AsyncDNSResolver) -> Self {
        // Type-erase the resolver so it can be stored and shared.
        let dns_resolver: std::sync::Arc<dyn AsyncDNSResolver> = std::sync::Arc::new(dns_resolver);
        self.set_dns_resolver_internal(dns_resolver)
    }

    /// Stores an already type-erased DNS resolver.
    pub(super) fn set_dns_resolver_internal(
        self,
        dns_resolver: std::sync::Arc<dyn AsyncDNSResolver>,
    ) -> Self {
        Self {
            dns_resolver: Some(dns_resolver),
            ..self
        }
    }

    /// Sets the buffer size for the internal pipeline channel.
    ///
    /// The multiplexed connection uses an internal channel to queue Redis commands
    /// before sending them to the server. This setting controls how many commands
    /// can be buffered in that channel.
    ///
    /// When the buffer is full, callers will asynchronously wait until space becomes
    /// available. A larger buffer allows more commands to be queued during bursts of
    /// activity, reducing wait time for callers. However, this comes at the cost of
    /// increased memory usage.
    ///
    /// The default value is 50. Consider increasing this value for high-concurrency
    /// scenarios (e.g., web servers handling many simultaneous requests) where
    /// buffer contention may increase overall latency and cause upstream timeouts.
    pub fn set_pipeline_buffer_size(self, size: usize) -> Self {
        Self {
            pipeline_buffer_size: Some(size),
            ..self
        }
    }

    /// Sets a credentials provider for dynamic authentication (e.g., token-based authentication).
    ///
    /// This is useful for authentication mechanisms that require periodic credential refresh,
    /// such as Microsoft Entra ID (formerly Azure AD).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "entra-id")]
    /// # {
    /// use redis::{AsyncConnectionConfig, EntraIdCredentialsProvider, RetryConfig};
    ///
    /// # async fn example() -> redis::RedisResult<()> {
    /// let mut provider = EntraIdCredentialsProvider::new_developer_tools()?;
    /// provider.start(RetryConfig::default());
    ///
    /// let config = AsyncConnectionConfig::new()
    ///     .set_credentials_provider(provider);
    /// # Ok(())
    /// # }
    /// # }
    /// ```
    #[cfg(feature = "token-based-authentication")]
    pub fn set_credentials_provider<P>(self, provider: P) -> Self
    where
        P: StreamingCredentialsProvider + 'static,
    {
        // Type-erase the provider so it can be stored and shared.
        let provider: std::sync::Arc<dyn StreamingCredentialsProvider> =
            std::sync::Arc::new(provider);
        Self {
            credentials_provider: Some(provider),
            ..self
        }
    }
}
360
/// To enable async support you need to choose one of the supported runtimes and activate its
/// corresponding feature: `tokio-comp` or `smol-comp`
#[cfg(feature = "aio")]
#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
impl Client {
    /// Returns an async connection from the client, using the default
    /// [`AsyncConnectionConfig`].
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection(
        &self,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        let config = AsyncConnectionConfig::new();
        self.get_multiplexed_async_connection_with_config(&config)
            .await
    }

    /// Returns an async connection from the client, set up according to `config`.
    ///
    /// The runtime is chosen by `Runtime::locate()`, and the connection attempt is
    /// limited by the connection timeout of `config` when one is set.
    #[cfg(feature = "aio")]
    #[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
    pub async fn get_multiplexed_async_connection_with_config(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            rt @ Runtime::Tokio => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::tokio::Tokio>(
                    config, rt,
                )
                .await
            }

            #[cfg(feature = "smol-comp")]
            rt @ Runtime::Smol => {
                self.get_multiplexed_async_connection_inner_with_timeout::<crate::aio::smol::Smol>(
                    config, rt,
                )
                .await
            }
        }
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager(&self) -> RedisResult<crate::aio::ConnectionManager> {
        let client = self.clone();
        crate::aio::ConnectionManager::new(client).await
    }

    /// Returns an async [`ConnectionManager`][connection-manager] from the client,
    /// created with the supplied `config`.
    ///
    /// The connection manager wraps a
    /// [`MultiplexedConnection`][multiplexed-connection]. If a command to that
    /// connection fails with a connection error, then a new connection is
    /// established in the background and the error is returned to the caller.
    ///
    /// This means that on connection loss at least one command will fail, but
    /// the connection will be re-established automatically if possible. Please
    /// refer to the [`ConnectionManager`][connection-manager] docs for
    /// detailed reconnecting behavior.
    ///
    /// A connection manager can be cloned, allowing requests to be sent concurrently
    /// on the same underlying connection (tcp/unix socket).
    ///
    /// [connection-manager]: aio/struct.ConnectionManager.html
    /// [multiplexed-connection]: aio/struct.MultiplexedConnection.html
    #[cfg(feature = "connection-manager")]
    #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))]
    pub async fn get_connection_manager_with_config(
        &self,
        config: crate::aio::ConnectionManagerConfig,
    ) -> RedisResult<crate::aio::ConnectionManager> {
        let client = self.clone();
        crate::aio::ConnectionManager::new_with_config(client, config).await
    }

    /// Creates a multiplexed connection on runtime `T`, bounded by the connection
    /// timeout of `config` when one is set.
    ///
    /// An elapsed timeout is converted into the error type of the returned result.
    async fn get_multiplexed_async_connection_inner_with_timeout<T>(
        &self,
        config: &AsyncConnectionConfig,
        rt: Runtime,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        // Futures are lazy: nothing happens until one of the branches below awaits this.
        let connecting = self.get_multiplexed_async_connection_inner::<T>(config);

        let Some(limit) = config.connection_timeout else {
            // No limit configured: wait for as long as connecting takes.
            return connecting.await;
        };

        match rt.timeout(limit, connecting).await {
            Ok(outcome) => outcome,
            Err(elapsed) => Err(elapsed.into()),
        }
    }

    /// Creates a multiplexed connection, spawns its driver future on runtime `T` and
    /// hands the resulting task handle to the connection.
    async fn get_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<crate::aio::MultiplexedConnection>
    where
        T: crate::aio::RedisRuntime,
    {
        let (mut connection, driver) = self
            .create_multiplexed_async_connection_inner::<T>(config)
            .await?;
        connection.set_task_handle(T::spawn(driver));
        Ok(connection)
    }

    /// Opens the underlying stream and builds a multiplexed connection on top of it.
    ///
    /// Returns the connection together with its driver future; the driver is not
    /// spawned here.
    async fn create_multiplexed_async_connection_inner<T>(
        &self,
        config: &AsyncConnectionConfig,
    ) -> RedisResult<(
        crate::aio::MultiplexedConnection,
        impl std::future::Future<Output = ()> + 'static,
    )>
    where
        T: crate::aio::RedisRuntime,
    {
        // Prefer the resolver from the config; otherwise use the default one.
        let resolver: &dyn AsyncDNSResolver = match &config.dns_resolver {
            Some(custom) => &**custom,
            None => &DefaultAsyncDNSResolver,
        };
        let stream = self.get_simple_async_connection::<T>(resolver).await?;
        crate::aio::MultiplexedConnection::new_with_config(
            &self.connection_info.redis,
            stream,
            config.clone(),
        )
        .await
    }

    /// Opens a boxed async stream on whichever runtime `Runtime::locate()` reports.
    async fn get_simple_async_connection_dynamically(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>> {
        match Runtime::locate() {
            #[cfg(feature = "tokio-comp")]
            Runtime::Tokio => {
                self.get_simple_async_connection::<crate::aio::tokio::Tokio>(dns_resolver)
                    .await
            }

            #[cfg(feature = "smol-comp")]
            Runtime::Smol => {
                self.get_simple_async_connection::<crate::aio::smol::Smol>(dns_resolver)
                    .await
            }
        }
    }

    /// Opens an async stream to the server on runtime `T` and returns it boxed.
    async fn get_simple_async_connection<T>(
        &self,
        dns_resolver: &dyn AsyncDNSResolver,
    ) -> RedisResult<Pin<Box<dyn crate::aio::AsyncStream + Send + Sync>>>
    where
        T: crate::aio::RedisRuntime,
    {
        let stream = crate::aio::connect_simple::<T>(&self.connection_info, dns_resolver).await?;
        Ok(stream.boxed())
    }

    /// Crate-internal access to the connection info of this client.
    #[cfg(feature = "connection-manager")]
    pub(crate) fn connection_info(&self) -> &ConnectionInfo {
        &self.connection_info
    }

    /// Returns an async receiver for pub-sub messages.
    ///
    /// The underlying stream is opened with the default DNS resolver.
    #[cfg(feature = "aio")]
    // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later?
    pub async fn get_async_pubsub(&self) -> RedisResult<crate::aio::PubSub> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;

        crate::aio::PubSub::new(&self.connection_info.redis, stream).await
    }

    /// Returns an async receiver for monitor messages.
    ///
    /// The underlying stream is opened with the default DNS resolver.
    #[cfg(feature = "aio")]
    pub async fn get_async_monitor(&self) -> RedisResult<crate::aio::Monitor> {
        let stream = self
            .get_simple_async_connection_dynamically(&DefaultAsyncDNSResolver)
            .await?;

        crate::aio::Monitor::new(&self.connection_info.redis, stream).await
    }
}
573
574#[cfg(feature = "aio")]
575use crate::aio::Runtime;
576
577impl ConnectionLike for Client {
578 fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
579 self.get_connection()?.req_packed_command(cmd)
580 }
581
582 fn req_packed_commands(
583 &mut self,
584 cmd: &[u8],
585 offset: usize,
586 count: usize,
587 ) -> RedisResult<Vec<Value>> {
588 self.get_connection()?
589 .req_packed_commands(cmd, offset, count)
590 }
591
592 fn get_db(&self) -> i64 {
593 self.connection_info.redis.db
594 }
595
596 fn check_connection(&mut self) -> bool {
597 if let Ok(mut conn) = self.get_connection() {
598 conn.check_connection()
599 } else {
600 false
601 }
602 }
603
604 fn is_open(&self) -> bool {
605 if let Ok(conn) = self.get_connection() {
606 conn.is_open()
607 } else {
608 false
609 }
610 }
611}
612
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    /// Regression check (numbered 293 per the test name): an IPv6 address that
    /// carries an interface suffix must be accepted by `Client::open`.
    #[test]
    fn regression_293_parse_ipv6_with_interface() {
        let opened = Client::open(("fe80::cafe:beef%eno1", 6379));
        assert_matches!(opened, Ok(_));
    }

    /// A fresh config leaves the pipeline buffer size unset.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_default() {
        let config = AsyncConnectionConfig::new();
        assert!(config.pipeline_buffer_size.is_none());
    }

    /// The setter stores exactly the requested pipeline buffer size.
    #[cfg(feature = "aio")]
    #[test]
    fn test_async_connection_config_pipeline_buffer_size_custom() {
        let requested = 100;
        let config = AsyncConnectionConfig::new().set_pipeline_buffer_size(requested);
        assert_eq!(Some(requested), config.pipeline_buffer_size);
    }
}